Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/10 15:30:44 UTC

[3/5] incubator-flink git commit: [streaming] Java 8 WordCount example added for streaming

[streaming] Java 8 WordCount example added for streaming

[streaming] Documentation added for streaming java 8 support


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d8066c55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d8066c55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d8066c55

Branch: refs/heads/master
Commit: d8066c55baa6e2f8373f74cccfffe5fc352ea845
Parents: 4a7ba2d
Author: mbalassi <mb...@apache.org>
Authored: Tue Dec 9 15:13:49 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Dec 10 13:27:38 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  53 ++++++--
 .../flink/streaming/api/JobGraphBuilder.java    |   2 +-
 flink-java8/pom.xml                             |  15 +++
 .../examples/java8/wordcount/WordCount.java     | 127 +++++++++++++++++++
 4 files changed, 188 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index ec3675a..379bb73 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -366,15 +366,19 @@ Calling the `.groupBy(fields)` method on a windowed stream groups the elements b
 The user can also create windows and triggers on a per-group basis by calling `.window(…).every(…)` on an already grouped data stream. To highlight the differences let us look at two examples.
 
 To get the maximal value by key on the last 100 elements we use the first approach:
+
 ~~~java
 dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
 ~~~
+
 Using this approach we took the last 100 elements, divided them into groups by key, then applied the aggregation.
 
 To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:
+
 ~~~java
 dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
 ~~~
+
 This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.
 
 ### Temporal database style operators
@@ -451,7 +455,7 @@ dataStream1.connect(dataStream2)
         })
 ~~~
 
-#### windowReduce on ConnectedDataStream
+#### WindowReduce on ConnectedDataStream
 The windowReduce operator applies a user defined `CoWindowFunction` to time aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
 
 #### Reduce on ConnectedDataStream
@@ -497,13 +501,6 @@ Iterable<String> select(Integer value) {
 }
 ~~~
 
-Or more compactly we can use lambda expressions in Java 8:
-
-~~~java
-SplitDataStream<Integer> split = someDataStream
-					.split(x -> Arrays.asList(String.valueOf(x % 2)));
-~~~
-
 Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
 
 ### Iterations
@@ -549,6 +546,46 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 
 [Back to top](#top)
 
+### Lambda expressions with Java 8
+
+For more concise code, one can rely on one of the main features of Java 8, lambda expressions. The following program has similar functionality to the one provided in the [example](#example-program) section, while showcasing the usage of lambda expressions.
+
+~~~java
+public class StreamingWordCount {
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<String> text = env.fromElements(
+                "Who's there?",
+                "I think I hear them. Stand, ho! Who's there?");
+
+        DataStream<Tuple2<String, Integer>> counts =
+                // normalize and split each line
+                text.map(line -> line.toLowerCase().split("\\W+"))
+                // convert each split line into pairs (2-tuples) containing: (word,1)
+                .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+                    // emit the pairs with non-zero-length words
+                    Arrays.stream(tokens)
+                        .filter(t -> t.length() > 0)
+                        .forEach(t -> out.collect(new Tuple2<>(t, 1)));
+                })
+                // group by the tuple field "0" and sum up tuple field "1"
+                .groupBy(0)
+                .sum(1);
+
+        counts.print();
+
+        env.execute("Streaming WordCount");
+    }
+}
+~~~
+
+For a detailed Java 8 guide, please refer to the [Java 8 Programming Guide](java8_programming_guide.html). Streaming-specific operators also support lambda expressions. For example, [output splitting](#output-splitting) can be rewritten as follows:
+
+~~~java
+SplitDataStream<Integer> split = someDataStream
+					.split(x -> Arrays.asList(String.valueOf(x % 2)));
+~~~
 
 Operator Settings
 ----------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index c45164a..e80d86d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -127,7 +127,7 @@ public class JobGraphBuilder {
 	 * @param inTypeInfo
 	 *            Input type for serialization
 	 * @param outTypeInfo
-	 *            Output typ for serialization
+	 *            Output type for serialization
 	 * @param operatorName
 	 *            Operator type
 	 * @param serializedFunction

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 2a3eb09..ec6a3e7 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -45,16 +45,31 @@ under the License.
 			<artifactId>flink-core</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-test-utils</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-java</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d8066c55/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
new file mode 100644
index 0000000..c1d6042
--- /dev/null
+++ b/flink-java8/src/main/java/org/apache/flink/streaming/examples/java8/wordcount/WordCount.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.java8.wordcount;
+
+import java.util.Arrays;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements the streaming "WordCount" program that counts word occurrences
+ * in text files.
+ * 
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * 
+ * <p>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link WordCountData}.
+ * 
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>write a compact Flink Streaming program with Java 8 Lambda Expressions.
+ * </ul>
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataStream<String> text = getTextDataStream(env);
+		
+		DataStream<Tuple2<String, Integer>> counts = 
+				// normalize and split each line
+				text.map(line -> line.toLowerCase().split("\\W+"))
+				// convert each split line into pairs (2-tuples) containing: (word,1)
+				.flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
+					// emit the pairs with non-zero-length words
+					Arrays.stream(tokens)
+					.filter(t -> t.length() > 0)
+					.forEach(t -> out.collect(new Tuple2<>(t, 1)));
+				})
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.sum(1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, 1);
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("Streaming WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataStream<String> getTextDataStream(StreamExecutionEnvironment env) {
+		if (fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(WordCountData.WORDS);
+		}
+	}
+}