You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:41 UTC
[08/12] git commit: [FLINK-1103] [streaming] Updated WordCount
example to become self-contained and removed old TestDataUtil
[FLINK-1103] [streaming] Updated WordCount example to become self-contained and removed old TestDataUtil
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/30ac9fe6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/30ac9fe6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/30ac9fe6
Branch: refs/heads/master
Commit: 30ac9fe650b833bea2a9ee61b7b2f34f6181eb6d
Parents: 2dc5437
Author: mbalassi <ba...@gmail.com>
Authored: Wed Sep 24 18:08:49 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200
----------------------------------------------------------------------
.../flink/streaming/util/TestDataUtil.java | 118 ---------------
.../flink/streaming/util/TestDataUtilTest.java | 44 ------
.../flink-streaming-examples/pom.xml | 6 +
.../streaming/examples/wordcount/WordCount.java | 149 +++++++++++++++++++
.../examples/wordcount/WordCountLocal.java | 59 --------
.../testdata_checksum/ASTopology.data.md5 | 1 -
.../testdata_checksum/MovieLens100k.data.md5 | 1 -
.../resources/testdata_checksum/hamlet.txt.md5 | 1 -
.../testdata_checksum/terainput.txt.md5 | 1 -
9 files changed, 155 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
deleted file mode 100644
index ad42f1f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Makes sure a test data file is available under {@link #testDataDir}: the file is
- * downloaded from {@link #testRepoUrl} when it is missing locally or when its MD5
- * hex digest does not match the one stored in {@link #testChekSumDir}.
- */
-public class TestDataUtil {
-
- // TODO: Exception handling
- // TODO: check checksum after download
- private static final Logger LOG = LoggerFactory.getLogger(TestDataUtil.class);
- // local (relative) directory that receives the downloaded data files
- public static final String testDataDir = "src/test/resources/testdata/";
- // remote location the data files are fetched from; the file name is appended
- public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
- // holds one "<fileName>.md5" file per data file whose first line is the expected hex digest
- public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
-
- /**
- * Ensures {@code fileName} exists in {@link #testDataDir} with the expected MD5,
- * downloading it when it is missing or its checksum does not match.
- *
- * @param fileName name of the data file; the expected checksum is read from
- * {@code testChekSumDir + fileName + ".md5"}
- * @throws RuntimeException if the checksum file or the existing data file cannot be
- * read, or if the download fails
- */
- public static void downloadIfNotExists(String fileName) {
-
- File file = new File(testDataDir + fileName);
- File checkFile = new File(testChekSumDir + fileName + ".md5");
- String checkSumDesired = new String();
- String checkSumActaul = new String();
-
- // create the local data directory if needed (the mkdirs() result is ignored)
- File testDataDirectory = new File(testDataDir);
- testDataDirectory.mkdirs();
-
- // read the expected checksum from the first line of the .md5 file
- // NOTE(review): the readers are not closed if readLine() throws; an empty .md5 file
- // yields null here, which never equals the actual digest and forces a download
- try {
- FileReader fileReader = new FileReader(checkFile);
- BufferedReader bufferedReader = new BufferedReader(fileReader);
- checkSumDesired = bufferedReader.readLine();
- bufferedReader.close();
- fileReader.close();
- } catch (FileNotFoundException e) {
- // NOTE(review): the file that was not found is checkFile, but the message reports file
- throw new RuntimeException("File not found: " + file.getAbsolutePath(), e);
- } catch (IOException e) {
- // NOTE(review): as above, the message names file although checkFile was being read
- throw new RuntimeException("Cannot read file: " + file.getAbsolutePath(), e);
- }
-
- if (file.exists()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("{} already exists.", fileName);
- }
-
- // compare the MD5 of the local copy with the expected digest
- try {
- checkSumActaul = DigestUtils.md5Hex(FileUtils.readFileToByteArray(file));
- } catch (IOException e) {
- throw new RuntimeException("Cannot read file to byte array: "
- + file.getAbsolutePath(), e);
- }
- if (!checkSumActaul.equals(checkSumDesired)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("Checksum is incorrect.");
- LOG.info("Downloading file.");
- }
- // NOTE(review): the downloaded file is not re-checked (see TODO above)
- download(fileName);
- }
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("File does not exist.");
- LOG.info("Downloading file.");
- }
- download(fileName);
- }
-
- }
-
- /**
- * Downloads {@code testRepoUrl + fileName} into {@code testDataDir + fileName},
- * overwriting any existing local file.
- *
- * @param fileName name of the file to fetch
- * @throws RuntimeException if the URL is malformed or an I/O error occurs
- */
- public static void download(String fileName) {
- if (LOG.isInfoEnabled()) {
- LOG.info("downloading {}", fileName);
- }
-
- // Copies line by line: readLine() strips line terminators and newLine() writes the
- // platform separator, so the local bytes (and thus the MD5) may differ from the remote.
- // NOTE(review): reader and writer use the platform default charset; bReader is never
- // closed, and bWriter is not closed when an exception is thrown
- try {
- URL website = new URL(testRepoUrl + fileName);
- BufferedReader bReader = new BufferedReader(new InputStreamReader(website.openStream()));
- File outFile = new File(testDataDir + fileName);
- BufferedWriter bWriter = new BufferedWriter(new FileWriter(outFile));
-
- String line;
- while ((line = bReader.readLine()) != null) {
- bWriter.write(line);
- bWriter.newLine();
- }
- bWriter.close();
- } catch (MalformedURLException e) {
- throw new RuntimeException("URL is malformed: ", e);
- } catch (IOException e) {
- throw new RuntimeException("Unexpected problem while downloading file " + fileName, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
deleted file mode 100644
index fc11bcc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-/**
- * Helper for comparing two text files line by line.
- * NOTE(review): despite its name, this class contains no test methods.
- */
-public class TestDataUtilTest {
-
- /**
- * Compares two text files line by line.
- * NOTE(review): the loop stops at the first end of file, so this also returns
- * {@code true} when one file is a line-prefix of the other. The readers are never
- * closed, which is why the "resource" warning is suppressed.
- *
- * @param file1 path of the first file
- * @param file2 path of the second file
- * @return {@code false} as soon as a pair of corresponding lines differs,
- * {@code true} otherwise
- * @throws FileNotFoundException if either file cannot be opened
- * @throws IOException if reading fails
- */
- @SuppressWarnings("resource")
- public boolean compareFile(String file1, String file2) throws FileNotFoundException,
- IOException {
-
- BufferedReader myInput1 = new BufferedReader(new InputStreamReader(new FileInputStream(
- file1)));
- BufferedReader myInput2 = new BufferedReader(new InputStreamReader(new FileInputStream(
- file2)));
-
- String line1, line2;
- // short-circuit: once file1 is exhausted, file2 is not read any further
- while ((line1 = myInput1.readLine()) != null && (line2 = myInput2.readLine()) != null) {
- if (!line1.equals(line2))
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 9c9f00d..7222879 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -44,6 +44,12 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
new file mode 100644
index 0000000..3be0c89
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ *
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ *
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ *
+ */
+public class WordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data (the given text file, or the built-in default lines)
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+						// group by the tuple field "0" and sum up tuple field "1"
+						.groupBy(0)
+						.sum(1);
+
+		// emit result: to the output path if one was given, otherwise via print()
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute();
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)"
+	 * ({@code Tuple2<String, Integer>}).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// tokenize the line using StringTokenizer's default (whitespace) delimiters
+			StringTokenizer tokenizer = new StringTokenizer(line);
+
+			// emit the pairs
+			while (tokenizer.hasMoreTokens()) {
+				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	// set by parseParameters(): true only when valid input and output paths were given
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	/** Parses args into the fields above; returns false if the program should exit. */
+	private static boolean parseParameters(String[] args) {
+		if (args.length == 0) {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println(" Provide parameters to read input data from a file.");
+			System.out.println(" Usage: WordCount <text path> <result path>");
+			return true;
+		}
+		if (args.length != 2) {
+			System.err.println("Usage: WordCount <text path> <result path>");
+			return false;
+		}
+		// only switch to file I/O once the arguments are known to be valid
+		fileOutput = true;
+		textPath = args[0];
+		outputPath = args[1];
+		return true;
+	}
+
+	/** Returns the input lines: the given text file, or the built-in default data. */
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		}
+		// get default test text data
+		return env.fromElements(WordCountData.WORDS);
+	}
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
deleted file mode 100644
index 9ffeeb1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestDataUtil;
-import org.apache.flink.util.Collector;
-
-// This example counts the occurrences of each word in the hamlet.txt test file,
-// which is first fetched via TestDataUtil, and prints the resulting counts.
-public class WordCountLocal {
-
- /**
- * Splits each input line with a {@link StringTokenizer} (default, whitespace
- * delimiters) and emits one (word, 1) pair per token.
- */
- public static class WordCountSplitter implements
- FlatMapFunction<String, Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
- throws Exception {
- StringTokenizer tokenizer = new StringTokenizer(inTuple);
- while (tokenizer.hasMoreTokens()) {
- out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- // download hamlet.txt into src/test/resources/testdata/ unless a copy with a
- // matching checksum is already there
- TestDataUtil.downloadIfNotExists("hamlet.txt");
- // NOTE(review): the argument is presumably the degree of parallelism - confirm
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- // group by the word (field 0) and sum the counts (field 1)
- // NOTE(review): relative path, presumably resolved against the working directory,
- // so this has to run from the examples module root
- DataStream<Tuple2<String, Integer>> dataStream = env
- .readTextFile("src/test/resources/testdata/hamlet.txt")
- .flatMap(new WordCountSplitter()).groupBy(0).sum(1);
-
- dataStream.print();
-
- env.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
deleted file mode 100644
index 2c386b7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
+++ /dev/null
@@ -1 +0,0 @@
-f1b947a26b33b32f1de2cdd841f7b4c8
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
deleted file mode 100644
index 6499b43..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
+++ /dev/null
@@ -1 +0,0 @@
-6e47046882bad158b0efbb84cd5cb987
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
deleted file mode 100644
index 6526a51..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
+++ /dev/null
@@ -1 +0,0 @@
-4bb8c10cdde12a4953250423266465cc
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
deleted file mode 100644
index 365f210..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
+++ /dev/null
@@ -1 +0,0 @@
-7002e15fe547614160a0df6f22a5b8d0