You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/05 16:24:35 UTC
[1/4] flink git commit: [FLINK-2447] [java api] TypeExtractor returns
wrong type info when a Tuple has two fields of the same POJO type
Repository: flink
Updated Branches:
refs/heads/master 941ac6dfd -> fb7e63422
[FLINK-2447] [java api] TypeExtractor returns wrong type info when a Tuple has two fields of the same POJO type
This closes #986
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5546a1ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5546a1ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5546a1ef
Branch: refs/heads/master
Commit: 5546a1efabcc9c5f500abdb4e38a5cc05d35980a
Parents: 941ac6d
Author: twalthr <tw...@apache.org>
Authored: Tue Aug 4 15:30:28 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 12:00:56 2015 +0200
----------------------------------------------------------------------
.../flink/api/java/typeutils/TypeExtractor.java | 53 ++++++++++++----
.../type/extractor/PojoTypeExtractionTest.java | 63 ++++++++++++++++++++
2 files changed, 104 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5546a1ef/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 41644f9..1ae8d3d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -26,9 +26,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.functions.CoGroupFunction;
@@ -66,15 +64,33 @@ import com.google.common.base.Preconditions;
* functions.
*/
public class TypeExtractor {
+
+ /*
+ * NOTE: Most methods of the TypeExtractor work with a so-called "typeHierarchy".
+ * The type hierarchy lists all types (Classes, ParameterizedTypes, TypeVariables, etc.) and intermediate
+ * types on the path from a given start type of a function or type (e.g. MyMapper, Tuple2) down to the
+ * current type (which depends on the method, e.g. MyPojoFieldType).
+ *
+ * Thus, it fully qualifies types until tuple/POJO field level.
+ *
+ * A typical typeHierarchy could look like:
+ *
+ * UDF: MyMapFunction.class
+ * top-level UDF: MyMapFunctionBase.class
+ * RichMapFunction: RichMapFunction.class
+ * MapFunction: MapFunction.class
+ * Function's OUT: Tuple1<MyPojo>
+ * user-defined POJO: MyPojo.class
+ * user-defined top-level POJO: MyPojoBase.class
+ * POJO field: Tuple1<String>
+ * Field type: String.class
+ *
+ */
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
- // We need this to detect recursive types and not get caught
- // in an endless recursion
- private Set<Class<?>> alreadySeen;
-
protected TypeExtractor() {
- alreadySeen = new HashSet<Class<?>>();
+ // only create instances for special use cases
}
// --------------------------------------------------------------------------------------------
@@ -416,10 +432,12 @@ public class TypeExtractor {
TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
for (int i = 0; i < subtypes.length; i++) {
+ ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ subTypeHierarchy.add(subtypes[i]);
// sub type could not be determined with materializing
// try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
if (subtypes[i] instanceof TypeVariable<?>) {
- tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], typeHierarchy, in1Type, in2Type);
+ tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
// variable could not be determined
if (tupleSubTypes[i] == null) {
@@ -430,7 +448,7 @@ public class TypeExtractor {
+ "all variables in the return type can be deduced from the input type(s).");
}
} else {
- tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(new ArrayList<Type>(typeHierarchy), subtypes[i], in1Type, in2Type);
+ tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
}
}
@@ -912,6 +930,19 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
// Utility methods
// --------------------------------------------------------------------------------------------
+
+ /**
+ * @return number of items with equal type or same raw type
+ */
+ private static int countTypeInHierarchy(ArrayList<Type> typeHierarchy, Type type) {
+ int count = 0;
+ for (Type t : typeHierarchy) {
+ if (t == type || (isClassType(type) && t == typeToClass(type))) {
+ count++;
+ }
+ }
+ return count;
+ }
/**
* @param curT : start type
@@ -1183,12 +1214,10 @@ public class TypeExtractor {
return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
}
- if (alreadySeen.contains(clazz)) {
+ if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<OUT>(clazz);
}
- alreadySeen.add(clazz);
-
if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
return new GenericTypeInfo<OUT>(clazz);
http://git-wip-us.apache.org/repos/asf/flink/blob/5546a1ef/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 1f3f71c..34fde20 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -810,4 +810,67 @@ public class PojoTypeExtractionTest {
+ ">"));
Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, ti);
}
+
+ public static class RecursivePojo1 {
+ public RecursivePojo1 field;
+ }
+
+ public static class RecursivePojo2 {
+ public Tuple1<RecursivePojo2> field;
+ }
+
+ public static class RecursivePojo3 {
+ public NestedPojo field;
+ }
+
+ public static class NestedPojo {
+ public RecursivePojo3 field;
+ }
+
+ @Test
+ public void testRecursivePojo1() {
+ TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
+ Assert.assertTrue(ti instanceof PojoTypeInfo);
+ Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
+ }
+
+ @Test
+ public void testRecursivePojo2() {
+ TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
+ Assert.assertTrue(ti instanceof PojoTypeInfo);
+ PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+ Assert.assertTrue(pf.type instanceof TupleTypeInfo);
+ Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
+ }
+
+ @Test
+ public void testRecursivePojo3() {
+ TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
+ Assert.assertTrue(ti instanceof PojoTypeInfo);
+ PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
+ Assert.assertTrue(pf.type instanceof PojoTypeInfo);
+ Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
+ }
+
+ public static class FooBarPojo {
+ public int foo, bar;
+ public FooBarPojo() {}
+ }
+
+ public static class DuplicateMapper implements MapFunction<FooBarPojo, Tuple2<FooBarPojo, FooBarPojo>> {
+ @Override
+ public Tuple2<FooBarPojo, FooBarPojo> map(FooBarPojo value) throws Exception {
+ return null;
+ }
+ }
+
+ @Test
+ public void testDualUseOfPojo() {
+ MapFunction<?, ?> function = new DuplicateMapper();
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeExtractor.createTypeInfo(FooBarPojo.class));
+ Assert.assertTrue(ti instanceof TupleTypeInfo);
+ TupleTypeInfo<?> tti = ((TupleTypeInfo) ti);
+ Assert.assertTrue(tti.getTypeAt(0) instanceof PojoTypeInfo);
+ Assert.assertTrue(tti.getTypeAt(1) instanceof PojoTypeInfo);
+ }
}
[4/4] flink git commit: [FLINK-2467] Example WordCountStorm.jar is
not packaged correctly - fixed assembly xml file
Posted by fh...@apache.org.
[FLINK-2467] Example WordCountStorm.jar is not packaged correctly
- fixed assembly xml file
This closes #974
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0aa6f0cb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0aa6f0cb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0aa6f0cb
Branch: refs/heads/master
Commit: 0aa6f0cb7b3510e0e9aa938411b6db77f7f7505e
Parents: 100e8c5
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Mon Aug 3 15:51:46 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200
----------------------------------------------------------------------
.../src/assembly/word-count-storm.xml | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0aa6f0cb/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
index 9721115..96ac429 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/assembly/word-count-storm.xml
@@ -34,9 +34,9 @@ under the License.
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<includes>
- <!-- need to be added explicitly to get 'defaults.yaml' -->
<include>org.apache.storm:storm-core:jar</include>
- <include>org.apache.flink:flink-storm-examples:jar</include>
+ <include>org.apache.flink:flink-storm-compatibility-core:jar</include>
+ <include>org.apache.flink:flink-storm-compatibility-examples:jar</include>
</includes>
<unpackOptions>
<includes>
@@ -60,10 +60,10 @@ under the License.
<include>org/apache/flink/stormcompatibility/api/*.class</include>
<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
<!-- Word Count -->
- <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class
- </include>
+ <include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
+ <include>org/apache/flink/stormcompatibility/util/*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</unpackOptions>
[2/4] flink git commit: [FLINK-1882] Removed RemoteCollector classes
Posted by fh...@apache.org.
[FLINK-1882] Removed RemoteCollector classes
This closes #985
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/100e8c5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/100e8c5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/100e8c5f
Branch: refs/heads/master
Commit: 100e8c5ff9f6d25b3d5db326a5f31b9c4432e334
Parents: 5546a1e
Author: zentol <s....@web.de>
Authored: Sat Jul 25 15:17:55 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200
----------------------------------------------------------------------
.../RemoteCollectorOutputFormatExample.java | 114 ----------
.../flink/api/java/io/RemoteCollector.java | 46 ----
.../api/java/io/RemoteCollectorConsumer.java | 26 ---
.../flink/api/java/io/RemoteCollectorImpl.java | 228 -------------------
.../java/io/RemoteCollectorOutputFormat.java | 175 --------------
5 files changed, 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
deleted file mode 100644
index f524718..0000000
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/misc/RemoteCollectorOutputFormatExample.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.java.misc;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.io.RemoteCollectorConsumer;
-import org.apache.flink.api.java.io.RemoteCollectorImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence
- * histogram over some sample data and collects the results with an
- * implementation of a {@link RemoteCollectorConsumer}.
- */
-@SuppressWarnings("serial")
-public class RemoteCollectorOutputFormatExample {
-
- public static void main(String[] args) throws Exception {
-
- /**
- * We create a remote {@link ExecutionEnvironment} here, because this
- * OutputFormat is designed for use in a distributed setting. For local
- * use you should consider using the {@link LocalCollectionOutputFormat
- * <T>}.
- */
- final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("<remote>", 6124,
- "/path/to/your/file.jar");
-
- // get input data
- DataSet<String> text = env.fromElements(
- "To be, or not to be,--that is the question:--",
- "Whether 'tis nobler in the mind to suffer",
- "The slings and arrows of outrageous fortune",
- "Or to take arms against a sea of troubles,");
-
- DataSet<Tuple2<String, Integer>> counts =
- // split up the lines in pairs (2-tuples) containing: (word,1)
- text.flatMap(new LineSplitter())
- // group by the tuple field "0" and sum up tuple field "1"
- .groupBy(0).aggregate(Aggregations.SUM, 1);
-
- // emit result
- RemoteCollectorImpl.collectLocal(counts,
- new RemoteCollectorConsumer<Tuple2<String, Integer>>() {
- // user defined IRemoteCollectorConsumer
- @Override
- public void collect(Tuple2<String, Integer> element) {
- System.out.println("word/occurrences:" + element);
- }
- });
-
- // local collection to store results in
- Set<Tuple2<String, Integer>> collection = new HashSet<Tuple2<String, Integer>>();
- // collect results from remote in local collection
- RemoteCollectorImpl.collectLocal(counts, collection);
-
- // execute program
- env.execute("WordCount Example with RemoteCollectorOutputFormat");
-
- System.out.println(collection);
-
- RemoteCollectorImpl.shutdownAll();
- }
-
- //
- // User Functions
- //
-
- /**
- * Implements the string tokenizer that splits sentences into words as a
- * user-defined FlatMapFunction. The function takes a line (String) and
- * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
- * Integer>).
- */
- public static final class LineSplitter implements
- FlatMapFunction<String, Tuple2<String, Integer>> {
-
- @Override
- public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
- // normalize and split the line
- String[] tokens = value.toLowerCase().split("\\W+");
-
- // emit the pairs
- for (String token : tokens) {
- if (token.length() > 0) {
- out.collect(new Tuple2<String, Integer>(token, 1));
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
deleted file mode 100644
index bcfc332..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.java.DataSet;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-/**
- * This interface is the counterpart to the {@link RemoteCollectorOutputFormat}
- * and implementations will receive remote results through the collect function.
- *
- * @param <T>
- * The type of the records the collector will receive
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public interface RemoteCollector<T> extends Remote {
-
- public void collect(T element) throws RemoteException;
-
- public RemoteCollectorConsumer<T> getConsumer() throws RemoteException;
-
- public void setConsumer(RemoteCollectorConsumer<T> consumer)
- throws RemoteException;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
deleted file mode 100644
index 439c6af..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorConsumer.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-/**
- * This interface describes consumers of {@link RemoteCollector} implementations.
- */
-public interface RemoteCollectorConsumer<T> {
- public void collect(T element);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
deleted file mode 100644
index 2d080ab..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorImpl.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.ServerSocket;
-import java.rmi.AccessException;
-import java.rmi.AlreadyBoundException;
-import java.rmi.NotBoundException;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-import java.rmi.server.UnicastRemoteObject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DataSink;
-
-/**
- * This class provides a counterpart implementation for the
- * {@link RemoteCollectorOutputFormat}.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-
-@Deprecated
-public class RemoteCollectorImpl<T> extends UnicastRemoteObject implements
- RemoteCollector<T> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Instance of an implementation of a {@link RemoteCollectorConsumer}. This
- * instance will get the records passed.
- */
-
- private RemoteCollectorConsumer<T> consumer;
-
- /**
- * This list stores all created {@link Registry}s to unbind and unexport all
- * exposed {@link Remote} objects ({@link RemoteCollectorConsumer} in our
- * case) in the shutdown phase.
- */
- private static List<Registry> registries = new ArrayList<Registry>();
-
- /**
- * This factory method creates an instance of the
- * {@link RemoteCollectorImpl} and binds it in the local RMI
- * {@link Registry}.
- *
- * @param port
- * The port where the local collector is listening.
- * @param consumer
- * The consumer instance.
- * @param rmiId
- * An ID to register the collector in the RMI registry.
- */
- public static <T> void createAndBind(Integer port, RemoteCollectorConsumer<T> consumer, String rmiId) {
- RemoteCollectorImpl<T> collectorInstance = null;
-
- try {
- collectorInstance = new RemoteCollectorImpl<T>();
-
- Registry registry;
-
- registry = LocateRegistry.createRegistry(port);
- registry.bind(rmiId, collectorInstance);
-
- registries.add(registry);
- } catch (RemoteException e) {
- e.printStackTrace();
- } catch (AlreadyBoundException e) {
- e.printStackTrace();
- }
-
- collectorInstance.setConsumer(consumer);
- }
-
- /**
- * Writes a DataSet to a {@link RemoteCollectorConsumer} through an
- * {@link RemoteCollector} remotely called from the
- * {@link RemoteCollectorOutputFormat}.<br/>
- *
- * @return The DataSink that writes the DataSet.
- */
- public static <T> DataSink<T> collectLocal(DataSet<T> source,
- RemoteCollectorConsumer<T> consumer) {
- // if the RMI parameter was not set by the user make a "good guess"
- String ip = System.getProperty("java.rmi.server.hostname");
- if (ip == null) {
- Enumeration<NetworkInterface> networkInterfaces = null;
- try {
- networkInterfaces = NetworkInterface.getNetworkInterfaces();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
- while (networkInterfaces.hasMoreElements()) {
- NetworkInterface networkInterface = (NetworkInterface) networkInterfaces
- .nextElement();
- Enumeration<InetAddress> inetAddresses = networkInterface
- .getInetAddresses();
- while (inetAddresses.hasMoreElements()) {
- InetAddress inetAddress = (InetAddress) inetAddresses
- .nextElement();
- if (!inetAddress.isLoopbackAddress()
- && inetAddress instanceof Inet4Address) {
- ip = inetAddress.getHostAddress();
- System.setProperty("java.rmi.server.hostname", ip);
- }
- }
- }
- }
-
- // get some random free port
- Integer randomPort = 0;
- try {
- ServerSocket tmp = new ServerSocket(0);
- randomPort = tmp.getLocalPort();
- tmp.close();
- } catch (Throwable t) {
- throw new RuntimeException(t);
- }
-
- // create an ID for this output format instance
- String rmiId = String.format("%s-%s", RemoteCollectorOutputFormat.class.getName(), UUID.randomUUID());
-
- // create the local listening object and bind it to the RMI registry
- RemoteCollectorImpl.createAndBind(randomPort, consumer, rmiId);
-
- // create and configure the output format
- OutputFormat<T> remoteCollectorOutputFormat = new RemoteCollectorOutputFormat<T>(ip, randomPort, rmiId);
-
- // create sink
- return source.output(remoteCollectorOutputFormat);
- }
-
- /**
- * Writes a DataSet to a local {@link Collection} through an
- * {@link RemoteCollector} and a standard {@link RemoteCollectorConsumer}
- * implementation remotely called from the
- * {@link RemoteCollectorOutputFormat}.<br/>
- *
- * @param source the source data set
- * @param collection the local collection
- */
- public static <T> void collectLocal(DataSet<T> source,
- Collection<T> collection) {
- final Collection<T> synchronizedCollection = Collections
- .synchronizedCollection(collection);
- collectLocal(source, new RemoteCollectorConsumer<T>() {
- @Override
- public void collect(T element) {
- synchronizedCollection.add(element);
- }
- });
- }
-
- /**
- * Necessary private default constructor.
- *
- * @throws RemoteException
- */
- private RemoteCollectorImpl() throws RemoteException {
- super();
- }
-
- /**
- * This method is called by the remote to collect records.
- */
- @Override
- public void collect(T element) throws RemoteException {
- this.consumer.collect(element);
- }
-
- @Override
- public RemoteCollectorConsumer<T> getConsumer() {
- return this.consumer;
- }
-
- @Override
- public void setConsumer(RemoteCollectorConsumer<T> consumer) {
- this.consumer = consumer;
- }
-
- /**
- * This method unbinds and unexports all exposed {@link Remote} objects
- *
- * @throws AccessException
- * @throws RemoteException
- * @throws NotBoundException
- */
- public static void shutdownAll() throws AccessException, RemoteException, NotBoundException {
- for (Registry registry : registries) {
- for (String id : registry.list()) {
- Remote remote = registry.lookup(id);
- registry.unbind(id);
- UnicastRemoteObject.unexportObject(remote, true);
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/100e8c5f/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
deleted file mode 100644
index 3fe5cef..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/RemoteCollectorOutputFormat.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import java.io.IOException;
-import java.rmi.AccessException;
-import java.rmi.NotBoundException;
-import java.rmi.RemoteException;
-import java.rmi.registry.LocateRegistry;
-import java.rmi.registry.Registry;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.configuration.Configuration;
-
-/**
- * An output format that sends results through JAVA RMI to an
- * {@link RemoteCollector} implementation. The client has to provide an
- * implementation of {@link RemoteCollector} and has to write it's plan's output
- * into an instance of {@link RemoteCollectorOutputFormat}. Further in the
- * client's VM parameters -Djava.rmi.server.hostname should be set to the own IP
- * address.
- *
- * @deprecated Results are retrieved through {@link org.apache.flink.api.common.accumulators.Accumulator}
- * and the {@link DataSet#collect()} method respectively.
- */
-@Deprecated
-public class RemoteCollectorOutputFormat<T> implements OutputFormat<T> {
-
- private static final long serialVersionUID = 1922744224032398102L;
-
- /**
- * The reference of the {@link RemoteCollector} object
- */
- private transient RemoteCollector<T> remoteCollector;
-
- transient private Registry registry;
-
- /**
- * Config parameter for the remote's port number
- */
- public static final String PORT = "port";
- /**
- * Config parameter for the remote's address
- */
- public static final String REMOTE = "remote";
- /**
- * An id used necessary for Java RMI
- */
- public static final String RMI_ID = "rmiId";
-
- private String remote;
-
- private int port;
-
- private String rmiId;
-
- /**
- * Create a new {@link RemoteCollectorOutputFormat} instance. The remote and
- * port for this output are by default localhost:8888 but can be configured
- * via a {@link Configuration} object.
- *
- * @see RemoteCollectorOutputFormat#REMOTE
- * @see RemoteCollectorOutputFormat#PORT
- */
- public RemoteCollectorOutputFormat() {
- this("localhost", 8888, null);
- }
-
- /**
- * Creates a new {@link RemoteCollectorOutputFormat} instance for the
- * specified remote and port.
- *
- * @param rmiId
- */
- public RemoteCollectorOutputFormat(String remote, int port, String rmiId) {
- super();
- this.remote = remote;
- this.port = port;
- this.rmiId = rmiId;
-
- if (this.remote == null) {
- throw new IllegalStateException(String.format(
- "No remote configured for %s.", this));
- }
-
- if (this.rmiId == null) {
- throw new IllegalStateException(String.format(
- "No registry ID configured for %s.", this));
- }
- }
-
- @Override
- /**
- * This method receives the Configuration object, where the fields "remote" and "port" must be set.
- */
- public void configure(Configuration parameters) {
- this.remote = parameters.getString(REMOTE, this.remote);
- this.port = parameters.getInteger(PORT, this.port);
- this.rmiId = parameters.getString(RMI_ID, this.rmiId);
-
- if (this.remote == null) {
- throw new IllegalStateException(String.format(
- "No remote configured for %s.", this));
- }
-
- if (this.rmiId == null) {
- throw new IllegalStateException(String.format(
- "No registry ID configured for %s.", this));
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void open(int taskNumber, int numTasks) throws IOException {
- // get the remote's RMI Registry
- try {
- registry = LocateRegistry.getRegistry(this.remote, this.port);
- } catch (RemoteException e) {
- throw new IllegalStateException(e);
- }
-
- // try to get an intance of an IRemoteCollector implementation
- try {
- this.remoteCollector = (RemoteCollector<T>) registry
- .lookup(this.rmiId);
- } catch (AccessException e) {
- throw new IllegalStateException(e);
- } catch (RemoteException e) {
- throw new IllegalStateException(e);
- } catch (NotBoundException e) {
- throw new IllegalStateException(e);
- }
- }
-
- /**
- * This method forwards records simply to the remote's
- * {@link RemoteCollector} implementation
- */
- @Override
- public void writeRecord(T record) throws IOException {
- remoteCollector.collect(record);
- }
-
- /**
- * This method unbinds the reference of the implementation of
- * {@link RemoteCollector}.
- */
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public String toString() {
- return "RemoteCollectorOutputFormat(" + remote + ":" + port + ", "
- + rmiId + ")";
- }
-
-}
[3/4] flink git commit: [FLINK-1680] Remove Tachyon test and rename
Maven module to "flink-fs-tests"
Posted by fh...@apache.org.
[FLINK-1680] Remove Tachyon test and rename Maven module to "flink-fs-tests"
This closes #987
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7e6342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7e6342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7e6342
Branch: refs/heads/master
Commit: fb7e6342211d116a2db13933241d3546bbf8d4e8
Parents: 0aa6f0c
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Aug 4 13:35:12 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-fs-tests/pom.xml | 78 +++++++++
.../flink/tachyon/FileStateHandleTest.java | 126 ++++++++++++++
.../java/org/apache/flink/tachyon/HDFSTest.java | 174 +++++++++++++++++++
.../src/test/resources/log4j.properties | 31 ++++
flink-staging/flink-tachyon/pom.xml | 113 ------------
.../flink/tachyon/FileStateHandleTest.java | 126 --------------
.../java/org/apache/flink/tachyon/HDFSTest.java | 157 -----------------
.../tachyon/TachyonFileSystemWrapperTest.java | 167 ------------------
.../src/test/resources/log4j.properties | 31 ----
.../src/test/resources/tachyonHadoopConf.xml | 28 ---
flink-staging/pom.xml | 4 +-
11 files changed, 411 insertions(+), 624 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
new file mode 100644
index 0000000..fe1abb3
--- /dev/null
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-staging</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-fs-tests</artifactId>
+ <name>flink-fs-tests</name>
+
+ <packaging>jar</packaging>
+
+ <!--
+ This is a Hadoop2 only flink module.
+ -->
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${shading-artifact.name}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
new file mode 100644
index 0000000..2873c78
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStateHandleTest {
+
+ private String hdfsURI;
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ private org.apache.hadoop.fs.FileSystem hdfs;
+
+ @Before
+ public void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
+ + hdfsCluster.getNameNodePort() + "/";
+
+ hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
+ hdfs = hdPath.getFileSystem(hdConf);
+ hdfs.mkdirs(hdPath);
+
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Test failed " + e.getMessage());
+ }
+ }
+
+ @After
+ public void destroyHDFS() {
+ try {
+ hdfs.delete(hdPath, true);
+ hdfsCluster.shutdown();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Test
+ public void testFileStateHandle() throws Exception {
+
+ Serializable state = "state";
+
+ // Create a state handle provider for the hdfs directory
+ StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
+ + hdPath);
+
+ FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+
+ try {
+ handleProvider.createStateHandle(null);
+ fail();
+ } catch (RuntimeException e) {
+ // good
+ }
+
+ assertTrue(handle.stateFetched());
+ assertFalse(handle.isWritten());
+
+ // Serialize the handle so it writes the value to hdfs
+ SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
+ handle);
+
+ assertTrue(handle.isWritten());
+
+ // Deserialize the handle and verify that the state is not fetched yet
+ FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
+ .deserializeValue(Thread.currentThread().getContextClassLoader());
+ assertFalse(deserializedHandle.stateFetched());
+
+ // Fetch the state and compare with original
+ assertEquals(state, deserializedHandle.getState());
+
+ // Test whether discard removes the checkpoint file properly
+ assertTrue(hdfs.listFiles(hdPath, true).hasNext());
+ deserializedHandle.discardState();
+ assertFalse(hdfs.listFiles(hdPath, true).hasNext());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
new file mode 100644
index 0000000..633d022
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This test should logically be located in the 'flink-runtime' tests. However, this project
+ * already has all required dependencies (flink-java-examples). Also, the ParallelismOneExecEnv is here.
+ */
+public class HDFSTest {
+
+ protected String hdfsURI;
+ private MiniDFSCluster hdfsCluster;
+ private org.apache.hadoop.fs.Path hdPath;
+ protected org.apache.hadoop.fs.FileSystem hdfs;
+
+ @Before
+ public void createHDFS() {
+ try {
+ Configuration hdConf = new Configuration();
+
+ File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+ FileUtil.fullyDelete(baseDir);
+ hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+ hdfsCluster = builder.build();
+
+ hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+
+ hdPath = new org.apache.hadoop.fs.Path("/test");
+ hdfs = hdPath.getFileSystem(hdConf);
+ FSDataOutputStream stream = hdfs.create(hdPath);
+ for(int i = 0; i < 10; i++) {
+ stream.write("Hello HDFS\n".getBytes());
+ }
+ stream.close();
+
+ } catch(Throwable e) {
+ e.printStackTrace();
+ Assert.fail("Test failed " + e.getMessage());
+ }
+ }
+
+ @After
+ public void destroyHDFS() {
+ try {
+ hdfs.delete(hdPath, false);
+ hdfsCluster.shutdown();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Test
+ public void testHDFS() {
+
+ Path file = new Path(hdfsURI + hdPath);
+ org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
+ try {
+ FileSystem fs = file.getFileSystem();
+ Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
+ new DopOneTestEnvironment();
+ try {
+ WordCount.main(new String[]{file.toString(), result.toString()});
+ } catch(Throwable t) {
+ t.printStackTrace();
+ Assert.fail("Test failed with " + t.getMessage());
+ }
+ Assert.assertTrue("No result file present", hdfs.exists(result));
+ // validate output:
+ org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
+ StringWriter writer = new StringWriter();
+ IOUtils.copy(inStream, writer);
+ String resultString = writer.toString();
+
+ Assert.assertEquals("hdfs 10\n" +
+ "hello 10\n", resultString);
+ inStream.close();
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Error in test: " + e.getMessage() );
+ }
+ }
+
+ @Test
+ public void testAvroOut() {
+ String type = "one";
+ AvroOutputFormat<String> avroOut =
+ new AvroOutputFormat<String>( String.class );
+
+ org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+ avroOut.setOutputFilePath(new Path(result.toString()));
+ avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+ avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+ try {
+ avroOut.open(0, 2);
+ avroOut.writeRecord(type);
+ avroOut.close();
+
+ avroOut.open(1, 2);
+ avroOut.writeRecord(type);
+ avroOut.close();
+
+
+ Assert.assertTrue("No result file present", hdfs.exists(result));
+ FileStatus[] files = hdfs.listStatus(result);
+ Assert.assertEquals(2, files.length);
+ for(FileStatus file : files) {
+ Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+ }
+
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ // package visible
+ static final class DopOneTestEnvironment extends LocalEnvironment {
+ static {
+ initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+ @Override
+ public ExecutionEnvironment createExecutionEnvironment() {
+ LocalEnvironment le = new LocalEnvironment();
+ le.setParallelism(1);
+ return le;
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..f533ba2
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
+# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
+# we provide a log4j.properties file ourselves.
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/pom.xml b/flink-staging/flink-tachyon/pom.xml
deleted file mode 100644
index 7ad9139..0000000
--- a/flink-staging/flink-tachyon/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-staging</artifactId>
- <version>0.10-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-tachyon</artifactId>
- <name>flink-tachyon</name>
-
- <packaging>jar</packaging>
-
- <!--
- This is a Hadoop2 only flink module.
- -->
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>${shading-artifact.name}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java-examples</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
- </dependency>
- <dependency>
- <groupId>org.tachyonproject</groupId>
- <artifactId>tachyon</artifactId>
- <version>0.5.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.tachyonproject</groupId>
- <artifactId>tachyon</artifactId>
- <version>0.5.0</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- <version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$-->
- <scope>test</scope>
- </dependency>
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- <version>7.6.8.v20121106</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- <version>7.6.8.v20121106</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
deleted file mode 100644
index 2873c78..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.util.SerializedValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileStateHandleTest {
-
- private String hdfsURI;
- private MiniDFSCluster hdfsCluster;
- private org.apache.hadoop.fs.Path hdPath;
- private org.apache.hadoop.fs.FileSystem hdfs;
-
- @Before
- public void createHDFS() {
- try {
- Configuration hdConf = new Configuration();
-
- File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
- hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
- hdfsCluster = builder.build();
-
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
- + hdfsCluster.getNameNodePort() + "/";
-
- hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
- hdfs = hdPath.getFileSystem(hdConf);
- hdfs.mkdirs(hdPath);
-
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail("Test failed " + e.getMessage());
- }
- }
-
- @After
- public void destroyHDFS() {
- try {
- hdfs.delete(hdPath, true);
- hdfsCluster.shutdown();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- @Test
- public void testFileStateHandle() throws Exception {
-
- Serializable state = "state";
-
- // Create a state handle provider for the hdfs directory
- StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
- + hdPath);
-
- FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
-
- try {
- handleProvider.createStateHandle(null);
- fail();
- } catch (RuntimeException e) {
- // good
- }
-
- assertTrue(handle.stateFetched());
- assertFalse(handle.isWritten());
-
- // Serialize the handle so it writes the value to hdfs
- SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
- handle);
-
- assertTrue(handle.isWritten());
-
- // Deserialize the handle and verify that the state is not fetched yet
- FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
- .deserializeValue(Thread.currentThread().getContextClassLoader());
- assertFalse(deserializedHandle.stateFetched());
-
- // Fetch the and compare with original
- assertEquals(state, deserializedHandle.getState());
-
- // Test whether discard removes the checkpoint file properly
- assertTrue(hdfs.listFiles(hdPath, true).hasNext());
- deserializedHandle.discardState();
- assertFalse(hdfs.listFiles(hdPath, true).hasNext());
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
deleted file mode 100644
index a761712..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
- */
-public class HDFSTest {
-
- protected String hdfsURI;
- private MiniDFSCluster hdfsCluster;
- private org.apache.hadoop.fs.Path hdPath;
- protected org.apache.hadoop.fs.FileSystem hdfs;
-
- @Before
- public void createHDFS() {
- try {
- Configuration hdConf = new Configuration();
-
- File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
- FileUtil.fullyDelete(baseDir);
- hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
- hdfsCluster = builder.build();
-
- hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
- hdPath = new org.apache.hadoop.fs.Path("/test");
- hdfs = hdPath.getFileSystem(hdConf);
- FSDataOutputStream stream = hdfs.create(hdPath);
- for(int i = 0; i < 10; i++) {
- stream.write("Hello HDFS\n".getBytes());
- }
- stream.close();
-
- } catch(Throwable e) {
- e.printStackTrace();
- Assert.fail("Test failed " + e.getMessage());
- }
- }
-
- @After
- public void destroyHDFS() {
- try {
- hdfs.delete(hdPath, false);
- hdfsCluster.shutdown();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- @Test
- public void testHDFS() {
-
- Path file = new Path(hdfsURI + hdPath);
- org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
- try {
- FileSystem fs = file.getFileSystem();
- Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
- new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
- try {
- WordCount.main(new String[]{file.toString(), result.toString()});
- } catch(Throwable t) {
- t.printStackTrace();
- Assert.fail("Test failed with " + t.getMessage());
- }
- Assert.assertTrue("No result file present", hdfs.exists(result));
- // validate output:
- org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
- StringWriter writer = new StringWriter();
- IOUtils.copy(inStream, writer);
- String resultString = writer.toString();
-
- Assert.assertEquals("hdfs 10\n" +
- "hello 10\n", resultString);
- inStream.close();
-
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail("Error in test: " + e.getMessage() );
- }
- }
-
- @Test
- public void testAvroOut() {
- String type = "one";
- AvroOutputFormat<String> avroOut =
- new AvroOutputFormat<String>( String.class );
-
- org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
-
- avroOut.setOutputFilePath(new Path(result.toString()));
- avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
- avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
-
- try {
- avroOut.open(0, 2);
- avroOut.writeRecord(type);
- avroOut.close();
-
- avroOut.open(1, 2);
- avroOut.writeRecord(type);
- avroOut.close();
-
-
- Assert.assertTrue("No result file present", hdfs.exists(result));
- FileStatus[] files = hdfs.listStatus(result);
- Assert.assertEquals(2, files.length);
- for(FileStatus file : files) {
- Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
deleted file mode 100644
index 3b2fb7f..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import tachyon.client.InStream;
-import tachyon.client.OutStream;
-import tachyon.client.ReadType;
-import tachyon.client.TachyonFS;
-import tachyon.client.TachyonFile;
-import tachyon.client.WriteType;
-import tachyon.master.LocalTachyonCluster;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.net.URISyntaxException;
-import java.net.URL;
-
-public class TachyonFileSystemWrapperTest {
- private static final long TACHYON_WORKER_CAPACITY = 1024 * 1024 * 32;
- private static final String TACHYON_TEST_IN_FILE_NAME = "tachyontest";
- private static final String TACHYON_TEST_OUT_FILE_NAME = "result";
- private static final Path HADOOP_CONFIG_PATH;
-
- static {
- URL resource = TachyonFileSystemWrapperTest.class.getResource("/tachyonHadoopConf.xml");
- File file = null;
- try {
- file = new File(resource.toURI());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Unable to load req. res", e);
- }
- if(!file.exists()) {
- throw new RuntimeException("Unable to load required resource");
- }
- HADOOP_CONFIG_PATH = new Path(file.getAbsolutePath());
- }
-
- private LocalTachyonCluster cluster;
- private TachyonFS client;
- private String input;
- private String output;
-
- @Before
- public void startTachyon() {
- try {
- cluster = new LocalTachyonCluster(TACHYON_WORKER_CAPACITY);
- cluster.start();
- client = cluster.getClient();
- int id = client.createFile("/" + TACHYON_TEST_IN_FILE_NAME, 1024 * 32);
- Assert.assertNotEquals("Unable to create file", -1, id);
-
- TachyonFile testFile = client.getFile(id);
- Assert.assertNotNull(testFile);
-
-
- OutStream outStream = testFile.getOutStream(WriteType.MUST_CACHE);
- for(int i = 0; i < 10; i++) {
- outStream.write("Hello Tachyon\n".getBytes());
- }
- outStream.close();
- final String tachyonBase = "tachyon://" + cluster.getMasterHostname() + ":" + cluster.getMasterPort();
- input = tachyonBase + "/" + TACHYON_TEST_IN_FILE_NAME;
- output = tachyonBase + "/" + TACHYON_TEST_OUT_FILE_NAME;
-
- } catch(Exception e) {
- e.printStackTrace();
- Assert.fail("Test preparation failed with exception: "+e.getMessage());
- }
- }
-
- @After
- public void stopTachyon() {
- try {
- cluster.stop();
- } catch(Exception e) {
- e.printStackTrace();
- Assert.fail("Test teardown failed with exception: "+e.getMessage());
- }
- }
- // Verify that Hadoop's FileSystem can load the TFS (Tachyon File System)
- @Test
- public void testHadoopLoadability() {
- try {
- Path tPath = new Path(input);
- Configuration conf = new Configuration();
- conf.addResource(HADOOP_CONFIG_PATH);
- Assert.assertEquals("tachyon.hadoop.TFS", conf.get("fs.tachyon.impl", null));
- tPath.getFileSystem(conf);
- } catch(Exception e) {
- e.printStackTrace();
- Assert.fail("Test failed with exception: "+e.getMessage());
- }
- }
-
-
- @Test
- public void testTachyon() {
- try {
- org.apache.flink.configuration.Configuration addHDConfToFlinkConf = new org.apache.flink.configuration.Configuration();
- addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
- GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
-
- new DopOneTestEnvironment(); // initialize parallelism one
-
- WordCount.main(new String[]{input, output});
-
- // verify result
- TachyonFile resultFile = client.getFile("/" + TACHYON_TEST_OUT_FILE_NAME);
- Assert.assertNotNull("Result file has not been created", resultFile);
- InStream inStream = resultFile.getInStream(ReadType.CACHE);
- Assert.assertNotNull("Result file has not been created", inStream);
- StringWriter writer = new StringWriter();
- IOUtils.copy(inStream, writer);
- String resultString = writer.toString();
-
- Assert.assertEquals("hello 10\n" +
- "tachyon 10\n", resultString);
-
- } catch(Exception e) {
- e.printStackTrace();
- Assert.fail("Test failed with exception: "+e.getMessage());
- }
- }
-
- // package visible
- static final class DopOneTestEnvironment extends LocalEnvironment {
- static {
- initializeContextEnvironment(new ExecutionEnvironmentFactory() {
- @Override
- public ExecutionEnvironment createExecutionEnvironment() {
- LocalEnvironment le = new LocalEnvironment();
- le.setParallelism(1);
- return le;
- }
- });
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/log4j.properties b/flink-staging/flink-tachyon/src/test/resources/log4j.properties
deleted file mode 100644
index f533ba2..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
-# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
-# we provide a log4j.properties file ourselves.
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml b/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
deleted file mode 100644
index 0af8190..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-
-<configuration>
- <property>
- <name>fs.tachyon.impl</name>
- <value>tachyon.hadoop.TFS</value>
- </property>
-</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index a05c8b1..b3aec14 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -59,10 +59,10 @@ under the License.
</property>
</activation>
<modules>
- <!-- Include the Flink-tachyon project only for HD2.
+ <!-- Include the flink-fs-tests project only for HD2.
The HDFS minicluster interfaces changed between the two versions.
-->
- <module>flink-tachyon</module>
+ <module>flink-fs-tests</module>
</modules>
</profile>
<profile>