Posted to commits@flink.apache.org by mb...@apache.org on 2014/11/24 16:32:04 UTC

[2/2] incubator-flink git commit: [FLINK-1173] [streaming] SocketTextStream minor fixes + documentation

[FLINK-1173] [streaming] SocketTextStream minor fixes + documentation

This closes #204


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6dfb3fad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6dfb3fad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6dfb3fad

Branch: refs/heads/master
Commit: 6dfb3fad9a415ba3ce57273cb97771735bf88e93
Parents: 80416ac
Author: mbalassi <mb...@apache.org>
Authored: Sun Nov 23 18:13:53 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Mon Nov 24 11:33:32 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  41 +++++---
 .../environment/StreamExecutionEnvironment.java |  15 ++-
 .../source/SocketTextStreamFunction.java        |  27 ++++-
 .../flink-streaming-examples/pom.xml            |   9 +-
 .../socket/SocketTextStreamWordCount.java       | 101 +++++++++++++++++++
 .../wordcount/SocketTextStreamWordCount.java    | 100 ------------------
 6 files changed, 166 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index a508b46..1c6cb2e 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -44,15 +44,14 @@ public class StreamingWordCount {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
         
         DataStream<Tuple2<String, Integer>> dataStream = env
-                .fromElements("Who's there?",
-            "I think I hear them. Stand, ho! Who's there?")
+                .socketTextStream("localhost", 9999)
                 .flatMap(new Splitter())
                 .groupBy(0)
                 .sum(1);
         
         dataStream.print();
         
-        env.execute();
+        env.execute("Socket Stream WordCount");
     }
     
     public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@@ -67,6 +66,14 @@ public class StreamingWordCount {
 }
 ~~~
 
+To run the example program start the input stream with netcat first from a terminal:
+
+~~~batch
+nc -lk 9999
+~~~
+
+The lines typed to this terminal are submitted as a source for your streaming job.
+
 [Back to top](#top)
 
 Program Skeleton
@@ -91,7 +98,7 @@ StreamExecutionEnvironment.createRemoteEnvironment(params…)
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
 
 ~~~java
-env.readTextFile(filePath)
+env.socketTextStream(host, port)
 ~~~
 
 After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
@@ -106,10 +113,10 @@ The processed data can be pushed to different outputs called sinks. The user can
 dataStream.writeAsCsv(path)
 ~~~
 
-Once the complete program is specified `execute()` needs to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
 
 ~~~java
-env.execute()
+env.execute(programName)
 ~~~
 
 [Back to top](#top)
@@ -142,16 +149,18 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided in `StreamExecutionEnvironment`. There are several predefined ones similar to the ones provided by the batch API like:
+The user can connect to data streams by the different implemenations of `DataStreamSource` using methods provided by the `StreamExecutionEnvironment`. There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
- * `env.genereateSequence(from, to)`
- * `env.fromElements(elements…)`
- * `env.fromCollection(collection)`
- * `env.readTextFile(filepath)`
+ * `socketTextStream(hostname, port)`
+ * `readTextStream(filepath)`
+ * `genereateSequence(from, to)`
+ * `fromElements(elements…)`
+ * `fromCollection(collection)`
+ * `readTextFile(filepath)`
 
-These can be used to easily test and debug streaming programs. There are also some streaming specific sources for example `env.readTextStream(filepath)` which iterates over the same file infinitely providing yet another nice testing tool.
-There are implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
-Besides the pre-defined solutions the user can implement their own source by implementing the `SourceFunction` interface and using the `env.addSource(sourceFunction)` method of the `StreamExecutionEnvironment`.
+These can be used to easily test and debug streaming programs.
+There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
+Besides the pre-defined solutions the user can implement their own source by implementing the `SourceFunction` interface and using the `addSource(sourceFunction)` method of the `StreamExecutionEnvironment`.
 
 ### Sinks
 
@@ -326,7 +335,7 @@ dataStream1.connect(dataStream2)
         })
 ~~~
 
-#### winddowReduceGroup on ConnectedDataStream
+#### windowReduceGroup on ConnectedDataStream
 The windowReduceGroup operator applies a user defined `CoGroupFunction` to time aligned windows of the two data streams and return zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
 
 ~~~java
@@ -404,6 +413,7 @@ DataStream<Integer> tail = head.map(new IterationTail());
 iteration.closeWith(tail);
 ~~~
 Or to use with output splitting:
+
 ~~~java
 SplitDataStream<Integer> tail = head.map(new IterationTail()).split(outputSelector);
 iteration.closeWith(tail.select("iterate"));
@@ -449,6 +459,7 @@ env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 
 Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
 Usage:
+
 ~~~java
 operator.setMutability(isMutable)
 ~~~
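
Note on the documentation change above: the guide's example now reads lines from a socket and pipes them through flatMap, groupBy(0) and sum(1). The following plain-Java sketch, which does not use Flink at all, illustrates what that pipeline is expected to print. It assumes the grouped sum emits an updated total for every incoming record and that the Splitter breaks lines on whitespace; neither detail is shown in this diff, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the flatMap -> groupBy(0) -> sum(1) pipeline.
class RunningWordCountSketch {

    // Returns one "(word,total)" entry per incoming word, in arrival order.
    static List<String> runningCounts(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Assumption: words are separated by whitespace.
            for (String word : line.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue;
                }
                // Assumption: the sum is a rolling aggregate per key.
                int total = totals.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Each string plays the role of one line typed into the netcat terminal.
        for (String record : runningCounts(Arrays.asList("to be or", "not to be"))) {
            System.out.println(record);
        }
    }
}
```

Under these assumptions, typing "to be or" and then "not to be" into the netcat terminal would yield (to,1), (be,1), (or,1), (not,1), (to,2), (be,2).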

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 82563ef..4f1efd1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -280,7 +280,20 @@ public abstract class StreamExecutionEnvironment {
 	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
 		return addSource(new SocketTextStreamFunction(hostname, port, delimiter));
 	}
-
+	
+	
+	/**
+	 * Creates a new DataStream that contains the strings received infinitely
+	 * from socket. Received strings are decoded by the system's default
+	 * character set, uses '\n' as delimiter.
+	 *
+	 * @param hostname
+	 *            The host name which a server socket bind.
+	 * @param port
+	 * 			  The port number which a server socket bind. A port number of
+	 * 			  0 means that the port number is automatically allocated.
+	 * @return A DataStream, containing the strings received from socket.
+	 */
 	public DataStreamSource<String> socketTextStream(String hostname, int port) {
 		return socketTextStream(hostname, port, '\n');
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 4811da5..ac82b10 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -17,18 +17,22 @@
 
 package org.apache.flink.streaming.api.function.source;
 
-import org.apache.flink.util.Collector;
-
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
 import java.net.Socket;
 
-public class SocketTextStreamFunction implements SourceFunction<String> {
-	private static final long serialVersionUID = 1L;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
 
+public class SocketTextStreamFunction extends RichSourceFunction<String> {
+	private static final long serialVersionUID = 1L;
+	
 	private String hostname;
 	private int port;
 	private char delimiter;
+	private Socket socket;
+	private static final int CONNECTION_TIMEOUT_TIME = 0;
 
 	public SocketTextStreamFunction(String hostname, int port, char delimiter) {
 		this.hostname = hostname;
@@ -37,8 +41,15 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
 	}
 
 	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		socket = new Socket();
+		
+		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+	}
+	
+	@Override
 	public void invoke(Collector<String> collector) throws Exception {
-		Socket socket = new Socket(hostname, port);
 		while (!socket.isClosed() && socket.isConnected()) {
 			streamFromSocket(collector, socket);
 		}
@@ -66,4 +77,10 @@ public class SocketTextStreamFunction implements SourceFunction<String> {
 			collector.collect(buffer.toString());
 		}
 	}
+
+	@Override
+	public void close() throws Exception {
+		socket.close();
+		super.close();
+	}
 }
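
Note on the change above: the patch moves the socket connection into open(), using Socket.connect with an InetSocketAddress and a timeout of 0 (which the JDK interprets as an infinite timeout), and releases the socket in close(). The body of streamFromSocket is only partly visible in this diff, so the following is a minimal stdlib-only sketch of delimiter-based reading rather than the committed implementation. The class and method names are hypothetical, and emitting a trailing record that has no closing delimiter is an assumption of the sketch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the commit.
class DelimitedSocketReadSketch {

    // Splits the character stream into records at each delimiter.
    static List<String> readRecords(Reader in, char delimiter) {
        List<String> records = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        try {
            int c;
            while ((c = in.read()) != -1) {
                if (c == delimiter) {
                    records.add(buffer.toString());
                    buffer.setLength(0);
                } else {
                    buffer.append((char) c);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Assumption: a non-empty remainder at end of stream is still a record.
        if (buffer.length() > 0) {
            records.add(buffer.toString());
        }
        return records;
    }

    // Connects as the patched open() does, then reads until the peer closes.
    static List<String> readFromSocket(String hostname, int port, char delimiter) {
        try (Socket socket = new Socket()) {
            // A timeout of 0 means the connect call waits indefinitely.
            socket.connect(new InetSocketAddress(hostname, port), 0);
            // Default charset, matching the Javadoc added in this commit.
            Reader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            return readRecords(reader, delimiter);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // No network needed: a StringReader stands in for the socket stream.
        System.out.println(readRecords(new StringReader("a b\nc\nd"), '\n'));
    }
}
```

Unlike the streaming source, readFromSocket collects everything into a list and returns only after the peer closes the connection; it is meant to show the connect and read steps, not the operator lifecycle.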

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
index 075a468..1369828 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-examples/pom.xml
@@ -242,16 +242,13 @@ under the License.
 
 							<archive>
 								<manifestEntries>
-									<program-class>org.apache.flink.streaming.examples.wordcount.SocketTextStreamWordCount</program-class>
+									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
 								</manifestEntries>
 							</archive>
 
 							<includes>
-								<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount$*.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
-								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
-								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$Tokenizer.class</include>
 							</includes>
 						</configuration>
 					</execution>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
new file mode 100644
index 0000000..ec32e9f
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.socket;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;
+
+/**
+ * This example shows an implementation of WordCount with data from socket.
+ *
+ * <p>
+ * Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port &gt; &lt;result path&gt;</code><br>
+ *
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use StreamExecutionEnvironment.socketTextStream
+ * <li>write a simple Flink program,
+ * <li>write and use user-defined functions.
+ * </ul>
+ */
+public class SocketTextStreamWordCount {
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		// get input data
+		DataStream<String> text = env.socketTextStream(hostname, port);
+
+		DataStream<Tuple2<String, Integer>> counts =
+		// split up the lines in pairs (2-tuples) containing: (word,1)
+		text.flatMap(new Tokenizer())
+		// group by the tuple field "0" and sum up tuple field "1"
+			.groupBy(0).sum(1);
+
+		if (fileOutput) {
+			counts.writeAsText(outputPath, 1);
+		} else {
+			counts.print();
+		}
+
+		// execute program
+		env.execute("WordCount with SocketTextStream Example");
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String hostname;
+	private static int port;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 3) {
+				fileOutput = true;
+				hostname = args[0];
+				port = Integer.valueOf(args[1]);
+				outputPath = args[2];
+			} else if (args.length == 2) {
+				hostname = args[0];
+				port = Integer.valueOf(args[1]);
+			} else {
+				System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with data from socket.");
+			System.out.println("  Provide parameters to connect data source.");
+			System.out.println("  Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
+			return false;
+		}
+		return true;
+	}
+}
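
Note on parseParameters above: Integer.valueOf throws NumberFormatException when the port argument is not a number, so a mistyped port ends the example with a stack trace instead of the usage message. The sketch below shows one possible way to validate the arguments first. It is not part of the commit; the class, the Params holder and the 1 to 65535 port range check are assumptions made for illustration.

```java
import java.util.Optional;

// Hypothetical argument parser, not part of the commit.
class SocketArgsSketch {

    // Simple immutable holder; outputPath is null when results go to stdout.
    static final class Params {
        final String hostname;
        final int port;
        final String outputPath;

        Params(String hostname, int port, String outputPath) {
            this.hostname = hostname;
            this.port = port;
            this.outputPath = outputPath;
        }
    }

    // Returns the parsed parameters, or empty if the arguments are unusable.
    static Optional<Params> parse(String[] args) {
        if (args.length != 2 && args.length != 3) {
            return Optional.empty();
        }
        int port;
        try {
            port = Integer.parseInt(args[1]);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
        // Assumption: only ports a client can actually connect to are accepted.
        if (port < 1 || port > 65535) {
            return Optional.empty();
        }
        String outputPath = args.length == 3 ? args[2] : null;
        return Optional.of(new Params(args[0], port, outputPath));
    }

    public static void main(String[] args) {
        Optional<Params> params = parse(args);
        if (!params.isPresent()) {
            System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
            return;
        }
        System.out.println("Would connect to " + params.get().hostname + ":" + params.get().port);
    }
}
```

The usage string in this sketch marks the output path as optional, which matches the two-argument branch the example already accepts.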

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6dfb3fad/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
deleted file mode 100644
index 85a63cc..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/SocketTextStreamWordCount.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.wordcount;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * This example shows an implementation of WordCount with data from socket.
- *
- * <p>
- * Usage: <code>SocketTextStreamWordCount &lt;hostname&gt; &lt;port &gt; &lt;result path&gt;</code><br>
- *
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use StreamExecutionEnvironment.socketTextStream
- * <li>write a simple Flink program,
- * <li>write and use user-defined functions.
- * </ul>
- */
-public class SocketTextStreamWordCount {
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		// set up the execution environment
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		// get input data
-		DataStream<String> text = env.socketTextStream(hostname, port);
-
-		DataStream<Tuple2<String, Integer>> counts =
-		// split up the lines in pairs (2-tuples) containing: (word,1)
-		text.flatMap(new WordCount.Tokenizer())
-		// group by the tuple field "0" and sum up tuple field "1"
-			.groupBy(0).sum(1);
-
-		if (fileOutput) {
-			counts.writeAsText(outputPath, 1);
-		} else {
-			counts.print();
-		}
-
-		// execute program
-		env.execute("WordCount with SocketTextStream Example");
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String hostname;
-	private static int port;
-	private static String outputPath;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			// parse input arguments
-			if (args.length == 3) {
-				fileOutput = true;
-				hostname = args[0];
-				port = Integer.valueOf(args[1]);
-				outputPath = args[2];
-			} else if (args.length == 2) {
-				hostname = args[0];
-				port = Integer.valueOf(args[1]);
-			} else {
-				System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
-				return false;
-			}
-		} else {
-			System.out.println("Executing WordCount example with data from socket.");
-			System.out.println("  Provide parameters to connect data source.");
-			System.out.println("  Usage: SocketTextStreamWordCount <hostname> <port> <output path>");
-			return false;
-		}
-		return true;
-	}
-}