You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/03/04 22:53:14 UTC

[1/3] flink git commit: [FLINK-1625] [streaming] Refactored StreamVertex and subclasses to clean up after invoke and properly log and propagate exceptions

Repository: flink
Updated Branches:
  refs/heads/master e1e03062c -> 08ef02eba


[FLINK-1625] [streaming] Refactored StreamVertex and subclasses to clean up after invoke and properly log and propagate exceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3abd6c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3abd6c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3abd6c8f

Branch: refs/heads/master
Commit: 3abd6c8f4d45b1dc37a8d832a699a4d7fea34332
Parents: e1e0306
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Mar 3 11:49:37 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 4 22:38:06 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/collector/StreamOutput.java   |  4 ++
 .../api/streamvertex/CoStreamVertex.java        | 22 ++----
 .../api/streamvertex/InputHandler.java          |  6 ++
 .../api/streamvertex/OutputHandler.java         | 27 ++------
 .../api/streamvertex/StreamIterationHead.java   | 54 ++++++++-------
 .../api/streamvertex/StreamIterationTail.java   | 22 ++++--
 .../api/streamvertex/StreamVertex.java          | 70 ++++++++++++++------
 .../flink/streaming/io/CoRecordReader.java      | 58 ++++++++++------
 8 files changed, 157 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 4551c5a..a497119 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -83,4 +83,8 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 		}
 	}
 
+	public void clearBuffers() {
+		output.clearBuffers();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index df7bcad..6957652 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -41,11 +40,9 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
 	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
 
-	private CoInvokable<IN1, IN2, OUT> userInvokable;
 	private static int numTasks;
 
 	public CoStreamVertex() {
-		userInvokable = null;
 		numTasks = newVertex();
 		instanceID = numTasks;
 	}
@@ -66,9 +63,9 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	}
 
 	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this);
+	public void clearBuffers() {
+		outputHandler.clearWriters();
+		coReader.clearBuffers();
 	}
 
 	protected void setConfigInputs() throws StreamVertexException {
@@ -94,21 +91,16 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 			}
 		}
 
-		final InputGate reader1 = inputList1.size() == 1 ? inputList1.get(0)
-				: new UnionInputGate(inputList1.toArray(new InputGate[inputList1.size()]));
+		final InputGate reader1 = inputList1.size() == 1 ? inputList1.get(0) : new UnionInputGate(
+				inputList1.toArray(new InputGate[inputList1.size()]));
 
-		final InputGate reader2 = inputList2.size() == 1 ? inputList2.get(0)
-				: new UnionInputGate(inputList2.toArray(new InputGate[inputList2.size()]));
+		final InputGate reader2 = inputList2.size() == 1 ? inputList2.get(0) : new UnionInputGate(
+				inputList2.toArray(new InputGate[inputList2.size()]));
 
 		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
 				reader1, reader2);
 	}
 
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
-	}
-
 	@SuppressWarnings("unchecked")
 	@Override
 	public <X> MutableObjectIterator<X> getInput(int index) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 73dbfce..e8a2ce1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -90,4 +90,10 @@ public class InputHandler<IN> {
 	public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
 		return inputIter;
 	}
+
+	public void clearReaders() {
+		if (inputs != null) {
+			inputs.clearBuffers();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 911f060..359675d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
@@ -166,8 +165,8 @@ public class OutputHandler<OUT> {
 	 *            The config of upStream task
 	 * @return
 	 */
-	private <T> StreamOutput<T> createStreamOutput(Integer outputVertex, StreamConfig configuration,
-			int outputIndex) {
+	private <T> StreamOutput<T> createStreamOutput(Integer outputVertex,
+			StreamConfig configuration, int outputIndex) {
 
 		StreamRecordSerializer<T> outSerializer = configuration
 				.getTypeSerializerOut1(vertex.userClassLoader);
@@ -218,25 +217,9 @@ public class OutputHandler<OUT> {
 		}
 	}
 
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
-			throws IOException, InterruptedException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
+	public void clearWriters() {
+		for (StreamOutput<?> output : outputMap.values()) {
+			output.clearBuffers();
 		}
-
-		try {
-			vertex.invokeUserFunction(userInvokable);
-		} catch (Exception e) {
-			flushOutputs();
-			throw new RuntimeException(e);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
-		}
-
-		flushOutputs();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index bc89241..adbdde3 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -17,8 +17,7 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +33,7 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
-	private OutputHandler<OUT> outputHandler;
+	private Collection<StreamOutput<?>> outputs;
 
 	private static int numSources;
 	private Integer iterationId;
@@ -52,6 +52,7 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
 	@Override
 	public void setInputsOutputs() {
 		outputHandler = new OutputHandler<OUT>(this);
+		outputs = outputHandler.getOutputs();
 
 		iterationId = configuration.getIterationId();
 		iterationWaitTime = configuration.getIterationWaitTime();
@@ -60,39 +61,46 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
 		try {
 			BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
 		} catch (Exception e) {
-
+			throw new RuntimeException(e);
 		}
+
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		StreamRecord<OUT> nextRecord;
-
-		List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
-		for (StreamOutput<?> output : outputHandler.getOutputs()) {
-			outputs.add((StreamOutput<OUT>) output);
+			LOG.debug("Iteration source {} invoked with instance id {}", getName(), getInstanceID());
 		}
 
-		while (true) {
-			if (shouldWait) {
-				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				nextRecord = dataChannel.take();
-			}
-			if (nextRecord == null) {
-				break;
+		try {
+			StreamRecord<OUT> nextRecord;
+
+			while (true) {
+				if (shouldWait) {
+					nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+				} else {
+					nextRecord = dataChannel.take();
+				}
+				if (nextRecord == null) {
+					break;
+				}
+				for (StreamOutput<?> output : outputs) {
+					((StreamOutput<OUT>) output).collect(nextRecord.getObject());
+				}
 			}
-			for (StreamOutput<OUT> output : outputs) {
-				output.collect(nextRecord.getObject());
+
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
 			}
+			throw e;
+		} finally {
+			// Cleanup
+			outputHandler.flushOutputs();
+			clearBuffers();
 		}
 
-		outputHandler.flushOutputs();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index b3ecdf8..53e8750 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -59,13 +59,23 @@ public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoked", getName());
+			LOG.debug("Iteration sink {} invoked", getName());
 		}
 
-		forwardRecords();
+		try {
+			forwardRecords();
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoke finished", getName());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Iteration sink {} invoke finished", getName());
+			}
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			clearBuffers();
 		}
 	}
 
@@ -75,12 +85,11 @@ public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 			if (!pushToQueue(reuse)) {
 				break;
 			}
-			// TODO: Fix object reuse for iteration
 			reuse = inputHandler.getInputSerializer().createInstance();
 		}
 	}
 
-	private boolean pushToQueue(StreamRecord<IN> record) {
+	private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
 		try {
 			if (shouldWait) {
 				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
@@ -92,6 +101,7 @@ public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
 						StringUtils.stringifyException(e));
+				throw e;
 			}
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5033357..bd25e72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -30,9 +30,14 @@ import org.apache.flink.streaming.io.IndexedReaderIterator;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
+
 	private static int numTasks;
 
 	protected StreamConfig configuration;
@@ -73,24 +78,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
-	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws Exception {
-		userInvokable.setRuntimeContext(context);
-		userInvokable.open(getTaskConfiguration());
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.setRuntimeContext(context);
-			invokable.open(getTaskConfiguration());
-		}
-
-		userInvokable.invoke();
-		userInvokable.close();
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.close();
-		}
-
-	}
-
 	public void setInputsOutputs() {
 		inputHandler = new InputHandler<IN>(this);
 		outputHandler = new OutputHandler<OUT>(this);
@@ -118,7 +105,52 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	@Override
 	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("TASK", userInvokable);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
+		}
+
+		try {
+			userInvokable.setRuntimeContext(context);
+			userInvokable.open(getTaskConfiguration());
+
+			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+				invokable.setRuntimeContext(context);
+				invokable.open(getTaskConfiguration());
+			}
+
+			userInvokable.invoke();
+
+			userInvokable.close();
+
+			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+				invokable.close();
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
+			}
+
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			outputHandler.flushOutputs();
+			clearBuffers();
+		}
+
+	}
+
+	protected void clearBuffers() {
+		if (outputHandler != null) {
+			outputHandler.clearWriters();
+		}
+		if (inputHandler != null) {
+			inputHandler.clearReaders();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 84b08f7..bb20ecb 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
@@ -36,7 +37,8 @@ import java.util.concurrent.LinkedBlockingQueue;
  * types to read records effectively.
  */
 @SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends AbstractReader implements EventListener<InputGate> {
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
+		AbstractReader implements EventListener<InputGate> {
 
 	private final InputGate bufferReader1;
 
@@ -63,8 +65,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 		this.bufferReader1 = bufferReader1;
 		this.bufferReader2 = bufferReader2;
 
-		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1.getNumberOfInputChannels()];
-		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2.getNumberOfInputChannels()];
+		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1
+				.getNumberOfInputChannels()];
+		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2
+				.getNumberOfInputChannels()];
 
 		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
 			reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
