Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:20 UTC

[3/6] flink git commit: [FLINK-2139] [streaming] Streaming outputformat tests

[FLINK-2139] [streaming] Streaming outputformat tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f72e5c8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f72e5c8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f72e5c8c

Branch: refs/heads/master
Commit: f72e5c8cd781b2d32aa89fd62f3b0b8c78ded0f8
Parents: 2298cfe
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 11:04:46 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200

----------------------------------------------------------------------
 .../flink/api/avro/AvroOutputFormatITCase.java  | 173 +++++++++++++++++++
 .../flink/api/avro/AvroOutputFormatTest.java    | 173 -------------------
 .../api/functions/sink/SocketClientSink.java    |   2 +
 .../api/outputformat/CsvOutputFormatITCase.java |  78 +++++++++
 .../outputformat/SocketOutputFormatITCase.java  |  52 ++++++
 .../outputformat/TextOutputFormatITCase.java    |  55 ++++++
 .../streaming/util/SocketOutputTestBase.java    | 129 ++++++++++++++
 .../streaming/util/SocketProgramITCaseBase.java |   2 +-
 .../socket/SocketTextStreamWordCountITCase.java |   2 +-
 .../socket/SocketTextStreamWordCountITCase.java |   2 +-
 .../flink-streaming-scala/pom.xml               |  20 ++-
 .../scala/api/CsvOutputFormatITCase.java        |  67 +++++++
 .../scala/api/SocketOutputFormatITCase.java     |  36 ++++
 .../scala/api/TextOutputFormatITCase.java       |  43 +++++
 .../api/scala/OutputFormatTestPrograms.scala    |  76 ++++++++
 15 files changed, 733 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..d40fec5
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+	public static String outputPath1;
+
+	public static String outputPath2;
+
+	public static String inputPath;
+
+	public static String userData = "alice|1|blue\n" +
+		"bob|2|red\n" +
+		"john|3|yellow\n" +
+		"walt|4|black\n";
+
+	@Override
+	protected void preSubmit() throws Exception {
+		inputPath = createTempFile("user", userData);
+		outputPath1 = getTempDirPath("avro_output1");
+		outputPath2 = getTempDirPath("avro_output2");
+	}
+
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+			.fieldDelimiter("|")
+			.types(String.class, Integer.class, String.class);
+
+		//output the data with AvroOutputFormat for specific user type
+		DataSet<User> specificUser = input.map(new ConvertToUser());
+		specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
+
+		//output the data with AvroOutputFormat for reflect user type
+		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+		env.execute();
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		//compare result for specific user type
+		File [] output1;
+		File file1 = asFile(outputPath1);
+		if (file1.isDirectory()) {
+			output1 = file1.listFiles();
+			// check for avro ext in dir.
+			for (File avroOutput : output1) {
+				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+			}
+		} else {
+			output1 = new File[] {file1};
+		}
+		List<String> result1 = new ArrayList<String>();
+		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+		for (File avroOutput : output1) {
+
+			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+			while (dataFileReader1.hasNext()) {
+				User user = dataFileReader1.next();
+				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
+		}
+
+		//compare result for reflect user type
+		File [] output2;
+		File file2 = asFile(outputPath2);
+		if (file2.isDirectory()) {
+			output2 = file2.listFiles();
+		} else {
+			output2 = new File[] {file2};
+		}
+		List<String> result2 = new ArrayList<String>();
+		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+		for (File avroOutput : output2) {
+			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+			while (dataFileReader2.hasNext()) {
+				ReflectiveUser user = dataFileReader2.next();
+				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+			}
+		}
+		for (String expectedResult : userData.split("\n")) {
+			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
+		}
+
+
+	}
+
+
+	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+		@Override
+		public User map(Tuple3<String, Integer, String> value) throws Exception {
+			return new User(value.f0, value.f1, value.f2);
+		}
+	}
+
+	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+
+		@Override
+		public ReflectiveUser map(User value) throws Exception {
+			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+		}
+	}
+
+	
+	public static class ReflectiveUser {
+		private String name;
+		private int favoriteNumber;
+		private String favoriteColor;
+
+		public ReflectiveUser() {}
+
+		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+			this.name = name;
+			this.favoriteNumber = favoriteNumber;
+			this.favoriteColor = favoriteColor;
+		}
+		
+		public String getName() {
+			return this.name;
+		}
+		public String getFavoriteColor() {
+			return this.favoriteColor;
+		}
+		public int getFavoriteNumber() {
+			return this.favoriteNumber;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
deleted file mode 100644
index a8bace3..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatTest extends JavaProgramTestBase {
-
-	public static String outputPath1;
-
-	public static String outputPath2;
-
-	public static String inputPath;
-
-	public static String userData = "alice|1|blue\n" +
-		"bob|2|red\n" +
-		"john|3|yellow\n" +
-		"walt|4|black\n";
-
-	@Override
-	protected void preSubmit() throws Exception {
-		inputPath = createTempFile("user", userData);
-		outputPath1 = getTempDirPath("avro_output1");
-		outputPath2 = getTempDirPath("avro_output2");
-	}
-
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
-			.fieldDelimiter("|")
-			.types(String.class, Integer.class, String.class);
-
-		//output the data with AvroOutputFormat for specific user type
-		DataSet<User> specificUser = input.map(new ConvertToUser());
-		specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
-
-		//output the data with AvroOutputFormat for reflect user type
-		DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
-		reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
-		env.execute();
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		//compare result for specific user type
-		File [] output1;
-		File file1 = asFile(outputPath1);
-		if (file1.isDirectory()) {
-			output1 = file1.listFiles();
-			// check for avro ext in dir.
-			for (File avroOutput : output1) {
-				Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
-			}
-		} else {
-			output1 = new File[] {file1};
-		}
-		List<String> result1 = new ArrayList<String>();
-		DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
-		for (File avroOutput : output1) {
-
-			DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
-			while (dataFileReader1.hasNext()) {
-				User user = dataFileReader1.next();
-				result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
-		}
-
-		//compare result for reflect user type
-		File [] output2;
-		File file2 = asFile(outputPath2);
-		if (file2.isDirectory()) {
-			output2 = file2.listFiles();
-		} else {
-			output2 = new File[] {file2};
-		}
-		List<String> result2 = new ArrayList<String>();
-		DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
-		for (File avroOutput : output2) {
-			DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
-			while (dataFileReader2.hasNext()) {
-				ReflectiveUser user = dataFileReader2.next();
-				result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
-			}
-		}
-		for (String expectedResult : userData.split("\n")) {
-			Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
-		}
-
-
-	}
-
-
-	public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
-
-		@Override
-		public User map(Tuple3<String, Integer, String> value) throws Exception {
-			return new User(value.f0, value.f1, value.f2);
-		}
-	}
-
-	public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
-
-		@Override
-		public ReflectiveUser map(User value) throws Exception {
-			return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
-		}
-	}
-
-	
-	public static class ReflectiveUser {
-		private String name;
-		private int favoriteNumber;
-		private String favoriteColor;
-
-		public ReflectiveUser() {}
-
-		public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
-			this.name = name;
-			this.favoriteNumber = favoriteNumber;
-			this.favoriteColor = favoriteColor;
-		}
-		
-		public String getName() {
-			return this.name;
-		}
-		public String getFavoriteColor() {
-			return this.favoriteColor;
-		}
-		public int getFavoriteNumber() {
-			return this.favoriteNumber;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index cd6c21c..3fd2678 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -85,6 +85,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
 			if(LOG.isErrorEnabled()){
 				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
 			}
+			throw new RuntimeException("Cannot send message \"" + value.toString() +
+					"\" to socket server at " + hostName + ":" + port, e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..68e2a75
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				text.flatMap(new Tokenizer())
+						.groupBy(0).sum(1);
+
+		counts.writeAsCsv(resultPath);
+
+		env.execute("WriteAsCsvTest");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		//Strip the parentheses from the expected text-like output
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..bf96cc1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes fails, most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<String> counts =
+				text.flatMap(new CsvOutputFormatITCase.Tokenizer())
+						.groupBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
+					@Override
+					public String map(Tuple2<String, Integer> value) throws Exception {
+						return value.toString() + "\n";
+					}
+				});
+		counts.writeToSocket(HOST, port, new DummyStringSchema());
+
+		env.execute("WriteToSocketTest");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
new file mode 100644
index 0000000..3c48b3f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				text.flatMap(new CsvOutputFormatITCase.Tokenizer())
+						.groupBy(0).sum(1);
+
+		counts.writeAsText(resultPath);
+
+		env.execute("WriteAsTextTest");
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
new file mode 100644
index 0000000..a6e1e7e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test base for streaming programs relying on an open server socket to write to.
+ */
+public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
+
+	protected static final String HOST = "localhost";
+	protected static Integer port;
+	protected Set<String> dataReadFromSocket = new HashSet<String>();
+
+	@Override
+	protected void preSubmit() throws Exception {
+		port = NetUtils.getAvailablePort();
+		temporarySocket = createLocalSocket(port);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
+		Assert.assertEquals(expectedData, dataReadFromSocket);
+		temporarySocket.close();
+	}
+
+	protected ServerSocket temporarySocket;
+
+	public ServerSocket createLocalSocket(int port) throws Exception {
+		ServerSocket serverSocket = new ServerSocket(port);
+		ServerThread st = new ServerThread(serverSocket);
+		st.start();
+		return serverSocket;
+	}
+
+	protected class ServerThread extends Thread {
+
+		private ServerSocket serverSocket;
+		private Thread t;
+
+		public ServerThread(ServerSocket serverSocket) {
+			this.serverSocket = serverSocket;
+			t = new Thread(this);
+		}
+
+		public void waitForAccept() throws Exception {
+			Socket socket = serverSocket.accept();
+			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+			DeserializationSchema<String> schema = new DummyStringSchema();
+			String rawData = in.readLine();
+			while (rawData != null){
+				String string = schema.deserialize(rawData.getBytes());
+				dataReadFromSocket.add(string);
+				rawData = in.readLine();
+			}
+			socket.close();
+		}
+
+		public void run() {
+			try {
+				waitForAccept();
+			} catch (Exception e) {
+				Assert.fail();
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public void start() {
+			t.start();
+		}
+	}
+
+	public static class DummyStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]>{
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean isEndOfStream(String nextElement) {
+			return nextElement.equals("q");
+		}
+
+		@Override
+		public byte[] serialize(String element) {
+			return element.getBytes();
+		}
+
+		@Override
+		public String deserialize(byte[] message) {
+			return new String(message);
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return TypeExtractor.getForClass(String.class);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
index 43b061e..37f6958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
@@ -86,4 +86,4 @@ public abstract class SocketProgramITCaseBase extends StreamingProgramTestBase {
 			t.start();
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
index 20f6ebe..838834b 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
 		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
index cfde04f..b3629ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
 		SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
 	}
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
index 51bea21..9ea30fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
@@ -72,6 +72,7 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 
+		<!-- To access general test utils -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-tests</artifactId>
@@ -80,6 +81,23 @@ under the License.
 			<type>test-jar</type>
 		</dependency>
 
+		<!-- To access test data -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<!-- To access streaming test utils -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 	</dependencies>
 
 	<build>
@@ -124,7 +142,7 @@ under the License.
 				   </compilerPlugins>
 				</configuration>
 			</plugin>
-			
+
 			<!-- Eclipse Integration -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..0c60719
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		//Strip the parentheses from the expected text-like output
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+				.replaceAll("[\\\\(\\\\)]", ""), resultPath);
+	}
+
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..a2a78b7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes fails, most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
new file mode 100644
index 0000000..530ba67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+	protected String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
new file mode 100644
index 0000000..88b0f4f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
+
+import scala.language.existentials
+
+/**
+ * Test programs for the built-in output formats. Invoked from the output format ITCases.
+ */
+object OutputFormatTestPrograms {
+
+  def wordCountToText(input : String, outputPath : String) : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Build a word count stream from the input text
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.writeAsText(outputPath)
+
+    env.execute("Scala WordCountToText")
+  }
+
+  def wordCountToCsv(input : String, outputPath : String) : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Build a word count stream from the input text
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+
+    counts.writeAsCsv(outputPath)
+
+    env.execute("Scala WordCountToCsv")
+  }
+
+  def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    //Build a word count stream from the input text
+    val text = env.fromElements(input)
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .groupBy(0)
+      .sum(1)
+      .map(tuple => tuple.toString() + "\n")
+
+    counts.writeToSocket(outputHost, outputPort, new DummyStringSchema())
+
+    env.execute("Scala WordCountToSocket")
+  }
+
+}