You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/09/24 21:51:41 UTC

[08/12] git commit: [FLINK-1103] [streaming] Updated WordCount example to become self-contained and removed old TestDataUtil

[FLINK-1103] [streaming] Updated WordCount example to become self-contained and removed old TestDataUtil


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/30ac9fe6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/30ac9fe6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/30ac9fe6

Branch: refs/heads/master
Commit: 30ac9fe650b833bea2a9ee61b7b2f34f6181eb6d
Parents: 2dc5437
Author: mbalassi <ba...@gmail.com>
Authored: Wed Sep 24 18:08:49 2014 +0200
Committer: mbalassi <ba...@gmail.com>
Committed: Wed Sep 24 19:54:39 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/util/TestDataUtil.java      | 118 ---------------
 .../flink/streaming/util/TestDataUtilTest.java  |  44 ------
 .../flink-streaming-examples/pom.xml            |   6 +
 .../streaming/examples/wordcount/WordCount.java | 149 +++++++++++++++++++
 .../examples/wordcount/WordCountLocal.java      |  59 --------
 .../testdata_checksum/ASTopology.data.md5       |   1 -
 .../testdata_checksum/MovieLens100k.data.md5    |   1 -
 .../resources/testdata_checksum/hamlet.txt.md5  |   1 -
 .../testdata_checksum/terainput.txt.md5         |   1 -
 9 files changed, 155 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
deleted file mode 100644
index ad42f1f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/TestDataUtil.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.MalformedURLException;
-import java.net.URL;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDataUtil {
-
-	// TODO: Exception handling
-	// TODO: check checksum after download
-	private static final Logger LOG = LoggerFactory.getLogger(TestDataUtil.class);
-	public static final String testDataDir = "src/test/resources/testdata/";
-	public static final String testRepoUrl = "http://info.ilab.sztaki.hu/~mbalassi/flink-streaming/testdata/";
-	public static final String testChekSumDir = "src/test/resources/testdata_checksum/";
-
-	public static void downloadIfNotExists(String fileName) {
-
-		File file = new File(testDataDir + fileName);
-		File checkFile = new File(testChekSumDir + fileName + ".md5");
-		String checkSumDesired = new String();
-		String checkSumActaul = new String();
-
-		File testDataDirectory = new File(testDataDir);
-		testDataDirectory.mkdirs();
-
-		try {
-			FileReader fileReader = new FileReader(checkFile);
-			BufferedReader bufferedReader = new BufferedReader(fileReader);
-			checkSumDesired = bufferedReader.readLine();
-			bufferedReader.close();
-			fileReader.close();
-		} catch (FileNotFoundException e) {
-			throw new RuntimeException("File not found: " + file.getAbsolutePath(), e);
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot read file: " + file.getAbsolutePath(), e);
-		}
-
-		if (file.exists()) {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("{} already exists.", fileName);
-			}
-
-			try {
-				checkSumActaul = DigestUtils.md5Hex(FileUtils.readFileToByteArray(file));
-			} catch (IOException e) {
-				throw new RuntimeException("Cannot read file to byte array: "
-						+ file.getAbsolutePath(), e);
-			}
-			if (!checkSumActaul.equals(checkSumDesired)) {
-				if (LOG.isInfoEnabled()) {
-					LOG.info("Checksum is incorrect.");
-					LOG.info("Downloading file.");
-				}
-				download(fileName);
-			}
-		} else {
-			if (LOG.isInfoEnabled()) {
-				LOG.info("File does not exist.");
-				LOG.info("Downloading file.");
-			}
-			download(fileName);
-		}
-
-	}
-
-	public static void download(String fileName) {
-		if (LOG.isInfoEnabled()) {
-			LOG.info("downloading {}", fileName);
-		}
-		
-		try {
-			URL website = new URL(testRepoUrl + fileName);
-			BufferedReader bReader = new BufferedReader(new InputStreamReader(website.openStream()));
-			File outFile = new File(testDataDir + fileName);
-			BufferedWriter bWriter = new BufferedWriter(new FileWriter(outFile));
-
-			String line;
-			while ((line = bReader.readLine()) != null) {
-				bWriter.write(line);
-				bWriter.newLine();
-			}
-			bWriter.close();
-		} catch (MalformedURLException e) {
-			throw new RuntimeException("URL is malformed: ", e);
-		} catch (IOException e) {
-			throw new RuntimeException("Unexpected problem while downloading file " + fileName, e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
deleted file mode 100644
index fc11bcc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestDataUtilTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-
-public class TestDataUtilTest {
-
-	@SuppressWarnings("resource")
-	public boolean compareFile(String file1, String file2) throws FileNotFoundException,
-			IOException {
-
-		BufferedReader myInput1 = new BufferedReader(new InputStreamReader(new FileInputStream(
-				file1)));
-		BufferedReader myInput2 = new BufferedReader(new InputStreamReader(new FileInputStream(
-				file2)));
-
-		String line1, line2;
-		while ((line1 = myInput1.readLine()) != null && (line2 = myInput2.readLine()) != null) {
-			if (!line1.equals(line2))
-				return false;
-		}
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 9c9f00d..7222879 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -44,6 +44,12 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+        <dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-streaming-connectors</artifactId>
 			<version>${project.version}</version>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
new file mode 100644
index 0000000..3be0c89
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.wordcount;
+
+import java.util.StringTokenizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence
+ * histogram over text files in a streaming fashion.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a simple Flink Streaming program.
+ * <li>use Tuple data types.
+ * <li>write and use user-defined functions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute();
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a
+	 * user-defined FlatMapFunction. The function takes a line (String) and
+	 * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
+	 * Integer>).
+	 */
+	public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
+				throws Exception {
+			// tokenize the line
+			StringTokenizer tokenizer = new StringTokenizer(inTuple);
+
+			// emit the pairs
+			while (tokenizer.hasMoreTokens()) {
+				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
+			}
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if (args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
deleted file mode 100644
index 9ffeeb1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import java.util.StringTokenizer;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestDataUtil;
-import org.apache.flink.util.Collector;
-
-// This example will count the occurrence of each word in the input file.
-public class WordCountLocal {
-
-	public static class WordCountSplitter implements
-			FlatMapFunction<String, Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String inTuple, Collector<Tuple2<String, Integer>> out)
-				throws Exception {
-			StringTokenizer tokenizer = new StringTokenizer(inTuple);
-			while (tokenizer.hasMoreTokens()) {
-				out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
-			}
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		TestDataUtil.downloadIfNotExists("hamlet.txt");
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Tuple2<String, Integer>> dataStream = env
-				.readTextFile("src/test/resources/testdata/hamlet.txt")
-				.flatMap(new WordCountSplitter()).groupBy(0).sum(1);
-
-		dataStream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
deleted file mode 100644
index 2c386b7..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/ASTopology.data.md5
+++ /dev/null
@@ -1 +0,0 @@
-f1b947a26b33b32f1de2cdd841f7b4c8

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
deleted file mode 100644
index 6499b43..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/MovieLens100k.data.md5
+++ /dev/null
@@ -1 +0,0 @@
-6e47046882bad158b0efbb84cd5cb987

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
deleted file mode 100644
index 6526a51..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/hamlet.txt.md5
+++ /dev/null
@@ -1 +0,0 @@
-4bb8c10cdde12a4953250423266465cc

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/30ac9fe6/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5 b/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
deleted file mode 100644
index 365f210..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/test/resources/testdata_checksum/terainput.txt.md5
+++ /dev/null
@@ -1 +0,0 @@
-7002e15fe547614160a0df6f22a5b8d0