@@ -104,7 +108,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 			if (currentReaderIndex == 1) {
 				while (true) {
 					if (reader1currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer.getNextRecord(target1);
+						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
+								.getNextRecord(target1);
 
 						if (result.isBufferConsumed()) {
 							reader1currentRecordDeserializer.getCurrentBuffer().recycle();
@@ -116,27 +121,26 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 						if (result.isFullRecord()) {
 							return 1;
 						}
-					}
-					else {
+					} else {
 
 						final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent();
 
 						if (boe.isBuffer()) {
-							reader1currentRecordDeserializer = reader1RecordDeserializers[boe.getChannelIndex()];
+							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
+									.getChannelIndex()];
 							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						}
-						else if (handleEvent(boe.getEvent())) {
+						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
 							break;
 						}
 					}
 				}
-			}
-			else if (currentReaderIndex == 2) {
+			} else if (currentReaderIndex == 2) {
 				while (true) {
 					if (reader2currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer.getNextRecord(target2);
+						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
+								.getNextRecord(target2);
 
 						if (result.isBufferConsumed()) {
 							reader2currentRecordDeserializer.getCurrentBuffer().recycle();
@@ -148,23 +152,21 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 						if (result.isFullRecord()) {
 							return 2;
 						}
-					}
-					else {
+					} else {
 						final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent();
 
 						if (boe.isBuffer()) {
-							reader2currentRecordDeserializer = reader2RecordDeserializers[boe.getChannelIndex()];
+							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
+									.getChannelIndex()];
 							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						}
-						else if (handleEvent(boe.getEvent())) {
+						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
 							break;
 						}
 					}
 				}
-			}
-			else {
+			} else {
 				throw new IllegalStateException("Bug: unexpected current reader index.");
 			}
 		}
@@ -182,9 +184,23 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	public void onEvent(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
 			availableRecordReaders.add(1);
-		}
-		else if (bufferReader == bufferReader2) {
+		} else if (bufferReader == bufferReader2) {
 			availableRecordReaders.add(2);
 		}
 	}
+
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+		for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
 }


[2/3] flink git commit: [FLINK-1625] [streaming] [api-breaking] Added proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method

Posted by mb...@apache.org.
[FLINK-1625] [streaming] [api-breaking] Added proper cancellation to StreamInvokables + Sink- and SourceFunction interfaces extended with cancel method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8436e9ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8436e9ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8436e9ce

