You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:26:28 UTC

[51/51] [abbrv] git commit: [streaming] Fixes for tests to support parallel builds

[streaming] Fixes for tests to support parallel builds


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/923b508d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/923b508d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/923b508d

Branch: refs/heads/master
Commit: 923b508d18fdf471c0c74a263bc6c93b9c4ecf59
Parents: 0163cfa
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 18 18:48:55 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 18:48:55 2014 +0200

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  1 -
 .../api/environment/LocalStreamEnvironment.java |  3 -
 .../environment/StreamExecutionEnvironment.java |  1 -
 .../flink/streaming/util/ClusterUtil.java       | 42 ++++++-------
 .../apache/flink/streaming/api/IterateTest.java |  7 +--
 .../apache/flink/streaming/api/PrintTest.java   |  9 +--
 .../flink/streaming/api/WriteAsCsvTest.java     | 57 ++++++++++--------
 .../flink/streaming/api/WriteAsTextTest.java    | 62 +++++++++++---------
 .../api/invokable/operator/BatchReduceTest.java |  2 -
 .../api/invokable/operator/CoMapTest.java       | 10 ++--
 .../api/invokable/operator/FilterTest.java      |  2 -
 .../api/invokable/operator/FlatMapTest.java     |  2 -
 .../api/invokable/operator/MapTest.java         |  2 -
 .../streamcomponent/StreamComponentTest.java    | 24 ++++----
 14 files changed, 109 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 243cc5e..ab14bc6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -159,7 +159,6 @@ public abstract class DataStream<OUT> {
 	 *            The DataStreams to merge output with.
 	 * @return The {@link MergedDataStream}.
 	 */
-	@SuppressWarnings("unchecked")
 	public MergedDataStream<OUT> merge(DataStream<OUT>... streams) {
 		MergedDataStream<OUT> returnStream = new MergedDataStream<OUT>(this);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 16e84bb..828a566 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.environment;
@@ -36,5 +34,4 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 		ClusterUtil.runOnMiniCluster(this.jobGraphBuilder.getJobGraph(), getExecutionParallelism(),
 				memorySize);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9b74b40..68e2421 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -194,7 +194,6 @@ public abstract class StreamExecutionEnvironment {
 	 *            type of the returned stream
 	 * @return The DataStream representing the elements.
 	 */
-	@SuppressWarnings("unchecked")
 	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
 		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index a6e842f..8815e78 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.util;
@@ -25,7 +23,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.client.minicluster.NepheleMiniCluster;
 import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 
@@ -49,7 +46,6 @@ public class ClusterUtil {
 		NepheleMiniCluster exec = new NepheleMiniCluster();
 		exec.setMemorySize(memorySize);
 		exec.setNumTaskTracker(numberOfTaskTrackers);
-		Client client = new Client(new InetSocketAddress("localhost", 6498), configuration, ClusterUtil.class.getClassLoader());
 		
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Running on mini cluster");
@@ -58,11 +54,15 @@ public class ClusterUtil {
 		try {
 			exec.start();
 
+			Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRpcPort()), configuration, ClusterUtil.class.getClassLoader());
 			client.run(jobGraph, true);
 
-			exec.stop();
 		} catch (Exception e) {
 			throw new RuntimeException(e);
+		} finally {
+			try {
+				exec.stop();
+			} catch (Throwable t) {}
 		}
 	}
 
@@ -70,20 +70,22 @@ public class ClusterUtil {
 		runOnMiniCluster(jobGraph, numberOfTaskTrackers, -1);
 	}
 
-	public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Running on mini cluster");
-		}
-		
-		Configuration configuration = jobGraph.getJobConfiguration();
-
-		Client client = new Client(new InetSocketAddress(IP, port), configuration, ClusterUtil.class.getClassLoader());
-
-		try {
-			client.run(jobGraph, true);
-		} catch (ProgramInvocationException e) {
-			throw new RuntimeException(e);
-		}
-	}
+	// this one will not work easily any more, because we recently introduced concurrent tests, which
+	// implies that the master RPC port becomes dynamic
+//	public static void runOnLocalCluster(JobGraph jobGraph, String IP, int port) {
+//		if (LOG.isInfoEnabled()) {
+//			LOG.info("Running on mini cluster");
+//		}
+//		
+//		Configuration configuration = jobGraph.getJobConfiguration();
+//
+//		Client client = new Client(new InetSocketAddress(IP, port), configuration, ClusterUtil.class.getClassLoader());
+//
+//		try {
+//			client.run(jobGraph, true);
+//		} catch (ProgramInvocationException e) {
+//			throw new RuntimeException(e);
+//		}
+//	}
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 9498b8e..da9de05 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api;
@@ -27,9 +25,8 @@ import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
 import org.junit.Test;
 
 public class IterateTest {
@@ -77,7 +74,7 @@ public class IterateTest {
 
 	@Test
 	public void test() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		LogUtils.initializeDefaultTestConsoleLogger();
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 90fac36..cdcd993 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,15 +13,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api;
 
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
 import org.junit.Test;
 
 public class PrintTest{
@@ -31,12 +28,10 @@ public class PrintTest{
 
 	@Test
 	public void test() throws Exception {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		LogUtils.initializeDefaultTestConsoleLogger();
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		env.generateSequence(1, 10).print();
 		env.executeTest(MEMORYSIZE);
-
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 8e9475a..77dc9c8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -34,9 +34,14 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.LogUtils;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class WriteAsCsvTest {
+	
+	private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsCsvTest.class.getSimpleName() + "_";
+	
 	private static final long MEMORYSIZE = 32;
 
 	private static List<String> result1 = new ArrayList<String>();
@@ -111,61 +116,67 @@ public class WriteAsCsvTest {
 		}
 	}
 
+	@BeforeClass
+	public static void createFileCleanup() {
+		Runnable r = new Runnable() {
+			
+			@Override
+			public void run() {
+				try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
+			}
+		};
+		
+		Runtime.getRuntime().addShutdownHook(new Thread(r));
+	}
+	
 	@Test
 	public void test() throws Exception {
 
+		LogUtils.initializeDefaultTestConsoleLogger();
+		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(
-				"test1.txt");
+		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test1.txt");
 
 		fillExpected1();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(
-				"test2.txt", 5);
+		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test2.txt", 5);
 
 		fillExpected2();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(
-				"test3.txt", 10);
+		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test3.txt", 10);
 
 		fillExpected3();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(
-				"test4.txt", 10, new Tuple1<Integer>(26));
+		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
 
 		fillExpected4();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(
-				"test5.txt", 10, new Tuple1<Integer>(14));
+		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsCsv(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
 
 		fillExpected5();
 
 		env.executeTest(MEMORYSIZE);
 
-		readFile("test1.txt", result1);
-		readFile("test2.txt", result2);
-		readFile("test3.txt", result3);
-		readFile("test4.txt", result4);
-		readFile("test5.txt", result5);
+		readFile(PREFIX + "test1.txt", result1);
+		readFile(PREFIX + "test2.txt", result2);
+		readFile(PREFIX + "test3.txt", result3);
+		readFile(PREFIX + "test4.txt", result4);
+		readFile(PREFIX + "test5.txt", result5);
 
 		assertTrue(expected1.equals(result1));
 		assertTrue(expected2.equals(result2));
 		assertTrue(expected3.equals(result3));
 		assertTrue(expected4.equals(result4));
 		assertTrue(expected5.equals(result5));
-
-		new File("test1.txt").delete();
-		new File("test2.txt").delete();
-		new File("test3.txt").delete();
-		new File("test4.txt").delete();
-		new File("test5.txt").delete();
-
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 122979d..eea97ec 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api;
@@ -33,12 +31,15 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.util.LogUtils;
 import org.apache.flink.util.Collector;
-import org.apache.log4j.Level;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class WriteAsTextTest {
+	
+	private static final String PREFIX = System.getProperty("java.io.tmpdir") + "/" + WriteAsTextTest.class.getSimpleName() + "_";
+	
 	private static final long MEMORYSIZE = 32;
 
 	private static List<String> result1 = new ArrayList<String>();
@@ -112,64 +113,69 @@ public class WriteAsTextTest {
 			expected5.add("(" + i + ")");
 		}
 	}
+	
+	@BeforeClass
+	public static void createFileCleanup() {
+		Runnable r = new Runnable() {
+			
+			@Override
+			public void run() {
+				try { new File(PREFIX + "test1.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test2.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test3.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test4.txt").delete(); } catch (Throwable t) {}
+				try { new File(PREFIX + "test5.txt").delete(); } catch (Throwable t) {}
+			}
+		};
+		
+		Runtime.getRuntime().addShutdownHook(new Thread(r));
+	}
 
 	@Test
 	public void test() throws Exception {
 		
+		LogUtils.initializeDefaultTestConsoleLogger();
+		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 		
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(
-				"test1.txt");
+		DataStream<Tuple1<Integer>> dataStream1 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test1.txt");
 
 		fillExpected1();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(
-				"test2.txt", 5);
+		DataStream<Tuple1<Integer>> dataStream2 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test2.txt", 5);
 
 		fillExpected2();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(
-				"test3.txt", 10);
+		DataStream<Tuple1<Integer>> dataStream3 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test3.txt", 10);
 
 		fillExpected3();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(
-				"test4.txt", 10, new Tuple1<Integer>(26));
+		DataStream<Tuple1<Integer>> dataStream4 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test4.txt", 10, new Tuple1<Integer>(26));
 
 		fillExpected4();
 
 		@SuppressWarnings("unused")
-		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(
-				"test5.txt", 10, new Tuple1<Integer>(14));
+		DataStream<Tuple1<Integer>> dataStream5 = env.addSource(new MySource1(), 1).writeAsText(PREFIX + "test5.txt", 10, new Tuple1<Integer>(14));
 
 		fillExpected5();
 
 		env.executeTest(MEMORYSIZE);
 
-		readFile("test1.txt", result1);
-		readFile("test2.txt", result2);
-		readFile("test3.txt", result3);
-		readFile("test4.txt", result4);
-		readFile("test5.txt", result5);
+		readFile(PREFIX + "test1.txt", result1);
+		readFile(PREFIX + "test2.txt", result2);
+		readFile(PREFIX + "test3.txt", result3);
+		readFile(PREFIX + "test4.txt", result4);
+		readFile(PREFIX + "test5.txt", result5);
 
 		assertEquals(expected1,result1);
 		assertEquals(expected2,result2);
 		assertEquals(expected3,result3);
 		assertEquals(expected4,result4);
 		assertEquals(expected5,result5);
-
-		new File("test1.txt").delete();
-		new File("test2.txt").delete();
-		new File("test3.txt").delete();
-		new File("test4.txt").delete();
-		new File("test5.txt").delete();
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index 63ba627..85534ad 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 068efe2..e24362e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;
@@ -28,8 +26,8 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
+import org.apache.flink.util.LogUtils;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -65,7 +63,9 @@ public class CoMapTest implements Serializable {
 
 	@Test
 	public void multipleInputTest() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
+		
+		LogUtils.initializeDefaultTestConsoleLogger();
+		
 		expected.add("a");
 		expected.add("b");
 		expected.add("c");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 152f992..0f4a5a3 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index fe367d3..4d84435 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index e3c7cb7..8096a88 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.invokable.operator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/923b508d/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index 545169d..b16ffee 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -1,5 +1,4 @@
 /**
- *
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,7 +13,6 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- *
  */
 
 package org.apache.flink.streaming.api.streamcomponent;
@@ -27,23 +25,22 @@ import java.util.Map;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
-import org.junit.BeforeClass;
+import org.apache.flink.util.LogUtils;
 import org.junit.Test;
 
 public class StreamComponentTest {
 
 	@SuppressWarnings("unused")
-  private static final int PARALLELISM = 1;
+	private static final int PARALLELISM = 1;
 	private static final int SOURCE_PARALELISM = 1;
 	private static final long MEMORYSIZE = 32;
 
-	public static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
+	private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
 
 	public static class MySource implements SourceFunction<Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
@@ -93,20 +90,19 @@ public class StreamComponentTest {
 		}
 	}
 
-	@SuppressWarnings("unused")
-  @BeforeClass
-	public static void runStream() {
+	@Test
+	public void runStream() {
+		
+		LogUtils.initializeDefaultTestConsoleLogger();
+		
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(SOURCE_PARALELISM);
 
-		DataStream<Tuple2<Integer, Integer>> oneTask = env
+		env
 				.addSource(new MySource(), SOURCE_PARALELISM).map(new MyTask())
 				.addSink(new MySink());
 
 		env.executeTest(MEMORYSIZE);
-	}
-
-	@Test
-	public void test() {
+		
 		assertEquals(10, data.keySet().size());
 
 		for (Integer k : data.keySet()) {