You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:28 UTC
[51/51] [abbrv] git commit: [streaming] Fixes for tests to support
parallel builds
[streaming] Fixes for tests to support parallel builds
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/923b508d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/923b508d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/923b508d
Branch: refs/heads/master
Commit: 923b508d18fdf471c0c74a263bc6c93b9c4ecf59
Parents: 0163cfa
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 18 18:48:55 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 18:48:55 2014 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 1 -
.../api/environment/LocalStreamEnvironment.java | 3 -
.../environment/StreamExecutionEnvironment.java | 1 -
.../flink/streaming/util/ClusterUtil.java | 42 ++++++-------
.../apache/flink/streaming/api/IterateTest.java | 7 +--
.../apache/flink/streaming/api/PrintTest.java | 9 +--
.../flink/streaming/api/WriteAsCsvTest.java | 57 ++++++++++--------
.../flink/streaming/api/WriteAsTextTest.java | 62 +++++++++++---------
.../api/invokable/operator/BatchReduceTest.java | 2 -
.../api/invokable/operator/CoMapTest.java | 10 ++--
.../api/invokable/operator/FilterTest.java | 2 -
.../api/invokable/operator/FlatMapTest.java | 2 -
.../api/invokable/operator/MapTest.java | 2 -
.../streamcomponent/StreamComponentTest.java | 24 ++++----
14 files changed, 109 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 243cc5e..ab14bc6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -159,7 +159,6 @@ public abstract class DataStream<OUT> {
* The DataStreams to merge output with.
* @return The {@link MergedDataStream}.
*/
- @SuppressWarnings("unchecked")
public MergedDataStream<OUT> merge(DataStream<OUT>... streams) {
MergedDataStream<OUT> returnStream = new MergedDataStream<OUT>(this);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 16e84bb..828a566 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.environment;
@@ -36,5 +34,4 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
memorySize);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9b74b40..68e2421 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -194,7 +194,6 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return The DataStream representing the elements.
*/
- @SuppressWarnings("unchecked")
public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index a6e842f..8815e78 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.util;
@@ -25,7 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -49,7 +46,6 @@ public class ClusterUtil {
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.setMemorySize(memorySize);
exec.setNumTaskTracker(numberOfTaskTrackers);
- Client client = new Client(new InetSocketAddress("localhost", 6498), configuration, ClusterUtil.class.getClassLoader());
if (LOG.isInfoEnabled()) {
LOG.info("Running on mini cluster");
@@ -58,11 +54,15 @@ public class ClusterUtil {
try {
exec.start();
+ Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
client.run(jobGraph, true);
- exec.stop();
} catch (Exception e) {
throw new RuntimeException(e);
+ } finally {
+ try {
+ exec.stop();
+ } catch (Throwable t) {}
}
}
@@ -70,20 +70,22 @@ public class ClusterUtil {
runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
}
- public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Running on mini cluster");
- }
-
- Configuration configuration = jobGraph.getJobConfiguration();
-
- Client client = new Client(new InetSocketAddress(IP, port), configuration, ClusterUtil.class.getClassLoader());
-
- try {
- client.run(jobGraph, true);
- } catch (ProgramInvocationException e) {
- throw new RuntimeException(e);
- }
- }
+ // this one will not work easily any more, because we recently introduces concurrent test, which
+ // implies that the master RPC port becomes dynamic
+// public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
+// if (LOG.isInfoEnabled()) {
+// LOG.info("Running on mini cluster");
+// }
+//
+// Configuration configuration = jobGraph.getJobConfiguration();
+//
+// Client client = new Client(new InetSocketAddress(IP, port), configuration, ClusterUtil.class.getClassLoader());
+//
+// try {
+// client.run(jobGraph, true);
+// } catch (ProgramInvocationException e) {
+// throw new RuntimeException(e);
+// }
+// }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 9498b8e..da9de05 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api;
@@ -27,9 +25,8 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
import org.junit.Test;
public class IterateTest {
@@ -77,7 +74,7 @@ public class IterateTest {
@Test
public void test() throws Exception {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+ LogUtils.initializeDefaultTestConsoleLogger();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 90fac36..cdcd993 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,15 +13,13 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
import org.junit.Test;
public class PrintTest{
@@ -31,12 +28,10 @@ public class PrintTest{
@Test
public void test() throws Exception {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+ LogUtils.initializeDefaultTestConsoleLogger();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.generateSequence(1, 10).print();
env.executeTest(MEMORYSIZE);
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 8e9475a..77dc9c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -34,9 +34,14 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.LogUtils;
+import org.junit.BeforeClass;
import org.junit.Test;
public class WriteAsCsvTest {
+
+ private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsCsvTest.class.getSimpleName() + "_";
+
private static final long MEMORYSIZE = 32;
private static List<String> result1 = new ArrayList<String>();
@@ -111,61 +116,67 @@ public class WriteAsCsvTest {
}
}
+ @BeforeClass
+ public static void createFileCleanup() {
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
+ }
+ };
+
+ Runtime.getRuntime().addShutdownHook(new Thread(r));
+ }
+
@Test
public void test() throws Exception {
+ LogUtils.initializeDefaultTestConsoleLogger();
+
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(
- "test1.txt");
+ DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(
- "test2.txt", 5);
+ DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(
- "test3.txt", 10);
+ DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(
- "test4.txt", 10, new Tuple1<Integer>(26));
+ DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(
- "test5.txt", 10, new Tuple1<Integer>(14));
+ DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
env.executeTest(MEMORYSIZE);
- readFile("test1.txt", result1);
- readFile("test2.txt", result2);
- readFile("test3.txt", result3);
- readFile("test4.txt", result4);
- readFile("test5.txt", result5);
+ readFile(PREFIX + "test1.txt", result1);
+ readFile(PREFIX + "test2.txt", result2);
+ readFile(PREFIX + "test3.txt", result3);
+ readFile(PREFIX + "test4.txt", result4);
+ readFile(PREFIX + "test5.txt", result5);
assertTrue(expected1.equals(result1));
assertTrue(expected2.equals(result2));
assertTrue(expected3.equals(result3));
assertTrue(expected4.equals(result4));
assertTrue(expected5.equals(result5));
-
- new File("test1.txt").delete();
- new File("test2.txt").delete();
- new File("test3.txt").delete();
- new File("test4.txt").delete();
- new File("test5.txt").delete();
-
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 122979d..eea97ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api;
@@ -33,12 +31,15 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.util.LogUtils;
import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
+import org.junit.BeforeClass;
import org.junit.Test;
public class WriteAsTextTest {
+
+ private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsTextTest.class.getSimpleName() + "_";
+
private static final long MEMORYSIZE = 32;
private static List<String> result1 = new ArrayList<String>();
@@ -112,64 +113,69 @@ public class WriteAsTextTest {
expected5.add("(" + i + ")");
}
}
+
+ @BeforeClass
+ public static void createFileCleanup() {
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
+ try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
+ }
+ };
+
+ Runtime.getRuntime().addShutdownHook(new Thread(r));
+ }
@Test
public void test() throws Exception {
+ LogUtils.initializeDefaultTestConsoleLogger();
+
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(
- "test1.txt");
+ DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test1.txt");
fillExpected1();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(
- "test2.txt", 5);
+ DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test2.txt", 5);
fillExpected2();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(
- "test3.txt", 10);
+ DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test3.txt", 10);
fillExpected3();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(
- "test4.txt", 10, new Tuple1<Integer>(26));
+ DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
fillExpected4();
@SuppressWarnings("unused")
- DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(
- "test5.txt", 10, new Tuple1<Integer>(14));
+ DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
fillExpected5();
env.executeTest(MEMORYSIZE);
- readFile("test1.txt", result1);
- readFile("test2.txt", result2);
- readFile("test3.txt", result3);
- readFile("test4.txt", result4);
- readFile("test5.txt", result5);
+ readFile(PREFIX + "test1.txt", result1);
+ readFile(PREFIX + "test2.txt", result2);
+ readFile(PREFIX + "test3.txt", result3);
+ readFile(PREFIX + "test4.txt", result4);
+ readFile(PREFIX + "test5.txt", result5);
assertEquals(expected1,result1);
assertEquals(expected2,result2);
assertEquals(expected3,result3);
assertEquals(expected4,result4);
assertEquals(expected5,result5);
-
- new File("test1.txt").delete();
- new File("test2.txt").delete();
- new File("test3.txt").delete();
- new File("test4.txt").delete();
- new File("test5.txt").delete();
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index 63ba627..85534ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 068efe2..e24362e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
@@ -28,8 +26,8 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
+
import org.junit.Assert;
import org.junit.Test;
@@ -65,7 +63,9 @@ public class CoMapTest implements Serializable {
@Test
public void multipleInputTest() {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+
+ LogUtils.initializeDefaultTestConsoleLogger();
+
expected.add("a");
expected.add("b");
expected.add("c");
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 152f992..0f4a5a3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index fe367d3..4d84435 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index e3c7cb7..8096a88 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.invokable.operator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index 545169d..b16ffee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -1,5 +1,4 @@
/**
- *
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- *
*/
package org.apache.flink.streaming.api.streamcomponent;
@@ -27,23 +25,22 @@ import java.util.Map;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
+import org.apache.flink.util.LogUtils;
import org.junit.Test;
public class StreamComponentTest {
@SuppressWarnings("unused")
- private static final int PARALLELISM = 1;
+ private static final int PARALLELISM = 1;
private static final int SOURCE_PARALELISM = 1;
private static final long MEMORYSIZE = 32;
- public static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+ private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
public static class MySource implements SourceFunction<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@@ -93,20 +90,19 @@ public class StreamComponentTest {
}
}
- @SuppressWarnings("unused")
- @BeforeClass
- public static void runStream() {
+ @Test
+ public void runStream() {
+
+ LogUtils.initializeDefaultTestConsoleLogger();
+
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SOURCE_PARALELISM);
- DataStream<Tuple2<Integer, Integer>> oneTask = env
+ env
.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask())
.addSink(new MySink());
env.executeTest(MEMORYSIZE);
- }
-
- @Test
- public void test() {
+
assertEquals(10, data.keySet().size());
for (Integer k : data.keySet()) {