Branch: refs/heads/master
Commit: 8436e9ce31b52f1bd8c55b8e8c50cafb57cff84f
Parents: 3abd6c8
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 4 10:24:00 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 4 22:38:41 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   |   7 ++
 .../streaming/connectors/flume/FlumeSource.java |   6 +-
 .../connectors/kafka/KafkaProducerExample.java  |   8 +-
 .../connectors/kafka/api/KafkaSink.java         |  37 ++++---
 .../connectors/kafka/api/KafkaSource.java       |  32 ++++--
 .../streaming/connectors/rabbitmq/RMQSink.java  |   5 +
 .../connectors/rabbitmq/RMQSource.java          |  42 ++++---
 .../connectors/twitter/TwitterSource.java       |  35 +++---
 .../connectors/twitter/TwitterStreaming.java    |   4 +
 .../api/function/sink/FileSinkFunction.java     |   9 ++
 .../api/function/sink/PrintSinkFunction.java    |   8 +-
 .../api/function/sink/SinkFunction.java         |   4 +-
 .../sink/WriteSinkFunctionByMillis.java         |   5 +
 .../function/source/FileMonitoringFunction.java |  18 ++-
 .../api/function/source/FileSourceFunction.java |  28 +++--
 .../function/source/FromElementsFunction.java   |   6 +-
 .../function/source/GenSequenceFunction.java    |   6 +-
 .../source/SocketTextStreamFunction.java        | 110 ++++++++++++-------
 .../api/function/source/SourceFunction.java     |   6 +-
 .../streaming/api/invokable/SinkInvokable.java  |   8 +-
 .../api/invokable/SourceInvokable.java          |   8 +-
 .../api/invokable/StreamInvokable.java          |  12 +-
 .../invokable/operator/CounterInvokable.java    |   6 +-
 .../api/invokable/operator/FilterInvokable.java |   8 +-
 .../invokable/operator/FlatMapInvokable.java    |   8 +-
 .../api/invokable/operator/MapInvokable.java    |   8 +-
 .../invokable/operator/ProjectInvokable.java    |   2 +-
 .../operator/StreamReduceInvokable.java         |   8 +-
 .../api/invokable/operator/co/CoInvokable.java  |  17 ++-
 .../windowing/GroupedStreamDiscretizer.java     |   7 +-
 .../windowing/GroupedWindowBufferInvokable.java |   8 +-
 .../operator/windowing/StreamDiscretizer.java   |   2 +-
 .../windowing/WindowBufferInvokable.java        |  11 +-
 .../operator/windowing/WindowFlattener.java     |   8 +-
 .../operator/windowing/WindowMerger.java        |   8 +-
 .../operator/windowing/WindowPartitioner.java   |   8 +-
 .../api/streamvertex/StreamVertex.java          |   7 ++
 .../apache/flink/streaming/api/IterateTest.java |   4 +
 .../flink/streaming/api/OutputSplitterTest.java |  16 +++
 .../streaming/api/WindowCrossJoinTest.java      |   8 ++
 .../api/collector/DirectedOutputTest.java       |   4 +
 .../windowing/WindowIntegrationTest.java        |  28 +++++
 .../api/streamvertex/StreamVertexTest.java      |  15 ++-
 .../apache/flink/streaming/util/MockSource.java |   2 +-
 .../streaming/examples/join/WindowJoin.java     |  14 ++-
 .../ml/IncrementalLearningSkeleton.java         |  14 ++-
 .../examples/windowing/StockPrices.java         |  14 ++-
 .../windowing/TopSpeedWindowingExample.java     |   6 +-
 .../flink/streaming/api/scala/DataStream.scala  |   1 +
 .../api/scala/StreamExecutionEnvironment.scala  |   3 +-
 .../test/classloading/jar/StreamingProgram.java |  10 +-
 51 files changed, 481 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 8a2f2b8..86fd1b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -134,6 +134,13 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	@Override
