You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/04 22:53:16 UTC

[3/3] flink git commit: [FLINK-1625] [streaming] Streaming cancellation minor fix and documentation

[FLINK-1625] [streaming] Streaming cancellation minor fix and documentation

This closes #449


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ef02eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ef02eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ef02eb

Branch: refs/heads/master
Commit: 08ef02ebade3016b31fe4b401e93fa0a7080147c
Parents: 8436e9c
Author: mbalassi <mb...@apache.org>
Authored: Wed Mar 4 16:27:07 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 4 22:38:59 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 12 ++++---
 .../api/function/sink/SinkFunction.java         | 15 ++++++++
 .../function/source/ParallelSourceFunction.java |  6 +++-
 .../api/function/source/SourceFunction.java     | 38 +++++++++++++++-----
 .../api/invokable/StreamInvokable.java          |  8 +++++
 5 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 3d6b75e..0fb7dac 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -168,7 +168,11 @@ Usage: `dataStream.broadcast()`
  * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
 Usage: `operator.setParallelism(1)`
 
-### Sources
+### Connecting to the outside world
+
+The user is expected to connect to the outside world through the source and the sink interfaces. Both interfaces provide a `cancel()` method, in which allocated resources can be freed if some other part of the topology fails. The `cancel()` method is called upon termination.
+
+#### Sources
 
 The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
 
@@ -186,7 +190,7 @@ There are several predefined ones similar to the ones of the batch API and some
 These can be used to easily test and debug streaming programs.
 There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
 
-### Sinks
+#### Sinks
 
 `DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations available right away:
 
@@ -495,13 +499,13 @@ Most data stream operators support directed outputs (output splitting), meaning
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
-DataStream<Integer> even = split.select("even”);
+DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
 In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
 
-Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select("output1", "output2", …)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 05ae34d..ffa5a67 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,10 +21,25 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 
+/**
+ * Interface for implementing user defined sink functionality.
+ *
+ * @param <IN> Input type parameter.
+ */
 public interface SinkFunction<IN> extends Function, Serializable {
 
+	/**
+	 * Function for standard sink behaviour. This function is called for every record.
+	 *
+	 * @param value The input record.
+	 * @throws Exception
+	 */
 	public void invoke(IN value) throws Exception;
 
+	/**
+	 * In case another vertex in the topology fails, this method is called before terminating
+	 * the sink. Make sure to free up any allocated resources here.
+	 */
 	public void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
index 041915f..e37e851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.api.function.source;
 
+/**
+ * {@link SourceFunction} that may be executed in parallel.
+ *
+ * @param <OUT> Output type parameter.
+ */
 public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 4f579fe..af63d80 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,20 +12,40 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.Serializable;
-
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
-public interface SourceFunction<OUT> extends Function, Serializable {
-
-	public void run(Collector<OUT> collector) throws Exception;
-	
-	public void cancel();
+/**
+ * Interface for implementing user defined source functionality.
+ *
+ * <p>Sources implementing this specific interface are executed with
+ * degree of parallelism 1. To execute your sources in parallel
+ * see {@link ParallelSourceFunction}.</p>
+ *
+ * @param <OUT> Output type parameter.
+ */
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+	/**
+	 * Function for standard source behaviour. This function is called only once,
+	 * so to produce multiple outputs make sure to emit multiple records from within it.
+	 *
+	 * @param collector Collector for passing output records
+	 * @throws Exception
+	 */
+	public void run(Collector<OUT> collector) throws Exception;
+
+	/**
+	 * In case another vertex in the topology fails, this method is called before terminating
+	 * the source. Make sure to free up any allocated resources here.
+	 */
+	public void cancel();
 		
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 85fb9a4..abe31d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -114,6 +114,14 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 				// Task already cancelled do nothing
 				return null;
 			}
+		}  catch (IllegalStateException e) {
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e));
+			} else {
+				// Task already cancelled do nothing
+				return null;
+			}
 		}
 	}