You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:20 UTC
[3/6] flink git commit: [FLINK-2139] [streaming] Streaming
outputformat tests
[FLINK-2139] [streaming] Streaming outputformat tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f72e5c8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f72e5c8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f72e5c8c
Branch: refs/heads/master
Commit: f72e5c8cd781b2d32aa89fd62f3b0b8c78ded0f8
Parents: 2298cfe
Author: mbalassi <mb...@apache.org>
Authored: Thu Jun 4 11:04:46 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200
----------------------------------------------------------------------
.../flink/api/avro/AvroOutputFormatITCase.java | 173 +++++++++++++++++++
.../flink/api/avro/AvroOutputFormatTest.java | 173 -------------------
.../api/functions/sink/SocketClientSink.java | 2 +
.../api/outputformat/CsvOutputFormatITCase.java | 78 +++++++++
.../outputformat/SocketOutputFormatITCase.java | 52 ++++++
.../outputformat/TextOutputFormatITCase.java | 55 ++++++
.../streaming/util/SocketOutputTestBase.java | 129 ++++++++++++++
.../streaming/util/SocketProgramITCaseBase.java | 2 +-
.../socket/SocketTextStreamWordCountITCase.java | 2 +-
.../socket/SocketTextStreamWordCountITCase.java | 2 +-
.../flink-streaming-scala/pom.xml | 20 ++-
.../scala/api/CsvOutputFormatITCase.java | 67 +++++++
.../scala/api/SocketOutputFormatITCase.java | 36 ++++
.../scala/api/TextOutputFormatITCase.java | 43 +++++
.../api/scala/OutputFormatTestPrograms.scala | 76 ++++++++
15 files changed, 733 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
new file mode 100644
index 0000000..d40fec5
--- /dev/null
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.avro;
+
+import org.junit.Assert;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.flink.api.io.avro.example.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+public class AvroOutputFormatITCase extends JavaProgramTestBase {
+
+ public static String outputPath1;
+
+ public static String outputPath2;
+
+ public static String inputPath;
+
+ public static String userData = "alice|1|blue\n" +
+ "bob|2|red\n" +
+ "john|3|yellow\n" +
+ "walt|4|black\n";
+
+ @Override
+ protected void preSubmit() throws Exception {
+ inputPath = createTempFile("user", userData);
+ outputPath1 = getTempDirPath("avro_output1");
+ outputPath2 = getTempDirPath("avro_output2");
+ }
+
+
+ @Override
+ protected void testProgram() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
+ .fieldDelimiter("|")
+ .types(String.class, Integer.class, String.class);
+
+ //output the data with AvroOutputFormat for specific user type
+ DataSet<User> specificUser = input.map(new ConvertToUser());
+ specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
+
+ //output the data with AvroOutputFormat for reflect user type
+ DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
+ reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
+
+ env.execute();
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ //compare result for specific user type
+ File [] output1;
+ File file1 = asFile(outputPath1);
+ if (file1.isDirectory()) {
+ output1 = file1.listFiles();
+ // check for avro ext in dir.
+ for (File avroOutput : output1) {
+ Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
+ }
+ } else {
+ output1 = new File[] {file1};
+ }
+ List<String> result1 = new ArrayList<String>();
+ DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
+ for (File avroOutput : output1) {
+
+ DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
+ while (dataFileReader1.hasNext()) {
+ User user = dataFileReader1.next();
+ result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+ }
+ }
+ for (String expectedResult : userData.split("\n")) {
+ Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
+ }
+
+ //compare result for reflect user type
+ File [] output2;
+ File file2 = asFile(outputPath2);
+ if (file2.isDirectory()) {
+ output2 = file2.listFiles();
+ } else {
+ output2 = new File[] {file2};
+ }
+ List<String> result2 = new ArrayList<String>();
+ DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
+ for (File avroOutput : output2) {
+ DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
+ while (dataFileReader2.hasNext()) {
+ ReflectiveUser user = dataFileReader2.next();
+ result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
+ }
+ }
+ for (String expectedResult : userData.split("\n")) {
+ Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
+ }
+
+
+ }
+
+
+ public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
+
+ @Override
+ public User map(Tuple3<String, Integer, String> value) throws Exception {
+ return new User(value.f0, value.f1, value.f2);
+ }
+ }
+
+ public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
+
+ @Override
+ public ReflectiveUser map(User value) throws Exception {
+ return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
+ }
+ }
+
+
+ public static class ReflectiveUser {
+ private String name;
+ private int favoriteNumber;
+ private String favoriteColor;
+
+ public ReflectiveUser() {}
+
+ public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
+ this.name = name;
+ this.favoriteNumber = favoriteNumber;
+ this.favoriteColor = favoriteColor;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+ public String getFavoriteColor() {
+ return this.favoriteColor;
+ }
+ public int getFavoriteNumber() {
+ return this.favoriteNumber;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
deleted file mode 100644
index a8bace3..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroOutputFormatTest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.avro;
-
-import org.junit.Assert;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.flink.api.io.avro.example.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-@SuppressWarnings("serial")
-public class AvroOutputFormatTest extends JavaProgramTestBase {
-
- public static String outputPath1;
-
- public static String outputPath2;
-
- public static String inputPath;
-
- public static String userData = "alice|1|blue\n" +
- "bob|2|red\n" +
- "john|3|yellow\n" +
- "walt|4|black\n";
-
- @Override
- protected void preSubmit() throws Exception {
- inputPath = createTempFile("user", userData);
- outputPath1 = getTempDirPath("avro_output1");
- outputPath2 = getTempDirPath("avro_output2");
- }
-
-
- @Override
- protected void testProgram() throws Exception {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Tuple3<String, Integer, String>> input = env.readCsvFile(inputPath)
- .fieldDelimiter("|")
- .types(String.class, Integer.class, String.class);
-
- //output the data with AvroOutputFormat for specific user type
- DataSet<User> specificUser = input.map(new ConvertToUser());
- specificUser.write(new AvroOutputFormat<User>(User.class), outputPath1);
-
- //output the data with AvroOutputFormat for reflect user type
- DataSet<ReflectiveUser> reflectiveUser = specificUser.map(new ConvertToReflective());
- reflectiveUser.write(new AvroOutputFormat<ReflectiveUser>(ReflectiveUser.class), outputPath2);
-
- env.execute();
- }
-
- @Override
- protected void postSubmit() throws Exception {
- //compare result for specific user type
- File [] output1;
- File file1 = asFile(outputPath1);
- if (file1.isDirectory()) {
- output1 = file1.listFiles();
- // check for avro ext in dir.
- for (File avroOutput : output1) {
- Assert.assertTrue("Expect extension '.avro'", avroOutput.toString().endsWith(".avro"));
- }
- } else {
- output1 = new File[] {file1};
- }
- List<String> result1 = new ArrayList<String>();
- DatumReader<User> userDatumReader1 = new SpecificDatumReader<User>(User.class);
- for (File avroOutput : output1) {
-
- DataFileReader<User> dataFileReader1 = new DataFileReader<User>(avroOutput, userDatumReader1);
- while (dataFileReader1.hasNext()) {
- User user = dataFileReader1.next();
- result1.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result1.contains(expectedResult));
- }
-
- //compare result for reflect user type
- File [] output2;
- File file2 = asFile(outputPath2);
- if (file2.isDirectory()) {
- output2 = file2.listFiles();
- } else {
- output2 = new File[] {file2};
- }
- List<String> result2 = new ArrayList<String>();
- DatumReader<ReflectiveUser> userDatumReader2 = new ReflectDatumReader<ReflectiveUser>(ReflectiveUser.class);
- for (File avroOutput : output2) {
- DataFileReader<ReflectiveUser> dataFileReader2 = new DataFileReader<ReflectiveUser>(avroOutput, userDatumReader2);
- while (dataFileReader2.hasNext()) {
- ReflectiveUser user = dataFileReader2.next();
- result2.add(user.getName() + "|" + user.getFavoriteNumber() + "|" + user.getFavoriteColor());
- }
- }
- for (String expectedResult : userData.split("\n")) {
- Assert.assertTrue("expected user " + expectedResult + " not found.", result2.contains(expectedResult));
- }
-
-
- }
-
-
- public final static class ConvertToUser extends RichMapFunction<Tuple3<String, Integer, String>, User> {
-
- @Override
- public User map(Tuple3<String, Integer, String> value) throws Exception {
- return new User(value.f0, value.f1, value.f2);
- }
- }
-
- public final static class ConvertToReflective extends RichMapFunction<User, ReflectiveUser> {
-
- @Override
- public ReflectiveUser map(User value) throws Exception {
- return new ReflectiveUser(value.getName().toString(), value.getFavoriteNumber(), value.getFavoriteColor().toString());
- }
- }
-
-
- public static class ReflectiveUser {
- private String name;
- private int favoriteNumber;
- private String favoriteColor;
-
- public ReflectiveUser() {}
-
- public ReflectiveUser(String name, int favoriteNumber, String favoriteColor) {
- this.name = name;
- this.favoriteNumber = favoriteNumber;
- this.favoriteColor = favoriteColor;
- }
-
- public String getName() {
- return this.name;
- }
- public String getFavoriteColor() {
- return this.favoriteColor;
- }
- public int getFavoriteNumber() {
- return this.favoriteNumber;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index cd6c21c..3fd2678 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -85,6 +85,8 @@ public class SocketClientSink<IN> extends RichSinkFunction<IN> {
if(LOG.isErrorEnabled()){
LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
}
+ throw new RuntimeException("Cannot send message \"" + value.toString() +
+ "\" to socket server at " + hostName + ":" + port, e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..68e2a75
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/CsvOutputFormatITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+ DataStream<Tuple2<String, Integer>> counts =
+ text.flatMap(new Tokenizer())
+ .groupBy(0).sum(1);
+
+ counts.writeAsCsv(resultPath);
+
+ env.execute("WriteAsCsvTest");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ //Strip the parentheses from the expected text like output
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+ .replaceAll("[\\\\(\\\\)]", ""), resultPath);
+ }
+
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..bf96cc1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/SocketOutputFormatITCase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes failes most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+ @Override
+ protected void testProgram() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+ DataStream<String> counts =
+ text.flatMap(new CsvOutputFormatITCase.Tokenizer())
+ .groupBy(0).sum(1).map(new MapFunction<Tuple2<String, Integer>, String>() {
+ @Override
+ public String map(Tuple2<String, Integer> value) throws Exception {
+ return value.toString() + "\n";
+ }
+ });
+ counts.writeToSocket(HOST, port, new DummyStringSchema());
+
+ env.execute("WriteToSocketTest");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
new file mode 100644
index 0000000..3c48b3f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/outputformat/TextOutputFormatITCase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.outputformat;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+ DataStream<Tuple2<String, Integer>> counts =
+ text.flatMap(new CsvOutputFormatITCase.Tokenizer())
+ .groupBy(0).sum(1);
+
+ counts.writeAsText(resultPath);
+
+ env.execute("WriteAsTextTest");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
new file mode 100644
index 0000000..a6e1e7e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Assert;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test base for streaming programs relying on an open server socket to write to.
+ */
+public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
+
+ protected static final String HOST = "localhost";
+ protected static Integer port;
+ protected Set<String> dataReadFromSocket = new HashSet<String>();
+
+ @Override
+ protected void preSubmit() throws Exception {
+ port = NetUtils.getAvailablePort();
+ temporarySocket = createLocalSocket(port);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
+ Assert.assertEquals(expectedData, dataReadFromSocket);
+ temporarySocket.close();
+ }
+
+ protected ServerSocket temporarySocket;
+
+ public ServerSocket createLocalSocket(int port) throws Exception {
+ ServerSocket serverSocket = new ServerSocket(port);
+ ServerThread st = new ServerThread(serverSocket);
+ st.start();
+ return serverSocket;
+ }
+
+ protected class ServerThread extends Thread {
+
+ private ServerSocket serverSocket;
+ private Thread t;
+
+ public ServerThread(ServerSocket serverSocket) {
+ this.serverSocket = serverSocket;
+ t = new Thread(this);
+ }
+
+ public void waitForAccept() throws Exception {
+ Socket socket = serverSocket.accept();
+ BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ DeserializationSchema<String> schema = new DummyStringSchema();
+ String rawData = in.readLine();
+ while (rawData != null){
+ String string = schema.deserialize(rawData.getBytes());
+ dataReadFromSocket.add(string);
+ rawData = in.readLine();
+ }
+ socket.close();
+ }
+
+ public void run() {
+ try {
+ waitForAccept();
+ } catch (Exception e) {
+ Assert.fail();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void start() {
+ t.start();
+ }
+ }
+
+ public static class DummyStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]>{
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return nextElement.equals("q");
+ }
+
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes();
+ }
+
+ @Override
+ public String deserialize(byte[] message) {
+ return new String(message);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return TypeExtractor.getForClass(String.class);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
index 43b061e..37f6958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
@@ -86,4 +86,4 @@ public abstract class SocketProgramITCaseBase extends StreamingProgramTestBase {
t.start();
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
index 20f6ebe..838834b 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleJavaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
index cfde04f..b3629ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/socket/SocketTextStreamWordCountITCase.java
@@ -27,4 +27,4 @@ public class SocketTextStreamWordCountITCase extends SocketProgramITCaseBase {
SocketTextStreamWordCount.main(new String[]{HOST, port.toString(), resultPath});
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
index 51bea21..9ea30fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-scala/pom.xml
@@ -72,6 +72,7 @@ under the License.
<version>${guava.version}</version>
</dependency>
+ <!-- To access general test utils -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
@@ -80,6 +81,23 @@ under the License.
<type>test-jar</type>
</dependency>
+ <!-- To access test data -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- To access streaming test utils -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
</dependencies>
<build>
@@ -124,7 +142,7 @@ under the License.
</compilerPlugins>
</configuration>
</plugin>
-
+
<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
new file mode 100644
index 0000000..0c60719
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/CsvOutputFormatITCase.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+public class CsvOutputFormatITCase extends StreamingProgramTestBase {
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ OutputFormatTestPrograms.wordCountToCsv(WordCountData.TEXT, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ //Strip the parentheses from the expected text like output
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES
+ .replaceAll("[\\\\(\\\\)]", ""), resultPath);
+ }
+
+ public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
+ throws Exception {
+ // normalize and split the line
+ String[] tokens = value.toLowerCase().split("\\W+");
+
+ // emit the pairs
+ for (String token : tokens) {
+ if (token.length() > 0) {
+ out.collect(new Tuple2<String, Integer>(token, 1));
+ }
+ }
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
new file mode 100644
index 0000000..a2a78b7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/SocketOutputFormatITCase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.SocketOutputTestBase;
+import org.apache.flink.streaming.util.SocketProgramITCaseBase;
+import org.apache.flink.test.testdata.WordCountData;
+import org.junit.Ignore;
+
+@Ignore
+//This test sometimes failes most likely due to the behaviour
+//of the socket. Disabled for now.
+public class SocketOutputFormatITCase extends SocketOutputTestBase {
+
+ @Override
+ protected void testProgram() throws Exception {
+ OutputFormatTestPrograms.wordCountToSocket(WordCountData.TEXT, HOST, port);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
new file mode 100644
index 0000000..530ba67
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/java/org/apache/flink/streaming/scala/api/TextOutputFormatITCase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.api;
+
+import org.apache.flink.streaming.api.scala.OutputFormatTestPrograms;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class TextOutputFormatITCase extends StreamingProgramTestBase {
+
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ OutputFormatTestPrograms.wordCountToText(WordCountData.TEXT, resultPath);
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f72e5c8c/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
new file mode 100644
index 0000000..88b0f4f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.util.SocketOutputTestBase.DummyStringSchema
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema
+
+import scala.language.existentials
+
+/**
+ * Test programs for built in output formats. Invoked from {@link OutputFormatTest}.
+ */
+object OutputFormatTestPrograms {
+
+ def wordCountToText(input : String, outputPath : String) : Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ //Create streams for names and ages by mapping the inputs to the corresponding objects
+ val text = env.fromElements(input)
+ val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+ .map { (_, 1) }
+ .groupBy(0)
+ .sum(1)
+
+ counts.writeAsText(outputPath)
+
+ env.execute("Scala WordCountToText")
+ }
+
+ def wordCountToCsv(input : String, outputPath : String) : Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ //Create streams for names and ages by mapping the inputs to the corresponding objects
+ val text = env.fromElements(input)
+ val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+ .map { (_, 1) }
+ .groupBy(0)
+ .sum(1)
+
+ counts.writeAsCsv(outputPath)
+
+ env.execute("Scala WordCountToCsv")
+ }
+
+ def wordCountToSocket(input : String, outputHost : String, outputPort : Int) : Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ //Create streams for names and ages by mapping the inputs to the corresponding objects
+ val text = env.fromElements(input)
+ val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+ .map { (_, 1) }
+ .groupBy(0)
+ .sum(1)
+ .map(tuple => tuple.toString() + "\n")
+
+ counts.writeToSocket(outputHost, outputPort, new DummyStringSchema())
+
+ env.execute("Scala WordCountToCsv")
+ }
+
+}