Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:24:35 UTC

[1/4] flink git commit: [FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

Repository: flink
Updated Branches:
  refs/heads/master 941ac6dfd -> fb7e63422


[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type

This closes #986


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5546a1ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5546a1ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5546a1ef

Branch: refs/heads/master
Commit: 5546a1efabcc9c5f500abdb4e38a5cc05d35980a
Parents: 941ac6d
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 4 15:30:28 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 12:00:56 2015 +0200

----------------------------------------------------------------------
 .../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++----
 .../type/extractor/PojoTypeExtractionTest.java  | 63 ++++++++++++++++++++
 2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
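
The change below replaces the instance-level "alreadySeen" set with a count over a per-field copy of
the typeHierarchy: genuine recursion repeats a type on the current descent path, whereas a second
sibling field of the same POJO type does not, because every tuple field starts from its own copy of
the hierarchy. A minimal sketch of that distinction, assuming a hypothetical MyPojo and hand-built
paths (an illustration of the idea, not Flink code):

	import java.util.ArrayList;
	import java.util.List;

	public class PathCountSketch {

		static class MyPojo {} // hypothetical POJO used twice in a tuple

		// count occurrences of a type on one descent path (cf. countTypeInHierarchy below)
		static int countInPath(List<Class<?>> path, Class<?> type) {
			int count = 0;
			for (Class<?> t : path) {
				if (t == type) {
					count++;
				}
			}
			return count;
		}

		public static void main(String[] args) {
			// Extracting field 1 of Tuple2<MyPojo, MyPojo>: the path copied for field 0
			// has been discarded, so MyPojo occurs only once on this path.
			List<Class<?>> siblingPath = new ArrayList<Class<?>>();
			siblingPath.add(MyPojo.class);
			System.out.println(countInPath(siblingPath, MyPojo.class) > 1);   // false -> full PojoTypeInfo

			// Extracting a recursive POJO's field of the same type: the type is already
			// on the path, so the extractor falls back to a generic type.
			List<Class<?>> recursivePath = new ArrayList<Class<?>>();
			recursivePath.add(MyPojo.class);
			recursivePath.add(MyPojo.class);
			System.out.println(countInPath(recursivePath, MyPojo.class) > 1); // true -> GenericTypeInfo
		}
	}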


http://git-wip-us.apache.org/repos/asf/flink/blob/5546a1ef/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 41644f9..1ae8d3d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
  * functions.
  */
 public class TypeExtractor {
+
+	/*
+	 * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
+	 * The type hierarchy records all types (Classes, ParameterizedTypes, TypeVariables, etc.) and
+	 * intermediate types encountered between a given starting type of a function or type (e.g. MyMapper,
+	 * Tuple2) and the type currently being analyzed (which depends on the method, e.g. MyPojoFieldType).
+	 *
+	 * Thus, it fully qualifies types down to the tuple/POJO field level.
+	 *
+	 * A typical typeHierarchy could look like:
+	 *
+	 * UDF: MyMapFunction.class
+	 * top-level UDF: MyMapFunctionBase.class
+	 * RichMapFunction: RichMapFunction.class
+	 * MapFunction: MapFunction.class
+	 * Function's OUT: Tuple1<MyPojo>
+	 * user-defined POJO: MyPojo.class
+	 * user-defined top-level POJO: MyPojoBase.class
+	 * POJO field: Tuple1<String>
+	 * Field type: String.class
+	 *
+	 */
 	
 	private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
 
-	// We need this to detect recursive types and not get caught
-	// in an endless recursion
-	private Set<Class<?>> alreadySeen;
-
 	protected TypeExtractor() {
-		alreadySeen = new HashSet<Class<?>>();
+		// only create instances for special use cases
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -416,10 +432,12 @@ public class TypeExtractor {
 			
 			TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
 			for (int i = 0; i < subtypes.length; i++) {
+				ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+				subTypeHierarchy.add(subtypes[i]);
 				// sub type could not be determined with materializing
 				// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
 				if (subtypes[i] instanceof TypeVariable<?>) {
-					tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type);
+					tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
 					
 					// variable could not be determined
 					if (tupleSubTypes[i] == null) {
@@ -430,7 +448,7 @@ public class TypeExtractor {
 								+ "all variables in the return type can be deduced from the input type(s).");
 					}
 				} else {
-					tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type);
+					tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
 				}
 			}
 			
@@ -912,6 +930,19 @@ public class TypeExtractor {
 	// --------------------------------------------------------------------------------------------
 	//  Utility methods
 	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * @return the number of entries in the typeHierarchy equal to the given type or to its raw class
+	 */
+	private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
+		int count = 0;
+		for (Type t : typeHierarchy) {
+			if (t == type || (isClassType(type) && t == typeToClass(type))) {
+				count++;
+			}
+		}
+		return count;
+	}
 	
 	/**
 	 * @param curT : start type
@@ -1183,12 +1214,10 @@ public class TypeExtractor {
 			return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
 		}
 
-		if (alreadySeen.contains(clazz)) {
+		if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
 			return new GenericTypeInfo<OUT>(clazz);
 		}
 
-		alreadySeen.add(clazz);
-
 		if (Modifier.isInterface(clazz.getModifiers())) {
 			// Interface has no members and is therefore not handled as POJO
 			return new GenericTypeInfo<OUT>(clazz);

http://git-wip-us.apache.org/repos/asf/flink/blob/5546a1ef/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 1f3f71c..34fde20 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
 						+ ">"));
 		Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
 	}
+
+	public static class RecursivePojo1 {
+		public RecursivePojo1 field;
+	}
+
+	public static class RecursivePojo2 {
+		public Tuple1<RecursivePojo2> field;
+	}
+
+	public static class RecursivePojo3 {
+		public NestedPojo field;
+	}
+
+	public static class NestedPojo {
+		public RecursivePojo3 field;
+	}
+
+	@Test
+	public void testRecursivePojo1() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
+	}
+
+	@Test
+	public void testRecursivePojo2() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.type instanceof TupleTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
+	}
+
+	@Test
+	public void testRecursivePojo3() {
+		TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+		Assert.assertTrue(ti instanceof PojoTypeInfo);
+		PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+		Assert.assertTrue(pf.type instanceof PojoTypeInfo);
+		Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
+	}
+
+	public static class FooBarPojo {
+		public int foo, bar;
+		public FooBarPojo() {}
+	}
+
+	public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+		@Override
+		public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
+			return null;
+		}
+	}
+
+	@Test
+	public void testDualUseOfPojo() {
+		MapFunction<?, ?> function = new DuplicateMapper();
+		TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
+		Assert.assertTrue(ti instanceof TupleTypeInfo);
+		TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
+		Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+		Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+	}
 }


[4/4] flink git commit: [FLINK-2467] Example WordCountStorm.jar is not packaged correctly - fixed assembly xml file

Posted by fh...@apache.org.
[FLINK-2467] Example WordCountStorm.jar is not packaged correctly
 - fixed assembly xml file

This closes #974


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0aa6f0cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0aa6f0cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0aa6f0cb

Branch: refs/heads/master
Commit: 0aa6f0cb7b3510e0e9aa938411b6db77f7f7505e
Parents: 100e8c5
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Aug 3 15:51:46 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 .../src/assembly/word-count-storm.xml                        | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0aa6f0cb/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
index 9721115..96ac429 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
@@ -34,9 +34,9 @@ under the License.
             <outputDirectory>/</outputDirectory>
             <unpack>true</unpack>
             <includes>
-                <!-- need to be added explicitly to get 'defaults.yaml' -->
                 <include>org.apache.storm:storm-core:jar</include>
-                <include>org.apache.flink:flink-storm-examples:jar</include>
+                <include>org.apache.flink:flink-storm-compatibility-core:jar</include>
+                <include>org.apache.flink:flink-storm-compatibility-examples:jar</include>
             </includes>
             <unpackOptions>
                 <includes>
@@ -60,10 +60,10 @@ under the License.
                     <include>org/apache/flink/stormcompatibility/api/*.class</include>
                     <include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
                     <!-- Word Count -->
-                    <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class
-                    </include>
+                    <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
                     <include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
                     <include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
+                    <include>org/apache/flink/stormcompatibility/util/*.class</include>
                     <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
                 </includes>
             </unpackOptions>
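
With the corrected artifact names (flink-storm-compatibility-core / -examples) and the added util
classes, the assembled jar should now contain everything the word count topology needs. A quick way
to spot-check the assembly output, as a sketch assuming the jar lands at a placeholder path:

	import java.util.Enumeration;
	import java.util.jar.JarEntry;
	import java.util.jar.JarFile;

	public class JarContentCheck {
		public static void main(String[] args) throws Exception {
			// placeholder path; the real location depends on the module's build setup
			try (JarFile jar = new JarFile("target/WordCountStorm.jar")) {
				Enumeration<JarEntry> entries = jar.entries();
				while (entries.hasMoreElements()) {
					String name = entries.nextElement().getName();
					// entries the updated descriptor is expected to pull in
					if (name.startsWith("org/apache/flink/stormcompatibility/util/")
							|| name.equals("defaults.yaml")) { // shipped inside storm-core
						System.out.println(name);
					}
				}
			}
		}
	}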


[2/4] flink git commit: [FLINK-1882] Removed RemotedCollector classes

Posted by fh...@apache.org.
[FLINK-1882] Removed RemotedCollector classes

This closes #985


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f

Branch: refs/heads/master
Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334
Parents: 5546a1e
Author: zentol <s....@web.de>
Authored: Sat Jul 25 15:17:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 .../RemoteCollectorOutputFormatExample.java     | 114 ----------
 .../flink/api/java/io/RemoteCollector.java      |  46 ----
 .../api/java/io/RemoteCollectorConsumer.java    |  26 ---
 .../flink/api/java/io/RemoteCollectorImpl.java  | 228 -------------------
 .../java/io/RemoteCollectorOutputFormat.java    | 175 --------------
 5 files changed, 589 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index f524718..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
-	public static void main(String[] args) throws Exception {
-
-		/**
-		 * We create a remote {@link ExecutionEnvironment} here, because this
-		 * OutputFormat is designed for use in a distributed setting. For local
-		 * use you should consider using the {@link LocalCollectionOutputFormat
-		 * <T>}.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
-				"/path/to/your/file.jar");
-
-		// get input data
-		DataSet<String> text = env.fromElements(
-				"To be, or not to be,--that is the question:--",
-				"Whether 'tis nobler in the mind to suffer",
-				"The slings and arrows of outrageous fortune",
-				"Or to take arms against a sea of troubles,");
-
-		DataSet<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new LineSplitter())
-		// group by the tuple field "0" and sum up tuple field "1"
-				.groupBy(0).aggregate(Aggregations.SUM, 1);
-
-		// emit result
-		RemoteCollectorImpl.collectLocal(counts,
-				new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
-					// user defined IRemoteCollectorConsumer
-					@Override
-					public void collect(Tuple2<String, Integer> element) {
-						System.out.println("word/occurrences:" + element);
-					}
-				});
-
-		// local collection to store results in
-		Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
-		// collect results from remote in local collection
-		RemoteCollectorImpl.collectLocal(counts, collection);
-
-		// execute program
-		env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
-		System.out.println(collection);
-		
-		RemoteCollectorImpl.shutdownAll();
-	}
-
-	//
-	// User Functions
-	//
-
-	/**
-	 * Implements the string tokenizer that splits sentences into words as a
-	 * user-defined FlatMapFunction. The function takes a line (String) and
-	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
-	 * Integer>).
-	 */
-	public static final class LineSplitter implements
-			FlatMapFunction<String, Tuple2<String, Integer>> {
-
-		@Override
-		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
-			// normalize and split the line
-			String[] tokens = value.toLowerCase().split("\\W+");
-
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-}
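
The example deleted above relied on RMI; the @deprecated notes in the classes removed below name the
replacement: DataSet#collect(), which returns results to the client through accumulators. The same
word count, sketched with collect() (the environment choice is an assumption; the input lines and
the LineSplitter are taken from the deleted example):

	import java.util.List;

	import org.apache.flink.api.common.functions.FlatMapFunction;
	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.aggregation.Aggregations;
	import org.apache.flink.api.java.tuple.Tuple2;
	import org.apache.flink.util.Collector;

	public class CollectBasedWordCount {

		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

			DataSet<Tuple2<String, Integer>> counts = env
					.fromElements(
							"To be, or not to be,--that is the question:--",
							"Whether 'tis nobler in the mind to suffer")
					.flatMap(new LineSplitter())
					.groupBy(0).aggregate(Aggregations.SUM, 1);

			// collect() triggers execution and ships the result back to the client;
			// no RMI registry, port guessing, or shutdownAll() to manage.
			List<Tuple2<String, Integer>> result = counts.collect();
			System.out.println(result);
		}

		// the tokenizer from the deleted example, unchanged
		public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
			@Override
			public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
				for (String token : value.toLowerCase().split("\\W+")) {
					if (token.length() > 0) {
						out.collect(new Tuple2<String, Integer>(token, 1));
					}
				}
			}
		}
	}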

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
deleted file mode 100644
index bcfc332..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.java.DataSet;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-/**
- * This interface is the counterpart to the {@link RemoteCollectorOutputFormat}
- * and implementations will receive remote results through the collect function.
- * 
- * @param <T>
- *            The type of the records the collector will receive
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public interface RemoteCollector<T> extends Remote {
-
-	public void collect(T element) throws RemoteException;
-
-	public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
-
-	public void setConsumer(RemoteCollectorConsumer<T> consumer)
-			throws RemoteException;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
deleted file mode 100644
index 439c6af..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-/**
- * This interface describes consumers of {@link RemoteCollector} implementations.
- */
-public interface RemoteCollectorConsumer<T> {
-	public void collect(T element);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
deleted file mode 100644
index 2d080ab..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.rmi.AccessException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-
-/**
- * This class provides a counterpart implementation for the
- * {@link RemoteCollectorOutputFormat}.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-
-@Deprecated
-public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
-		RemoteCollector<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Instance of an implementation of a {@link RemoteCollectorConsumer}. This
-	 * instance will get the records passed.
-	 */
-
-	private RemoteCollectorConsumer<T> consumer;
-	
-        /**
-         * This list stores all created {@link Registry}s to unbind and unexport all
-         * exposed {@link Remote} objects ({@link RemoteCollectorConsumer} in our
-         * case) in the shutdown phase.
-         */
-	private static List<Registry> registries = new ArrayList<Registry>();
-
-	/**
-	 * This factory method creates an instance of the
-	 * {@link RemoteCollectorImpl} and binds it in the local RMI
-	 * {@link Registry}.
-	 * 
-	 * @param port
-	 *            The port where the local collector is listening.
-	 * @param consumer
-	 *            The consumer instance.
-	 * @param rmiId 
-	 * 	          An ID to register the collector in the RMI registry.
-	 */
-	public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
-		RemoteCollectorImpl<T> collectorInstance = null;
-
-		try {
-			collectorInstance = new RemoteCollectorImpl<T>();
-
-			Registry registry;
-
-			registry = LocateRegistry.createRegistry(port);
-			registry.bind(rmiId, collectorInstance);
-			
-			registries.add(registry);
-		} catch (RemoteException e) {
-			e.printStackTrace();
-		} catch (AlreadyBoundException e) {
-			e.printStackTrace();
-		}
-
-		collectorInstance.setConsumer(consumer);
-	}
-
-	/**
-	 * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
-	 * {@link RemoteCollector} remotely called from the
-	 * {@link RemoteCollectorOutputFormat}.<br/>
-	 * 
-	 * @return The DataSink that writes the DataSet.
-	 */
-	public static <T> DataSink<T> collectLocal(DataSet<T> source,
-			RemoteCollectorConsumer<T> consumer) {
-		// if the RMI parameter was not set by the user make a "good guess"
-		String ip = System.getProperty("java.rmi.server.hostname");
-		if (ip == null) {
-			Enumeration<NetworkInterface> networkInterfaces = null;
-			try {
-				networkInterfaces = NetworkInterface.getNetworkInterfaces();
-			} catch (Throwable t) {
-				throw new RuntimeException(t);
-			}
-			while (networkInterfaces.hasMoreElements()) {
-				NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
-						.nextElement();
-				Enumeration<InetAddress> inetAddresses = networkInterface
-						.getInetAddresses();
-				while (inetAddresses.hasMoreElements()) {
-					InetAddress inetAddress = (InetAddress) inetAddresses
-							.nextElement();
-					if (!inetAddress.isLoopbackAddress()
-							&& inetAddress instanceof Inet4Address) {
-						ip = inetAddress.getHostAddress();
-						System.setProperty("java.rmi.server.hostname", ip);
-					}
-				}
-			}
-		}
-
-		// get some random free port
-		Integer randomPort = 0;
-		try {
-			ServerSocket tmp = new ServerSocket(0);
-			randomPort = tmp.getLocalPort();
-			tmp.close();
-		} catch (Throwable t) {
-			throw new RuntimeException(t);
-		}
-
-		// create an ID for this output format instance
-		String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
-		
-		// create the local listening object and bind it to the RMI registry
-		RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
-
-		// create and configure the output format
-		OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
-
-		// create sink
-		return source.output(remoteCollectorOutputFormat);
-	}
-
-	/**
-	 * Writes a DataSet to a local {@link Collection} through an
-	 * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
-	 * implementation remotely called from the
-	 * {@link RemoteCollectorOutputFormat}.<br/>
-	 * 
-	 * @param source the source data set
-	 * @param collection the local collection
-	 */
-	public static <T> void collectLocal(DataSet<T> source,
-			Collection<T> collection) {
-		final Collection<T> synchronizedCollection = Collections
-				.synchronizedCollection(collection);
-		collectLocal(source, new RemoteCollectorConsumer<T>() {
-			@Override
-			public void collect(T element) {
-				synchronizedCollection.add(element);
-			}
-		});
-	}
-
-	/**
-	 * Necessary private default constructor.
-	 * 
-	 * @throws RemoteException
-	 */
-	private RemoteCollectorImpl() throws RemoteException {
-		super();
-	}
-
-	/**
-	 * This method is called by the remote to collect records.
-	 */
-	@Override
-	public void collect(T element) throws RemoteException {
-		this.consumer.collect(element);
-	}
-
-	@Override
-	public RemoteCollectorConsumer<T> getConsumer() {
-		return this.consumer;
-	}
-
-	@Override
-	public void setConsumer(RemoteCollectorConsumer<T> consumer) {
-		this.consumer = consumer;
-	}
-
-	/**
-	 * This method unbinds and unexports all exposed {@link Remote} objects
-	 * 
-	 * @throws AccessException
-	 * @throws RemoteException
-	 * @throws NotBoundException
-	 */
-	public static void shutdownAll() throws AccessException, RemoteException, NotBoundException {
-		for (Registry registry : registries) {
-			for (String id : registry.list()) {
-				Remote remote = registry.lookup(id);
-				registry.unbind(id);
-				UnicastRemoteObject.unexportObject(remote, true);
-			}
-
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
deleted file mode 100644
index 3fe5cef..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.rmi.AccessException;
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An output format that sends results through JAVA RMI to an
- * {@link RemoteCollector} implementation. The client has to provide an
- * implementation of {@link RemoteCollector} and has to write its plan's output
- * into an instance of {@link RemoteCollectorOutputFormat}. Furthermore, the
- * client's JVM parameter -Djava.rmi.server.hostname should be set to the
- * client's own IP address.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
-
-	private static final long serialVersionUID = 1922744224032398102L;
-
-	/**
-	 * The reference of the {@link RemoteCollector} object
-	 */
-	private transient RemoteCollector<T> remoteCollector;
-
-	transient private Registry registry;
-
-	/**
-	 * Config parameter for the remote's port number
-	 */
-	public static final String PORT = "port";
-	/**
-	 * Config parameter for the remote's address
-	 */
-	public static final String REMOTE = "remote";
-	/**
-	 * An ID necessary for Java RMI
-	 */
-	public static final String RMI_ID = "rmiId";
-
-	private String remote;
-
-	private int port;
-
-	private String rmiId;
-
-	/**
-	 * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and
-	 * port for this output are by default localhost:8888 but can be configured
-	 * via a {@link Configuration} object.
-	 * 
-	 * @see RemoteCollectorOutputFormat#REMOTE
-	 * @see RemoteCollectorOutputFormat#PORT
-	 */
-	public RemoteCollectorOutputFormat() {
-		this("localhost", 8888, null);
-	}
-
-	/**
-	 * Creates a new {@link RemoteCollectorOutputFormat} instance for the
-	 * specified remote and port.
-	 * 
-	 * @param rmiId
-	 */
-	public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
-		super();
-		this.remote = remote;
-		this.port = port;
-		this.rmiId = rmiId;
-		
-		if (this.remote == null) {
-			throw new IllegalStateException(String.format(
-					"No remote configured for %s.", this));
-		}
-
-		if (this.rmiId == null) {
-			throw new IllegalStateException(String.format(
-					"No registry ID configured for %s.", this));
-		}
-	}
-
-	@Override
-	/**
-	 * This method receives the Configuration object, where the fields "remote" and "port" must be set.
-	 */
-	public void configure(Configuration parameters) {
-		this.remote = parameters.getString(REMOTE, this.remote);
-		this.port = parameters.getInteger(PORT, this.port);
-		this.rmiId = parameters.getString(RMI_ID, this.rmiId);
-
-		if (this.remote == null) {
-			throw new IllegalStateException(String.format(
-					"No remote configured for %s.", this));
-		}
-
-		if (this.rmiId == null) {
-			throw new IllegalStateException(String.format(
-					"No registry ID configured for %s.", this));
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		// get the remote's RMI Registry
-		try {
-			registry = LocateRegistry.getRegistry(this.remote, this.port);
-		} catch (RemoteException e) {
-			throw new IllegalStateException(e);
-		}
-
-		// try to get an instance of a RemoteCollector implementation
-		try {
-			this.remoteCollector = (RemoteCollector<T>) registry
-					.lookup(this.rmiId);
-		} catch (AccessException e) {
-			throw new IllegalStateException(e);
-		} catch (RemoteException e) {
-			throw new IllegalStateException(e);
-		} catch (NotBoundException e) {
-			throw new IllegalStateException(e);
-		}
-	}
-
-	/**
-	 * This method forwards records simply to the remote's
-	 * {@link RemoteCollector} implementation
-	 */
-	@Override
-	public void writeRecord(T record) throws IOException {
-		remoteCollector.collect(record);
-	}
-
-	/**
-	 * This method unbinds the reference of the implementation of
-	 * {@link RemoteCollector}.
-	 */
-	@Override
-	public void close() throws IOException {
-	}
-
-	@Override
-	public String toString() {
-		return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
-				+ rmiId + ")";
-	}
-
-}


[3/4] flink git commit: [FLINK-1680] Remove Tachyon test and rename Maven module to "flink-fs-tests"

Posted by fh...@apache.org.
[FLINK-1680] Remove Tachyon test and rename Maven module to "flink-fs-tests"

This closes #987


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7e6342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7e6342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7e6342

Branch: refs/heads/master
Commit: fb7e6342211d116a2db13933241d3546bbf8d4e8
Parents: 0aa6f0c
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Aug 4 13:35:12 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-fs-tests/pom.xml            |  78 +++++++++
 .../flink/tachyon/FileStateHandleTest.java      | 126 ++++++++++++++
 .../java/org/apache/flink/tachyon/HDFSTest.java | 174 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  31 ++++
 flink-staging/flink-tachyon/pom.xml             | 113 ------------
 .../flink/tachyon/FileStateHandleTest.java      | 126 --------------
 .../java/org/apache/flink/tachyon/HDFSTest.java | 157 -----------------
 .../tachyon/TachyonFileSystemWrapperTest.java   | 167 ------------------
 .../src/test/resources/log4j.properties         |  31 ----
 .../src/test/resources/tachyonHadoopConf.xml    |  28 ---
 flink-staging/pom.xml                           |   4 +-
 11 files changed, 411 insertions(+), 624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
new file mode 100644
index 0000000..fe1abb3
--- /dev/null
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-fs-tests</artifactId>
+	<name>flink-fs-tests</name>
+
+	<packaging>jar</packaging>
+
+	<!--
+		This is a Hadoop2 only flink module.
+	-->
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
new file mode 100644
index 0000000..2873c78
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStateHandleTest {
+
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	private org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
+					+ hdfsCluster.getNameNodePort() + "/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
+			hdfs = hdPath.getFileSystem(hdConf);
+			hdfs.mkdirs(hdPath);
+
+		} catch (Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			hdfs.delete(hdPath, true);
+			hdfsCluster.shutdown();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@Test
+	public void testFileStateHandle() throws Exception {
+
+		Serializable state = "state";
+
+		// Create a state handle provider for the hdfs directory
+		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
+				+ hdPath);
+
+		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+		
+		try {
+			handleProvider.createStateHandle(null);
+			fail();
+		} catch (RuntimeException e) {
+			// good
+		}
+
+		assertTrue(handle.stateFetched());
+		assertFalse(handle.isWritten());
+
+		// Serialize the handle so it writes the value to hdfs
+		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
+				handle);
+		
+		assertTrue(handle.isWritten());
+		
+		// Deserialize the handle and verify that the state is not fetched yet
+		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
+				.deserializeValue(Thread.currentThread().getContextClassLoader());
+		assertFalse(deserializedHandle.stateFetched());
+
+		// Fetch the state and compare with the original
+		assertEquals(state, deserializedHandle.getState());
+
+		// Test whether discard removes the checkpoint file properly
+		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
+		deserializedHandle.discardState();
+		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
new file mode 100644
index 0000000..633d022
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This test should logically be located in the 'flink-runtime' tests. However, this project
+ * already has all required dependencies (flink-java-examples). Also, the parallelism-one execution environment (DopOneTestEnvironment) is defined here.
+ */
+public class HDFSTest {
+
+	protected String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	protected org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/test");
+			hdfs = hdPath.getFileSystem(hdConf);
+			FSDataOutputStream stream = hdfs.create(hdPath);
+			for(int i = 0; i < 10; i++) {
+				stream.write("Hello HDFS\n".getBytes());
+			}
+			stream.close();
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			hdfs.delete(hdPath, false);
+			hdfsCluster.shutdown();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@Test
+	public void testHDFS() {
+
+		Path file = new Path(hdfsURI + hdPath);
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
+		try {
+			FileSystem fs = file.getFileSystem();
+			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
+			new DopOneTestEnvironment();
+			try {
+				WordCount.main(new String[]{file.toString(), result.toString()});
+			} catch(Throwable t) {
+				t.printStackTrace();
+				Assert.fail("Test failed with " + t.getMessage());
+			}
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			// validate output:
+			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
+			StringWriter writer = new StringWriter();
+			IOUtils.copy(inStream, writer);
+			String resultString = writer.toString();
+
+			Assert.assertEquals("hdfs 10\n" +
+					"hello 10\n", resultString);
+			inStream.close();
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Error in test: " + e.getMessage() );
+		}
+	}
+
+	@Test
+	public void testAvroOut() {
+		String type = "one";
+		AvroOutputFormat<String> avroOut =
+				new AvroOutputFormat<String>( String.class );
+
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+		avroOut.setOutputFilePath(new Path(result.toString()));
+		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+		try {
+			avroOut.open(0, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+			avroOut.open(1, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			FileStatus[] files = hdfs.listStatus(result);
+			Assert.assertEquals(2, files.length);
+			for(FileStatus file : files) {
+				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+			}
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	// package visible
+	static final class DopOneTestEnvironment extends LocalEnvironment {
+		static {
+			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					LocalEnvironment le = new LocalEnvironment();
+					le.setParallelism(1);
+					return le;
+				}
+			});
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..f533ba2
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tachyon's test-jar dependency adds a log4j.properties file to the classpath.
+# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
+# we provide a log4j.properties file ourselves.
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/pom.xml b/flink-staging/flink-tachyon/pom.xml
deleted file mode 100644
index 7ad9139..0000000
--- a/flink-staging/flink-tachyon/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-tachyon</artifactId>
-	<name>flink-tachyon</name>
-
-	<packaging>jar</packaging>
-
-	<!--
-		This is a Hadoop2 only flink module.
-	-->
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>${shading-artifact.name}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-util</artifactId>
-			<version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$-->
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-server</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-servlet</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
deleted file mode 100644
index 2873c78..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.util.SerializedValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileStateHandleTest {
-
-	private String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	private org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
-					+ hdfsCluster.getNameNodePort() + "/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
-			hdfs = hdPath.getFileSystem(hdConf);
-			hdfs.mkdirs(hdPath);
-
-		} catch (Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, true);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testFileStateHandle() throws Exception {
-
-		Serializable state = "state";
-
-		// Create a state handle provider for the hdfs directory
-		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
-				+ hdPath);
-
-		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
-		
-		try {
-			handleProvider.createStateHandle(null);
-			fail();
-		} catch (RuntimeException e) {
-			// good
-		}
-
-		assertTrue(handle.stateFetched());
-		assertFalse(handle.isWritten());
-
-		// Serialize the handle so it writes the value to hdfs
-		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
-				handle);
-		
-		assertTrue(handle.isWritten());
-		
-		// Deserialize the handle and verify that the state is not fetched yet
-		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
-				.deserializeValue(Thread.currentThread().getContextClassLoader());
-		assertFalse(deserializedHandle.stateFetched());
-
-			// Fetch the state and compare with the original
-		assertEquals(state, deserializedHandle.getState());
-
-		// Test whether discard removes the checkpoint file properly
-		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
-		deserializedHandle.discardState();
-		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
-
-	}
-
-}

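For reference, the removed test above pins down the FileStateHandle lifecycle: creating a handle does not touch the file system, serializing the handle writes the state to a file, deserializing yields a handle that fetches the state lazily on first access, and discardState() deletes the backing file. A minimal sketch of that lifecycle, using only the API calls visible in the deleted test; the HDFS URI is a placeholder:

import java.io.Serializable;

import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.SerializedValue;

public class FileStateHandleLifecycleSketch {

	public static void main(String[] args) throws Exception {
		// placeholder checkpoint directory; any reachable HDFS URI works
		String checkpointDir = "hdfs://namenode:9000/checkpoints";

		StateHandleProvider<Serializable> provider =
				FileStateHandle.createProvider(checkpointDir);

		// no file is written yet: stateFetched() is true, isWritten() is false
		FileStateHandle handle = (FileStateHandle) provider.createStateHandle("state");

		// serializing the handle writes the state to a file under checkpointDir
		SerializedValue<StateHandle<Serializable>> serialized =
				new SerializedValue<StateHandle<Serializable>>(handle);

		// the deserialized handle fetches the state on the first getState() call
		StateHandle<Serializable> restored =
				serialized.deserializeValue(Thread.currentThread().getContextClassLoader());
		Serializable state = restored.getState(); // "state"

		// discarding removes the checkpoint file from the file system
		restored.discardState();
	}
}
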
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
deleted file mode 100644
index a761712..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * This test logically belongs in the 'flink-runtime' tests. However, this project already
- * has all required dependencies (flink-java-examples). Also, the parallelism-one execution environment (DopOneTestEnvironment) is defined here.
- */
-public class HDFSTest {
-
-	protected String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	protected org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/test");
-			hdfs = hdPath.getFileSystem(hdConf);
-			FSDataOutputStream stream = hdfs.create(hdPath);
-			for(int i = 0; i < 10; i++) {
-				stream.write("Hello HDFS\n".getBytes());
-			}
-			stream.close();
-
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, false);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testHDFS() {
-
-		Path file = new Path(hdfsURI + hdPath);
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
-		try {
-			FileSystem fs = file.getFileSystem();
-			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
-			new TachyonFileSystemWrapperTest.DopOneTestEnvironment(); // install the parallelism-one context environment
-			try {
-				WordCount.main(new String[]{file.toString(), result.toString()});
-			} catch(Throwable t) {
-				t.printStackTrace();
-				Assert.fail("Test failed with " + t.getMessage());
-			}
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			// validate output:
-			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
-			StringWriter writer = new StringWriter();
-			IOUtils.copy(inStream, writer);
-			String resultString = writer.toString();
-
-			Assert.assertEquals("hdfs 10\n" +
-					"hello 10\n", resultString);
-			inStream.close();
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail("Error in test: " + e.getMessage() );
-		}
-	}
-
-	@Test
-	public void testAvroOut() {
-		String type = "one";
-		AvroOutputFormat<String> avroOut =
-				new AvroOutputFormat<String>( String.class );
-
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
-
-		avroOut.setOutputFilePath(new Path(result.toString()));
-		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
-		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
-
-		try {
-			avroOut.open(0, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-			avroOut.open(1, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			FileStatus[] files = hdfs.listStatus(result);
-			Assert.assertEquals(2, files.length);
-			for(FileStatus file : files) {
-				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
-			}
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-}

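A detail worth calling out in the removed testAvroOut: open(taskNumber, numTasks) implements the per-task output convention of FileOutputFormat, and with OutputDirectoryMode.ALWAYS the configured path is treated as a directory in which each parallel instance writes its own numbered file (1.avro and 2.avro above). A standalone sketch of the same pattern, with a placeholder local path:

import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class AvroPerTaskOutputSketch {

	public static void main(String[] args) throws Exception {
		AvroOutputFormat<String> out = new AvroOutputFormat<String>(String.class);

		// placeholder output directory
		out.setOutputFilePath(new Path("file:///tmp/avro-sketch"));
		out.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
		// ALWAYS: write into a directory even when there is a single task
		out.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);

		// task i of 2 writes the file (i + 1).avro into the directory
		for (int task = 0; task < 2; task++) {
			out.open(task, 2);
			out.writeRecord("one");
			out.close();
		}
	}
}
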
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
deleted file mode 100644
index 3b2fb7f..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import tachyon.client.InStream;
-import tachyon.client.OutStream;
-import tachyon.client.ReadType;
-import tachyon.client.TachyonFS;
-import tachyon.client.TachyonFile;
-import tachyon.client.WriteType;
-import tachyon.master.LocalTachyonCluster;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.net.URISyntaxException;
-import java.net.URL;
-
-public class TachyonFileSystemWrapperTest {
-	private static final long TACHYON_WORKER_CAPACITY = 1024 * 1024 * 32;
-	private static final String TACHYON_TEST_IN_FILE_NAME = "tachyontest";
-	private static final String TACHYON_TEST_OUT_FILE_NAME = "result";
-	private static final Path HADOOP_CONFIG_PATH;
-
-	static {
-		URL resource = TachyonFileSystemWrapperTest.class.getResource("/tachyonHadoopConf.xml");
-		File file = null;
-		try {
-			file = new File(resource.toURI());
-		} catch (URISyntaxException e) {
-			throw new RuntimeException("Unable to load req. res", e);
-		}
-		if(!file.exists()) {
-			throw new RuntimeException("Unable to load required resource");
-		}
-		HADOOP_CONFIG_PATH = new Path(file.getAbsolutePath());
-	}
-
-	private LocalTachyonCluster cluster;
-	private TachyonFS client;
-	private String input;
-	private String output;
-
-	@Before
-	public void startTachyon() {
-		try {
-			cluster = new LocalTachyonCluster(TACHYON_WORKER_CAPACITY);
-			cluster.start();
-			client = cluster.getClient();
-			int id = client.createFile("/" + TACHYON_TEST_IN_FILE_NAME, 1024 * 32);
-			Assert.assertNotEquals("Unable to create file", -1, id);
-
-			TachyonFile testFile = client.getFile(id);
-			Assert.assertNotNull(testFile);
-
-
-			OutStream outStream = testFile.getOutStream(WriteType.MUST_CACHE);
-			for(int i = 0; i < 10; i++) {
-				outStream.write("Hello Tachyon\n".getBytes());
-			}
-			outStream.close();
-			final String tachyonBase = "tachyon://" + cluster.getMasterHostname() + ":" + cluster.getMasterPort();
-			input = tachyonBase + "/" + TACHYON_TEST_IN_FILE_NAME;
-			output = tachyonBase + "/" + TACHYON_TEST_OUT_FILE_NAME;
-
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test preparation failed with exception: "+e.getMessage());
-		}
-	}
-
-	@After
-	public void stopTachyon() {
-		try {
-			cluster.stop();
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test teardown failed with exception: "+e.getMessage());
-		}
-	}
-	// Verify that Hadoop's FileSystem can load the TFS (Tachyon File System)
-	@Test
-	public void testHadoopLoadability() {
-		try {
-			Path tPath = new Path(input);
-			Configuration conf = new Configuration();
-			conf.addResource(HADOOP_CONFIG_PATH);
-			Assert.assertEquals("tachyon.hadoop.TFS", conf.get("fs.tachyon.impl", null));
-			tPath.getFileSystem(conf);
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test failed with exception: "+e.getMessage());
-		}
-	}
-
-
-	@Test
-	public void testTachyon() {
-		try {
-			org.apache.flink.configuration.Configuration addHDConfToFlinkConf = new org.apache.flink.configuration.Configuration();
-			addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
-			GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
-
-			new DopOneTestEnvironment(); // initialize parallelism one
-
-			WordCount.main(new String[]{input, output});
-
-			// verify result
-			TachyonFile resultFile = client.getFile("/" + TACHYON_TEST_OUT_FILE_NAME);
-			Assert.assertNotNull("Result file has not been created", resultFile);
-			InStream inStream = resultFile.getInStream(ReadType.CACHE);
-			Assert.assertNotNull("Result file has not been created", inStream);
-			StringWriter writer = new StringWriter();
-			IOUtils.copy(inStream, writer);
-			String resultString = writer.toString();
-
-			Assert.assertEquals("hello 10\n" +
-					"tachyon 10\n", resultString);
-
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test failed with exception: "+e.getMessage());
-		}
-	}
-
-	// package visible
-	static final class DopOneTestEnvironment extends LocalEnvironment {
-	 	static {
-    		initializeContextEnvironment(new ExecutionEnvironmentFactory() {
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					LocalEnvironment le = new LocalEnvironment();
-					le.setParallelism(1);
-					return le;
-				}
-			});
-		}
-	}
-
-}

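The DopOneTestEnvironment at the end of the removed test is a small but reusable trick: its static initializer registers an ExecutionEnvironmentFactory, so instantiating the class once makes every later ExecutionEnvironment.getExecutionEnvironment() call in the JVM return a parallelism-one LocalEnvironment. That is why both tests can run WordCount.main(...) deterministically. A usage sketch, assuming it lives in the same package (the class is package-visible):

package org.apache.flink.tachyon;

import org.apache.flink.api.java.ExecutionEnvironment;

public class DopOneUsageSketch {

	public static void main(String[] args) {
		// instantiating the class runs its static initializer,
		// which installs the parallelism-one factory
		new TachyonFileSystemWrapperTest.DopOneTestEnvironment();

		// every environment obtained from now on has parallelism 1,
		// including the one created inside WordCount.main(...)
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		System.out.println(env.getParallelism()); // 1
	}
}
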
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/log4j.properties b/flink-staging/flink-tachyon/src/test/resources/log4j.properties
deleted file mode 100644
index f533ba2..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-################################################################################
-
-# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
-# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
-# we provide a log4j.properties file ourselves.
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml b/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
deleted file mode 100644
index 0af8190..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-
-<configuration>
-    <property>
-        <name>fs.tachyon.impl</name>
-        <value>tachyon.hadoop.TFS</value>
-    </property>
-</configuration>
\ No newline at end of file

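For context, this one property is the whole mechanism behind testHadoopLoadability: Hadoop's FileSystem resolves a URI scheme via the fs.<scheme>.impl configuration key, so fs.tachyon.impl=tachyon.hadoop.TFS lets plain Hadoop Paths open tachyon:// URIs. A minimal sketch of that resolution, with placeholder host, port, and config location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TfsLoadingSketch {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// placeholder path to the XML file shown above
		conf.addResource(new Path("/path/to/tachyonHadoopConf.xml"));

		// the "tachyon" scheme resolves to tachyon.hadoop.TFS
		Path p = new Path("tachyon://master-host:19998/somefile");
		FileSystem fs = p.getFileSystem(conf);
		System.out.println(fs.getClass().getName()); // tachyon.hadoop.TFS
	}
}
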
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index a05c8b1..b3aec14 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -59,10 +59,10 @@ under the License.
 				</property>
 			</activation>
 			<modules>
-				<!-- Include the Flink-tachyon project only for HD2.
+				<!-- Include the flink-fs-tests project only for HD2.
 				 	The HDFS minicluster interfaces changed between Hadoop 1 and Hadoop 2.
 				 -->
-				<module>flink-tachyon</module>
+				<module>flink-fs-tests</module>
 			</modules>
 		</profile>
 		<profile>