+	public void cancel() {
+		if (client != null) {
+			client.client.close();
+		}
+	}
+
+	@Override
 	public void open(Configuration config) {
 		client = new FlinkRpcClientFacade();
 		client.init(host, port);

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 4f6ec2d..2a321a2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -130,7 +130,7 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the datastream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
+	public void run(Collector<OUT> collector) throws Exception {
 		configureAvroSource(collector);
 		avroSource.start();
 		while (!finished) {
@@ -138,4 +138,8 @@ public class FlumeSource<OUT> extends ConnectorSource<OUT> {
 		}
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 2c2bf80..1cd1192 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -41,7 +41,7 @@ public class KafkaProducerExample {
 		@SuppressWarnings({ "unused", "serial" })
 		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
 			@Override
-			public void invoke(Collector<String> collector) throws Exception {
+			public void run(Collector<String> collector) throws Exception {
 				for (int i = 0; i < 100; i++) {
 					collector.collect("message #" + i);
 					Thread.sleep(100L);
@@ -49,6 +49,12 @@ public class KafkaProducerExample {
 
 				collector.collect(new String("q"));
 			}
+
+			@Override
+			public void cancel() {				
+			}
+			
+			
 		}).addSink(
 				new KafkaSink<String>(topic, host + ":" + port, new JavaDefaultStringSchema())
 		)

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 5324480..d14772b 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -33,9 +33,9 @@ import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
 /**
  * Sink that emits its inputs to a Kafka topic.
- *
+ * 
  * @param <IN>
- * 		Type of the sink input
+ *            Type of the sink input
  */
 public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
@@ -49,14 +49,15 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private KafkaPartitioner<IN> partitioner;
 
 	/**
-	 * Creates a KafkaSink for a given topic. The partitioner distributes the messages between the partitions of the topics.
-	 *
+	 * Creates a KafkaSink for a given topic. The partitioner distributes the
+	 * messages between the partitions of the topics.
+	 * 
 	 * @param topicId
-	 * 		ID of the Kafka topic.
+	 *            ID of the Kafka topic.
 	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
 	 * @param serializationSchema
-	 * 		User defined serialization schema.
+	 *            User defined serialization schema.
 	 */
 	public KafkaSink(String topicId, String brokerAddr,
 			SerializationSchema<IN, byte[]> serializationSchema) {
@@ -64,16 +65,17 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	}
 
 	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input into the topic.
-	 *
+	 * Creates a KafkaSink for a given topic. The sink produces its input into
+	 * the topic.
+	 * 
 	 * @param topicId
-	 * 		ID of the Kafka topic.
+	 *            ID of the Kafka topic.
 	 * @param brokerAddr
-	 * 		Address of the Kafka broker (with port number).
+	 *            Address of the Kafka broker (with port number).
 	 * @param serializationSchema
-	 * 		User defined serialization schema.
+	 *            User defined serialization schema.
 	 * @param partitioner
-	 * 		User defined partitioner.
+	 *            User defined partitioner.
 	 */
 	public KafkaSink(String topicId, String brokerAddr,
 			SerializationSchema<IN, byte[]> serializationSchema, KafkaPartitioner<IN> partitioner) {
@@ -111,9 +113,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 	/**
 	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
+	 * 
 	 * @param next
-	 * 		The incoming data
+	 *            The incoming data
 	 */
 	@Override
 	public void invoke(IN next) {
@@ -132,4 +134,9 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		}
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 7a185bb..f4097e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -53,6 +53,8 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private long zookeeperSyncTimeMillis;
 	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
 
+	private volatile boolean isRunning = false;
+
 	/**
 	 * Creates a KafkaSource that consumes a topic.
 	 * 
@@ -107,21 +109,31 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
-
-		while (consumerIterator.hasNext()) {
-			OUT out = schema.deserialize(consumerIterator.next().message());
-			if (schema.isEndOfStream(out)) {
-				break;
+	public void run(Collector<OUT> collector) throws Exception {
+		isRunning = true;
+		try {
+			while (isRunning && consumerIterator.hasNext()) {
+				OUT out = schema.deserialize(consumerIterator.next().message());
+				if (schema.isEndOfStream(out)) {
+					break;
+				}
+				collector.collect(out);
 			}
-			collector.collect(out);
+		} finally {
+			consumer.shutdown();
 		}
-		consumer.shutdown();
-
 	}
 
 	@Override
-	public void open(Configuration config) {
+	public void open(Configuration config) throws Exception {
 		initializeConnection();
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		if (consumer != null) {
+			consumer.shutdown();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 38c4f5f..dae9c6d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -108,4 +108,9 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 		closeChannel();
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 7ce864e..12ad3d6 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -46,6 +46,8 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	private transient QueueingConsumer consumer;
 	private transient QueueingConsumer.Delivery delivery;
 
+	private volatile boolean isRunning = false;
+
 	OUT out;
 
 	public RMQSource(String HOST_NAME, String QUEUE_NAME,
@@ -80,42 +82,46 @@ public class RMQSource<OUT> extends ConnectorSource<OUT> {
 	 *            The Collector for sending data to the dataStream
 	 */
 	@Override
-	public void invoke(Collector<OUT> collector) throws Exception {
-
-		while (true) {
-
-			try {
-				delivery = consumer.nextDelivery();
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+	public void run(Collector<OUT> collector) throws Exception {
+		isRunning = true;
+		try {
+			while (isRunning) {
+
+				try {
+					delivery = consumer.nextDelivery();
+				} catch (Exception e) {
+					if (LOG.isErrorEnabled()) {
+						LOG.error("Cannot recieve RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
+					}
 				}
-			}
 
-			out = schema.deserialize(delivery.getBody());
-			if (schema.isEndOfStream(out)) {
-				break;
-			} else {
-				collector.collect(out);
+				out = schema.deserialize(delivery.getBody());
+				if (schema.isEndOfStream(out)) {
+					break;
+				} else {
+					collector.collect(out);
+				}
 			}
+		} finally {
+			connection.close();
 		}
 
 	}
 
 	@Override
-	public void open(Configuration config) {
+	public void open(Configuration config) throws Exception {
 		initializeConnection();
 	}
 
 	@Override
-	public void close() {
+	public void cancel() {
+		isRunning = false;
 		try {
 			connection.close();
 		} catch (IOException e) {
 			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
 					+ " at " + HOST_NAME, e);
 		}
-
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
index ddb2538..740907f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -57,6 +57,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	private boolean streaming;
 	private int numberOfTweets;
 
+	private volatile boolean isRunning = false;
+
 	/**
 	 * Create {@link TwitterSource} for streaming
 	 * 
@@ -90,20 +92,20 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	}
 
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
-
-		if (streaming) {
-			collectMessages(collector);
-		} else {
-			collectFiniteMessages(collector);
+	public void run(Collector<String> collector) throws Exception {
+		isRunning = true;
+		try {
+			if (streaming) {
+				collectMessages(collector);
+			} else {
+				collectFiniteMessages(collector);
+			}
+		} finally {
+			closeConnection();
+			isRunning = false;
 		}
 	}
 
-	@Override
-	public void close() throws Exception {
-		closeConnection();
-	}
-
 	/**
 	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
 	 */
@@ -196,7 +198,7 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 			LOG.info("Tweet-stream begins");
 		}
 
-		while (true) {
+		while (isRunning) {
 			collectOneMessage(collector);
 		}
 	}
@@ -246,7 +248,8 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	/**
 	 * Get the size of the queue in which the tweets are contained temporarily.
 	 * 
-	 * @return the size of the queue in which the tweets are contained temporarily
+	 * @return the size of the queue in which the tweets are contained
+	 *         temporarily
 	 */
 	public int getQueueSize() {
 		return queueSize;
@@ -280,4 +283,10 @@ public class TwitterSource extends RichParallelSourceFunction<String> {
 	public void setWaitSec(int waitSec) {
 		this.waitSec = waitSec;
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+		closeConnection();
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
index a32fe1b..9be27eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -47,6 +47,10 @@ public class TwitterStreaming {
 			System.out.println("");
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	public static class SelectDataFlatMap extends

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
index 24beba1..5468494 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/FileSinkFunction.java
@@ -115,4 +115,13 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 	 */
 	protected abstract void resetParameters();
 
+	@Override
+	public void cancel() {
+		try {
+			close();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
index d460749..0fa37ac 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/PrintSinkFunction.java
@@ -84,10 +84,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 	}
 	
 	@Override
-	public void close() throws Exception {
+	public void close() {
 		this.stream = null;
 		this.prefix = null;
-		super.close();
 	}
 	
 	@Override
@@ -95,4 +94,9 @@ public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
 		return "Print to " + (target == STD_OUT ? "System.out" : "System.err");
 	}
 
+	@Override
+	public void cancel() {
+		close();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 6097603..05ae34d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.Function;
 
 public interface SinkFunction<IN> extends Function, Serializable {
 
-	public abstract void invoke(IN value) throws Exception;
+	public void invoke(IN value) throws Exception;
+
+	public void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
index ee6df94..53030f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/WriteSinkFunctionByMillis.java
@@ -47,4 +47,9 @@ public class WriteSinkFunctionByMillis<IN> extends WriteSinkFunction<IN> {
 		lastTime = System.currentTimeMillis();
 	}
 
+	@Override
+	public void cancel() {
+		// No cleanup needed
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
index 05a2489..2a84c0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileMonitoringFunction.java
@@ -39,8 +39,10 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 
 	public enum WatchType {
 		ONLY_NEW_FILES, // Only new files will be processed.
-		REPROCESS_WITH_APPENDED, // When some files are appended, all contents of the files will be processed.
-		PROCESS_ONLY_APPENDED // When some files are appended, only appended contents will be processed.
+		REPROCESS_WITH_APPENDED, // When some files are appended, all contents
+									// of the files will be processed.
+		PROCESS_ONLY_APPENDED // When some files are appended, only appended
+								// contents will be processed.
 	}
 
 	private String path;
@@ -51,6 +53,8 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	private Map<String, Long> offsetOfFiles;
 	private Map<String, Long> modificationTimes;
 
+	private volatile boolean isRunning = false;
+
 	public FileMonitoringFunction(String path, long interval, WatchType watchType) {
 		this.path = path;
 		this.interval = interval;
@@ -60,10 +64,11 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 	}
 
 	@Override
-	public void invoke(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+	public void run(Collector<Tuple3<String, Long, Long>> collector) throws Exception {
+		isRunning = true;
 		fileSystem = FileSystem.get(new URI(path));
 
-		while (true) {
+		while (isRunning) {
 			List<String> files = listNewFiles();
 			for (String filePath : files) {
 				if (watchType == WatchType.ONLY_NEW_FILES
@@ -120,4 +125,9 @@ public class FileMonitoringFunction implements SourceFunction<Tuple3<String, Lon
 			}
 		}
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index dcf67a9..d7df266 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -38,6 +38,8 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 
 	private TypeInformation<String> typeInfo;
 
+	private volatile boolean isRunning;
+
 	public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
 		this.inputFormat = format;
 		this.typeInfo = typeInfo;
@@ -51,33 +53,32 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 	}
 
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
+	public void run(Collector<String> collector) throws Exception {
+		isRunning = true;
 		final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
 				.getExecutionConfig());
 		final Iterator<InputSplit> splitIterator = getInputSplits();
 		@SuppressWarnings("unchecked")
 		final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
 		try {
-			while (splitIterator.hasNext()) {
+			while (isRunning && splitIterator.hasNext()) {
 
 				final InputSplit split = splitIterator.next();
 				String record = serializer.createInstance();
 
 				format.open(split);
-				try {
-					while (!format.reachedEnd()) {
-						if ((record = format.nextRecord(record)) != null) {
-							collector.collect(record);
-						}
+				while (!format.reachedEnd()) {
+					if ((record = format.nextRecord(record)) != null) {
+						collector.collect(record);
 					}
-				} finally {
-					format.close();
 				}
+
 			}
 			collector.close();
-		} catch (Exception ex) {
-			ex.printStackTrace();
+		} finally {
+			format.close();
 		}
+		isRunning = false;
 	}
 
 	private Iterator<InputSplit> getInputSplits() {
@@ -126,4 +127,9 @@ public class FileSourceFunction extends RichParallelSourceFunction<String> {
 			}
 		};
 	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
index 8afac75..97a3a92 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FromElementsFunction.java
@@ -40,10 +40,14 @@ public class FromElementsFunction<T> implements SourceFunction<T> {
 	}
 
 	@Override
-	public void invoke(Collector<T> collector) throws Exception {
+	public void run(Collector<T> collector) throws Exception {
 		for (T element : iterable) {
 			collector.collect(element);
 		}
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
index 3afd06e..eccc146 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/GenSequenceFunction.java
@@ -37,7 +37,7 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 	}
 
 	@Override
-	public void invoke(Collector<Long> collector) throws Exception {
+	public void run(Collector<Long> collector) throws Exception {
 		while (splitIterator.hasNext()) {
 			collector.collect(splitIterator.next());
 		}
@@ -50,4 +50,8 @@ public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
 		splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
 	}
 
+	@Override
+	public void cancel() {
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
index 3253c01..67bc128 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SocketTextStreamFunction.java
@@ -18,10 +18,12 @@
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.BufferedReader;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
@@ -33,7 +35,7 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
 
 	private static final long serialVersionUID = 1L;
-	
+
 	private String hostname;
 	private int port;
 	private char delimiter;
@@ -43,6 +45,8 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	private static final int CONNECTION_TIMEOUT_TIME = 0;
 	private static final int CONNECTION_RETRY_SLEEP = 1000;
 
+	private volatile boolean isRunning = false;
+
 	public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
 		this.hostname = hostname;
 		this.port = port;
@@ -55,65 +59,91 @@ public class SocketTextStreamFunction extends RichSourceFunction<String> {
 	public void open(Configuration parameters) throws Exception {
 		super.open(parameters);
 		socket = new Socket();
-		
+
 		socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
 	}
-	
+
 	@Override
-	public void invoke(Collector<String> collector) throws Exception {
+	public void run(Collector<String> collector) throws Exception {
 		streamFromSocket(collector, socket);
 	}
 
 	public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
-		StringBuffer buffer = new StringBuffer();
-		BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+		isRunning = true;
+		try {
+			StringBuffer buffer = new StringBuffer();
+			BufferedReader reader = new BufferedReader(new InputStreamReader(
+					socket.getInputStream()));
 
-		while (true) {
-			int data = reader.read();
-			if (data == -1) {
-				socket.close();
-				long retry = 0;
-				boolean success = false;
-				while (retry < maxRetry && !success) {
-					if (!retryForever) {
-						retry++;
+			while (isRunning) {
+				int data;
+				try {
+					data = reader.read();
+				} catch (SocketException e) {
+					if (!isRunning) {
+						break;
+					} else {
+						throw e;
 					}
-					LOG.warn("Lost connection to server socket. Retrying in " + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
-					try {
-						socket = new Socket();
-						socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
-						success = true;
-					} catch (ConnectException ce) {
-						Thread.sleep(CONNECTION_RETRY_SLEEP);
+				}
+
+				if (data == -1) {
+					socket.close();
+					long retry = 0;
+					boolean success = false;
+					while (retry < maxRetry && !success) {
+						if (!retryForever) {
+							retry++;
+						}
+						LOG.warn("Lost connection to server socket. Retrying in "
+								+ (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
+						try {
+							socket = new Socket();
+							socket.connect(new InetSocketAddress(hostname, port),
+									CONNECTION_TIMEOUT_TIME);
+							success = true;
+						} catch (ConnectException ce) {
+							Thread.sleep(CONNECTION_RETRY_SLEEP);
+						}
 					}
+
+					if (success) {
+						LOG.info("Server socket is reconnected.");
+					} else {
+						LOG.error("Could not reconnect to server socket.");
+						break;
+					}
+					reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+					continue;
 				}
 
-				if (success) {
-					LOG.info("Server socket is reconnected.");
-				} else {
-					LOG.error("Could not reconnect to server socket.");
-					break;
+				if (data == delimiter) {
+					collector.collect(buffer.toString());
+					buffer = new StringBuffer();
+				} else if (data != '\r') { // ignore carriage return
+					buffer.append((char) data);
 				}
-				reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-				continue;
 			}
 
-			if (data == delimiter) {
+			if (buffer.length() > 0) {
 				collector.collect(buffer.toString());
-				buffer = new StringBuffer();
-			} else if (data != '\r') { // ignore carriage return
-				buffer.append((char) data);
 			}
-		}
-
-		if (buffer.length() > 0) {
-			collector.collect(buffer.toString());
+		} finally {
+			socket.close();
 		}
 	}
 
 	@Override
-	public void close() throws Exception {
-		socket.close();
-		super.close();
+	public void cancel() {
+		isRunning = false;
+		if (socket != null && !socket.isClosed()) {
+			try {
+				socket.close();
+			} catch (IOException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Could not close open socket");
+				}
+			}
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 917562a..4f579fe 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -24,6 +24,8 @@ import org.apache.flink.util.Collector;
 
 public interface SourceFunction<OUT> extends Function, Serializable {
 
-	public void invoke(Collector<OUT> collector) throws Exception;
+	public void run(Collector<OUT> collector) throws Exception;
+	
+	public void cancel();
 		
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
index 13a6ba1..35060fd 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SinkInvokable.java
@@ -31,7 +31,7 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -47,4 +47,10 @@ public class SinkInvokable<IN> extends ChainableInvokable<IN, IN> {
 		callUserFunctionAndLogException();
 	}
 
+	@Override
+	public void cancel() {
+		super.cancel();
+		sinkFunction.cancel();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
index f1cf2c5..c3f25a0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/SourceInvokable.java
@@ -39,6 +39,12 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT, OUT> implements S
 
 	@Override
 	protected void callUserFunction() throws Exception {
-		sourceFunction.invoke(collector);
+		sourceFunction.run(collector);
+	}
+
+	@Override
+	public void cancel() {
+		super.cancel();
+		sourceFunction.cancel();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index db7b642..85fb9a4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -107,7 +107,13 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 			}
 			return nextRecord;
 		} catch (IOException e) {
-			throw new RuntimeException("Could not read next record.");
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e));
+			} else {
+				// Task already cancelled do nothing
+				return null;
+			}
 		}
 	}
 
@@ -159,6 +165,10 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 		}
 	}
 
+	public void cancel() {
+		isRunning = false;
+	}
+
 	public void setRuntimeContext(RuntimeContext t) {
 		FunctionUtils.setFunctionRuntimeContext(userFunction, t);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
index 3fc314c..8bb546c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java
@@ -30,14 +30,16 @@ public class CounterInvokable<IN> extends ChainableInvokable<IN, Long> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			collector.collect(++count);
 		}
 	}
 
 	@Override
 	public void collect(IN record) {
-		collector.collect(++count);
+		if (isRunning) {
+			collector.collect(++count);
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
index 0c8298e..ab3f147 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FilterInvokable.java
@@ -34,7 +34,7 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -49,7 +49,9 @@ public class FilterInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
index 2a4081b..025bd32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/FlatMapInvokable.java
@@ -32,7 +32,7 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -44,8 +44,10 @@ public class FlatMapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
index 7c8e577..8fc1f13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/MapInvokable.java
@@ -32,7 +32,7 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -44,7 +44,9 @@ public class MapInvokable<IN, OUT> extends ChainableInvokable<IN, OUT> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 31689c7..3e47107 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -41,7 +41,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
index fe6c41a..e7fa2b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceInvokable.java
@@ -35,7 +35,7 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			reduce();
 		}
 	}
@@ -62,8 +62,10 @@ public class StreamReduceInvokable<IN> extends ChainableInvokable<IN, IN> {
 
 	@Override
 	public void collect(IN record) {
-		nextObject = copy(record);
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
index 604873e..b41dbbb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoInvokable.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.invokable.operator.co;
 
+import java.io.IOException;
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
@@ -76,8 +78,19 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OU
 
 	@Override
 	public void invoke() throws Exception {
-		while (true) {
-			int next = recordIterator.next(reuse1, reuse2);
+		while (isRunning) {
+			int next;
+			try {
+				next = recordIterator.next(reuse1, reuse2);
+			} catch (IOException e) {
+				if (isRunning) {
+					throw e;
+				} else {
+					// Task already cancelled do nothing
+					next = 0;
+				}
+			}
+
 			if (next == 0) {
 				break;
 			} else if (next == 1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index 5e21a31..f14a6ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -60,11 +60,8 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 
 	@Override
 	public void invoke() throws Exception {
-		if (readNext() == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
 
-		while (nextRecord != null) {
+		while (isRunning && readNext() != null) {
 
 			Object key = keySelector.getKey(nextObject);
 
@@ -76,8 +73,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 			}
 
 			groupDiscretizer.processRealElement(nextObject);
-
-			readNext();
 		}
 
 		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
index 53c87c3..2c3bd75 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
@@ -42,7 +42,7 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -64,8 +64,10 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
 
 	@Override
 	public void collect(WindowEvent<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index 104196e..e668b66 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -71,7 +71,7 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
 	public void invoke() throws Exception {
 
 		// Continuously run
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			processRealElement(nextObject);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
index ea4b830..75f7d9d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowBufferInvokable.java
@@ -26,8 +26,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  * This invokable flattens the results of the window transformations by
  * outputing the elements of the {@link StreamWindow} one-by-one
  */
-public class WindowBufferInvokable<T> extends
-		ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
+public class WindowBufferInvokable<T> extends ChainableInvokable<WindowEvent<T>, StreamWindow<T>> {
 
 	protected WindowBuffer<T> buffer;
 
@@ -40,7 +39,7 @@ public class WindowBufferInvokable<T> extends
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -67,8 +66,10 @@ public class WindowBufferInvokable<T> extends
 
 	@Override
 	public void collect(WindowEvent<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index edefeef..0ff4724 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -34,7 +34,7 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -48,8 +48,10 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index a58bb9f..f425255 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -41,7 +41,7 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -69,7 +69,9 @@ public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamW
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index e010af4..846650d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -44,7 +44,7 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 
 	@Override
 	public void invoke() throws Exception {
-		while (readNext() != null) {
+		while (isRunning && readNext() != null) {
 			callUserFunctionAndLogException();
 		}
 	}
@@ -71,8 +71,10 @@ public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, St
 
 	@Override
 	public void collect(StreamWindow<T> record) {
-		nextObject = record;
-		callUserFunctionAndLogException();
+		if (isRunning) {
+			nextObject = record;
+			callUserFunctionAndLogException();
+		}
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index bd25e72..99ca098 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -154,6 +154,13 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 	}
 
 	@Override
+	public void cancel() {
+		if (userInvokable != null) {
+			userInvokable.cancel();
+		}
+	}
+
+	@Override
 	public StreamConfig getConfig() {
 		return configuration;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 6ad827a..92d23aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -69,6 +69,10 @@ public class IterateTest {
 		public void invoke(Boolean tuple) {
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 2486715..a214fbf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -76,6 +76,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult1.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 
 		d1.split(new OutputSelector<Integer>() {
@@ -98,6 +102,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult2.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 		env.execute();
 
@@ -144,6 +152,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult1.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 
 		ds.split(new OutputSelector<Integer>() {
@@ -168,6 +180,10 @@ public class OutputSplitterTest {
 			public void invoke(Integer value) {
 				splitterResult2.add(value);
 			}
+			
+			@Override
+			public void cancel() {
+			}
 		});
 		env.execute();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index dc4932e..e14e281 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -143,6 +143,10 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			joinResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
+		
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static class CrossResultSink implements
@@ -153,5 +157,9 @@ public class WindowCrossJoinTest implements Serializable {
 		public void invoke(Tuple2<Tuple2<Integer, String>, Tuple1<Integer>> value) {
 			crossResults.add(new Tuple2<Tuple2<Integer, String>, Integer>(value.f0, value.f1.f0));
 		}
+		
+		@Override
+		public void cancel() {
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 38bba5e..9d166e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -88,6 +88,10 @@ public class DirectedOutputTest {
 			outputs.put(name, new ArrayList<Long>());
 			this.list = outputs.get(name);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 3163c46..2ed0002 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -208,6 +208,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -221,6 +225,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -234,6 +242,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -247,6 +259,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -260,6 +276,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -273,6 +293,10 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 
 	@SuppressWarnings("serial")
@@ -286,5 +310,9 @@ public class WindowIntegrationTest implements Serializable {
 			windows.add(value);
 		}
 
+		@Override
+		public void cancel() {
+		}
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index 4f01a8b..18a36ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -50,12 +50,17 @@ public class StreamVertexTest {
 		private Tuple1<Integer> tuple = new Tuple1<Integer>(0);
 
 		@Override
-		public void invoke(Collector<Tuple1<Integer>> collector) throws Exception {
+		public void run(Collector<Tuple1<Integer>> collector) throws Exception {
 			for (int i = 0; i < 10; i++) {
 				tuple.f0 = i;
 				collector.collect(tuple);
 			}
 		}
+
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static class MyTask extends RichMapFunction<Tuple1<Integer>, Tuple2<Integer, Integer>> {
@@ -77,6 +82,10 @@ public class StreamVertexTest {
 			Integer v = tuple.getField(1);
 			data.put(k, v);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	@SuppressWarnings("unused")
@@ -142,6 +151,10 @@ public class StreamVertexTest {
 		public void invoke(String value) {
 			result.add(value);
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
index bb92e8e..4cf02ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -27,7 +27,7 @@ public class MockSource<T> {
 	public static <T> List<T> createAndExecute(SourceFunction<T> source) {
 		List<T> outputs = new ArrayList<T>();
 		try {
-			source.invoke(new MockCollector<T>(outputs));
+			source.run(new MockCollector<T>(outputs));
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot invoke source.", e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index dcfed50..a5a9577 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -108,7 +108,7 @@ public class WindowJoin {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
 			while (true) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(GRADE_COUNT) + 1;
@@ -116,6 +116,11 @@ public class WindowJoin {
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**
@@ -134,7 +139,7 @@ public class WindowJoin {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
+		public void run(Collector<Tuple2<String, Integer>> out) throws Exception {
 			while (true) {
 				outTuple.f0 = names[rand.nextInt(names.length)];
 				outTuple.f1 = rand.nextInt(SALARY_MAX) + 1;
@@ -142,6 +147,11 @@ public class WindowJoin {
 				Thread.sleep(rand.nextInt(SLEEP_TIME) + 1);
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static class MyJoinFunction

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 4cdb7c6..26895f2 100755
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -93,7 +93,7 @@ public class IncrementalLearningSkeleton {
 		private static final int NEW_DATA_SLEEP_TIME = 1000;
 
 		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
+		public void run(Collector<Integer> collector) throws Exception {
 			while (true) {
 				collector.collect(getNewData());
 			}
@@ -103,6 +103,11 @@ public class IncrementalLearningSkeleton {
 			Thread.sleep(NEW_DATA_SLEEP_TIME);
 			return 1;
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**
@@ -114,7 +119,7 @@ public class IncrementalLearningSkeleton {
 		private static final int TRAINING_DATA_SLEEP_TIME = 10;
 
 		@Override
-		public void invoke(Collector<Integer> collector) throws Exception {
+		public void run(Collector<Integer> collector) throws Exception {
 			while (true) {
 				collector.collect(getTrainingData());
 			}
@@ -126,6 +131,11 @@ public class IncrementalLearningSkeleton {
 			return 1;
 
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
index 9bf851e..ec99026 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java
@@ -267,7 +267,7 @@ public class StockPrices {
 		}
 
 		@Override
-		public void invoke(Collector<StockPrice> collector) throws Exception {
+		public void run(Collector<StockPrice> collector) throws Exception {
 			price = DEFAULT_PRICE;
 			Random random = new Random();
 
@@ -277,6 +277,11 @@ public class StockPrices {
 				Thread.sleep(random.nextInt(200));
 			}
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public final static class WindowMean implements WindowMapFunction<StockPrice, StockPrice> {
@@ -307,7 +312,7 @@ public class StockPrices {
 		StringBuilder stringBuilder;
 
 		@Override
-		public void invoke(Collector<String> collector) throws Exception {
+		public void run(Collector<String> collector) throws Exception {
 			random = new Random();
 			stringBuilder = new StringBuilder();
 
@@ -322,6 +327,11 @@ public class StockPrices {
 			}
 
 		}
+		
+		@Override
+		public void cancel() {
+			// No cleanup needed
+		}
 	}
 
 	public static final class SendWarning implements WindowMapFunction<StockPrice, String> {

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
index 5e73fd6..311c6b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowingExample.java
@@ -87,7 +87,7 @@ public class TopSpeedWindowingExample {
 		}
 
 		@Override
-		public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
+		public void run(Collector<Tuple4<Integer, Integer, Double, Long>> collector)
 				throws Exception {
 
 			while (true) {
@@ -104,6 +104,10 @@ public class TopSpeedWindowingExample {
 				}
 			}
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 
 	private static int numOfCars = 2;

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3ab4ff1..d4df1d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -568,6 +568,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
     val sinkFunction = new SinkFunction[T] {
       val cleanFun = clean(fun)
       def invoke(in: T) = cleanFun(in)
+      def cancel() = {}
     }
     this.addSink(sinkFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 00d3704..1212b2b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -224,9 +224,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     Validate.notNull(function, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
       val cleanFun = StreamExecutionEnvironment.clean(function)
-      override def invoke(out: Collector[T]) {
+      override def run(out: Collector[T]) {
         cleanFun(out)
       }
+      override def cancel() = {}
     }
     addSource(sourceFunction)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8436e9ce/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
index f7f9eae..18b52c5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -18,17 +18,15 @@
 
 package org.apache.flink.test.classloading.jar;
 
+import java.util.StringTokenizer;
+
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.test.recordJobs.wordcount.WordCount;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.util.Collector;
 
-import java.util.StringTokenizer;
-
 @SuppressWarnings("serial")
 public class StreamingProgram {
 	
@@ -100,5 +98,9 @@ public class StreamingProgram {
 		@Override
 		public void invoke(Word value) throws Exception {
 		}
+
+		@Override
+		public void cancel() {
+		}
 	}
 }


[3/3] flink git commit: [FLINK-1625] [streaming] Streaming cancellation minor fix and documentation

Posted by mb...@apache.org.
[FLINK-1625] [streaming] Streaming cancellation minor fix and documentation

This closes #449


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08ef02eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08ef02eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08ef02eb

Branch: refs/heads/master
Commit: 08ef02ebade3016b31fe4b401e93fa0a7080147c
Parents: 8436e9c
Author: mbalassi <mb...@apache.org>
Authored: Wed Mar 4 16:27:07 2015 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Wed Mar 4 22:38:59 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 12 ++++---
 .../api/function/sink/SinkFunction.java         | 15 ++++++++
 .../function/source/ParallelSourceFunction.java |  6 +++-
 .../api/function/source/SourceFunction.java     | 38 +++++++++++++++-----
 .../api/invokable/StreamInvokable.java          |  8 +++++
 5 files changed, 65 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 3d6b75e..0fb7dac 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -168,7 +168,11 @@ Usage: `dataStream.broadcast()`
  * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
 Usage: `operator.setParallelism(1)`
 
-### Sources
+### Connecting to the outside world
+
+The user is expected to connect to the outside world through the source and the sink interfaces. We provide a `cancel()` method where allocated resources can be freed up in case some other parts of the topology failed. The `cancel()` method is called upon termination.
+
+#### Sources
 
 The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1.
 
@@ -186,7 +190,7 @@ There are several predefined ones similar to the ones of the batch API and some
 These can be used to easily test and debug streaming programs.
 There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
 
-### Sinks
+#### Sinks
 
 `DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations available right away:
 
@@ -495,13 +499,13 @@ Most data stream operators support directed outputs (output splitting), meaning
 
 ~~~java
 SplitDataStream<Integer> split = someDataStream.split(outputSelector);
-DataStream<Integer> even = split.select("even”);
+DataStream<Integer> even = split.select("even");
 DataStream<Integer> odd = split.select("odd");
 ~~~
 
 In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
 
-Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”, …)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
index 05ae34d..ffa5a67 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SinkFunction.java
@@ -21,10 +21,25 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 
+/**
+ * Interface for implementing user defined sink functionality.
+ *
+ * @param <IN> INput type parameter.
+ */
 public interface SinkFunction<IN> extends Function, Serializable {
 
+	/**
+	 * Function for standard sink behaviour. This function is called for every record.
+	 *
+	 * @param value The input record.
+	 * @throws Exception
+	 */
 	public void invoke(IN value) throws Exception;
 
+	/**
+	 * In case another vertex in topology fails this method is called before terminating
+	 * the sink. Make sure to free up any allocated resources here.
+	 */
 	public void cancel();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
index 041915f..e37e851 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/ParallelSourceFunction.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.api.function.source;
 
+/**
+ * {@link SourceFunction} that may be executed in parallel.
+ *
+ * @param <OUT>
+ */
 public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
index 4f579fe..af63d80 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/SourceFunction.java
@@ -1,4 +1,4 @@
-/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -12,20 +12,40 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
- * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flink.streaming.api.function.source;
 
 import java.io.Serializable;
-
+
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
-public interface SourceFunction<OUT> extends Function, Serializable {
-
-	public void run(Collector<OUT> collector) throws Exception;
-	
-	public void cancel();
+/**
+ * Interface for implementing user defined source functionality.
+ *
+ * <p>Sources implementing this specific interface are executed with
+ * degree of parallelism 1. To execute your sources in parallel
+ * see {@link ParallelSourceFunction}.</p>
+ *
+ * @param <OUT> Output type parameter.
+ */
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+	/**
+	 * Function for standard source behaviour. This function is called only once
+	 * thus to produce multiple outputs make sure to produce multiple records.
+	 *
+	 * @param collector Collector for passing output records
+	 * @throws Exception
+	 */
+	public void run(Collector<OUT> collector) throws Exception;
+
+	/**
+	 * In case another vertex in topology fails this method is called before terminating
+	 * the source. Make sure to free up any allocated resources here.
+	 */
+	public void cancel();
 		
-}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08ef02eb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 85fb9a4..abe31d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -114,6 +114,14 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 				// Task already cancelled do nothing
 				return null;
 			}
+		}  catch (IllegalStateException e) {
+			if (isRunning) {
+				throw new RuntimeException("Could not read next record due to: "
+						+ StringUtils.stringifyException(e));
+			} else {
+				// Task already cancelled do nothing
+				return null;
+			}
 		}
 	}