Posted to commits@flink.apache.org by se...@apache.org on 2015/08/17 14:14:49 UTC

[1/6] flink git commit: [FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is called at most once per updateVertex

Repository: flink
Updated Branches:
  refs/heads/master e4e44bedb -> 3fcc04ab3


[FLINK-2527] [gelly] Ensure that VertexUpdateFunction.setNewVertexValue is called at most once per updateVertex

This closes #1027


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea0bc12
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea0bc12
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea0bc12

Branch: refs/heads/master
Commit: 0ea0bc12b3f8a8a82b6fca563340af547c0a02ab
Parents: e4e44be
Author: Gabor Gevay <gg...@gmail.com>
Authored: Sun Aug 16 21:40:49 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 11:48:17 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/spargel/VertexUpdateFunction.java       | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ea0bc12/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 9930b50..248925b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -79,7 +79,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	
 	/**
 	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
 	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
 	 * 
 	 * @param vertex The vertex.
@@ -105,10 +105,16 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	
 	/**
 	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+	 *
+	 * This should be called at most once per updateVertex.
 	 * 
 	 * @param newValue The new vertex value.
 	 */
 	public void setNewVertexValue(VV newValue) {
+		if(setNewVertexValueCalled) {
+			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+		}
+		setNewVertexValueCalled = true;
 		if(isOptDegrees()) {
 			outValWithDegrees.f1.f0 = newValue;
 			outWithDegrees.collect(outValWithDegrees);
@@ -178,6 +184,8 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 
 	private long outDegree = -1;
 
+	private boolean setNewVertexValueCalled;
+
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 	}
@@ -185,6 +193,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
 		this.outVal = outVal;
 		this.out = out;
+		setNewVertexValueCalled = false;
 	}
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -192,6 +201,7 @@ public abstract class VertexUpdateFunction<K, VV, Message> implements Serializab
 			Collector out) {
 		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
 		this.outWithDegrees = out;
+		setNewVertexValueCalled = false;
 	}
 
 	/**
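
For context, here is what the new contract looks like from user code. This is a
hedged, illustrative sketch (the class and the shortest-distance logic are invented
for this example, not part of the commit): calling setNewVertexValue a second time
within one updateVertex invocation now fails fast with an IllegalStateException.

	import org.apache.flink.graph.Vertex;
	import org.apache.flink.graph.spargel.MessageIterator;
	import org.apache.flink.graph.spargel.VertexUpdateFunction;

	// Illustrative vertex update function under the new at-most-once contract.
	public final class MinDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {

		@Override
		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> messages) {
			double min = vertex.getValue();
			while (messages.hasNext()) {
				min = Math.min(min, messages.next());
			}
			if (min < vertex.getValue()) {
				// at most one call per updateVertex; a second call now throws
				setNewVertexValue(min);
			}
		}
	}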


[5/6] flink git commit: [FLINK-2462] [streaming] Major cleanup of operator structure for exception handling and code simplification

Posted by se...@apache.org.
[FLINK-2462] [streaming] Major cleanup of operator structure for exception handling and code simplification

  - The exceptions are no longer logged by the operators themselves.
    Operators perform only cleanup in reaction to exceptions.
    Exceptions are reported only to the root Task object, which knows whether this is the first
    failure-causing exception (root cause), a subsequent exception, or whether the task was
    actually canceled already. In the latter case, exceptions are ignored, because many
    cancellations lead to meaningless exceptions. (A schematic sketch of this division of
    labor follows the list below.)

  - More exceptions in signatures, less wrapping where not needed

  - Core resource acquisition/release logic is in one streaming task, reducing code duplication

  - Guaranteed cleanup of output buffer and input buffer resources (formerly missed when other exceptions were encountered)

  - Fix mixup in instantiation of source contexts in the stream source task

  - Auto watermark generators correctly shut down their interval scheduler

  - Improve use of generics, got rid of many raw types
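
As a schematic sketch of the first point above (illustrative names, not the actual
Task/StreamTask code): the inner layers only clean up and rethrow, and the outermost
Task decides whether an exception is the root cause or just cancellation noise.

	// Schematic only: division of labor for exception handling.
	public final class TaskSketch {

		private volatile boolean canceled;

		void runTask() {
			try {
				invokeUserCode();
			}
			catch (Exception e) {
				if (canceled) {
					// cancellation commonly produces meaningless follow-up exceptions: ignore
				}
				else {
					reportFailureCause(e); // the first failure-causing exception (root cause)
				}
			}
		}

		void cancel() {
			canceled = true; // subsequent exceptions from the running code are ignored
		}

		private void invokeUserCode() throws Exception {
			try {
				// ... actual record processing; exceptions fly out unwrapped ...
			}
			finally {
				releaseResources(); // guaranteed cleanup of input and output buffers
			}
		}

		private void reportFailureCause(Exception e) { /* notify once, at the root */ }

		private void releaseResources() { /* recycle buffers, stop helper threads */ }
	}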

This closes #1017


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92b1e471
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92b1e471
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92b1e471

Branch: refs/heads/master
Commit: 92b1e471d4762545637817c74f9396765984b39a
Parents: 63ee34c
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Aug 14 23:32:35 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 12:35:19 2015 +0200

----------------------------------------------------------------------
 .../io/network/api/writer/RecordWriter.java     |  16 +-
 .../jobgraph/tasks/AbstractInvokable.java       |   2 +-
 .../tasks/CheckpointNotificationOperator.java   |  14 +-
 .../runtime/operators/RegularPactTask.java      |  27 +-
 .../operators/testutils/MockEnvironment.java    |   9 +-
 .../operators/testutils/TaskTestBase.java       |  39 ++-
 .../flink/streaming/api/graph/StreamEdge.java   |   8 +-
 .../operators/AbstractUdfStreamOperator.java    |   4 +-
 .../flink/streaming/api/operators/Output.java   |   7 +-
 .../streaming/api/operators/StreamSource.java   |  41 ++-
 .../runtime/io/BlockingQueueBroker.java         |  22 +-
 .../runtime/io/RecordWriterFactory.java         |  52 ----
 .../runtime/io/RecordWriterOutput.java          |  64 ++--
 .../runtime/io/StreamInputProcessor.java        |  21 +-
 .../runtime/io/StreamRecordWriter.java          |  28 +-
 .../runtime/io/StreamTwoInputProcessor.java     |   9 +-
 .../streaming/runtime/io/StreamingReader.java   |   7 +-
 .../runtime/tasks/OneInputStreamTask.java       |  99 ++----
 .../streaming/runtime/tasks/OutputHandler.java  |  74 +++--
 .../runtime/tasks/SourceStreamTask.java         |  74 ++---
 .../runtime/tasks/StreamIterationHead.java      | 133 +++++----
 .../runtime/tasks/StreamIterationTail.java      |  79 ++---
 .../streaming/runtime/tasks/StreamTask.java     | 298 +++++++++++++------
 .../runtime/tasks/StreamingRuntimeContext.java  |  44 +--
 .../runtime/tasks/TwoInputStreamTask.java       | 132 +++-----
 .../api/state/StatefulOperatorTest.java         |  21 +-
 .../runtime/io/StreamRecordWriterTest.java      |   7 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |   3 -
 .../tasks/OneInputStreamTaskTestHarness.java    |   1 -
 .../runtime/tasks/StreamTaskTestHarness.java    |  56 ++--
 .../flink/streaming/util/MockContext.java       |   8 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +-
 .../streaming/util/SourceFunctionUtil.java      |   9 +-
 .../util/StreamingMultipleProgramsTestBase.java |   2 +-
 .../util/TwoInputStreamOperatorTestHarness.java |   6 +-
 35 files changed, 692 insertions(+), 728 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 2ae61ed..17a6a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -187,15 +187,13 @@ public class RecordWriter<T extends IOReadableWritable> {
 	}
 
 	public void clearBuffers() {
-		if (serializers != null) {
-			for (RecordSerializer<?> s : serializers) {
-				synchronized (s) {
-					Buffer b = s.getCurrentBuffer();
-					s.clear();
-
-					if (b != null) {
-						b.recycle();
-					}
+		for (RecordSerializer<?> s : serializers) {
+			synchronized (s) {
+				Buffer b = s.getCurrentBuffer();
+				s.clear();
+
+				if (b != null) {
+					b.recycle();
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index df41672..0e0bd26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -49,7 +49,7 @@ public abstract class AbstractInvokable {
 	/**
 	 * Must be overwritten by the concrete task to instantiate the required record reader and record writer.
 	 */
-	public abstract void registerInputOutput();
+	public abstract void registerInputOutput() throws Exception;
 
 	/**
 	 * Must be overwritten by the concrete task. This method is called by the task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
index 90c82b7..0eb9e07 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java
@@ -18,8 +18,18 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
-
+/**
+ * This interface needs to be implemented by runtime tasks that want to be able to receive
+ * notifications about completed checkpoints.
+ */
 public interface CheckpointNotificationOperator {
-	
+
+	/**
+	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received
+	 * the notification from all participating tasks.
+	 * 
 +	 * @param checkpointId The ID of the checkpoint that is complete.
+	 * @throws Exception The notification method may forward its exceptions.
+	 */
 	void notifyCheckpointComplete(long checkpointId) throws Exception;
 }
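
A minimal, hypothetical implementer of this interface (the task and its commit logic
are invented for illustration):

	import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;

	// Hypothetical: a task that defers side effects until a checkpoint is globally complete.
	public class BufferingSinkTask implements CheckpointNotificationOperator {

		@Override
		public void notifyCheckpointComplete(long checkpointId) throws Exception {
			// invoked once the checkpoint coordinator has heard from all participating tasks
			commitBufferedOutput(checkpointId);
		}

		private void commitBufferedOutput(long checkpointId) throws Exception {
			// a real task would publish the data buffered for this checkpoint here
		}
	}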

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 873d948..3cefbba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -226,7 +226,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	 * and as a setup method on the TaskManager.
 	 */
 	@Override
-	public void registerInputOutput() {
+	public void registerInputOutput() throws Exception {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(formatLogString("Start registering input and output."));
 		}
@@ -239,26 +239,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		final Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
 		this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
 
-		// initialize the readers. this is necessary for nephele to create the input gates
-		// however, this does not trigger any local processing.
-		try {
-			initInputReaders();
-			initBroadcastInputReaders();
-		} catch (Exception e) {
-			throw new RuntimeException("Initializing the input streams failed in Task " + getEnvironment().getTaskName() +
-					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-		}
+		// initialize the readers.
+		// this does not yet trigger any stream consuming or processing.
+		initInputReaders();
+		initBroadcastInputReaders();
 
-		// initialize the writers. this is necessary for nephele to create the output gates.
-		// because in the presence of chained tasks, the tasks writers depend on the last task in the chain,
-		// we need to initialize the chained tasks as well. the chained tasks are only set up, but no work
-		// (such as setting up a sorter, etc.) starts
-		try {
-			initOutputs();
-		} catch (Exception e) {
-			throw new RuntimeException("Initializing the output handlers failed" +
-					(e.getMessage() == null ? "." : ": " + e.getMessage()), e);
-		}
+		// initialize the writers.
+		initOutputs();
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug(formatLogString("Finished registering input and output."));

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 51c7f93..5fc9bb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -63,6 +63,8 @@ import static org.mockito.Mockito.when;
 
 public class MockEnvironment implements Environment {
 	
+	private final String taskName;
+	
 	private final MemoryManager memManager;
 
 	private final IOManager ioManager;
@@ -85,7 +87,8 @@ public class MockEnvironment implements Environment {
 
 	private final int bufferSize;
 
-	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+	public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		this.taskName = taskName;
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
 		this.inputs = new LinkedList<InputGate>();
@@ -214,12 +217,12 @@ public class MockEnvironment implements Environment {
 
 	@Override
 	public String getTaskName() {
-		return null;
+		return taskName;
 	}
 
 	@Override
 	public String getTaskNameWithSubtasks() {
-		return null;
+		return taskName + "(0/1)";
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 6ffc97b..c7be7a5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -52,13 +52,11 @@ public abstract class TaskTestBase {
 	public void initEnvironment(long memorySize, int bufferSize) {
 		this.memorySize = memorySize;
 		this.inputSplitProvider = new MockInputSplitProvider();
-		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
+		this.mockEnv = new MockEnvironment("mock task", this.memorySize, this.inputSplitProvider, bufferSize);
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
-		final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);
-
-		return reader;
+		return addInput(input, groupId, true);
 	}
 
 	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
@@ -89,19 +87,32 @@ public abstract class TaskTestBase {
 		return this.mockEnv.getTaskConfiguration();
 	}
 
-	public void registerTask(AbstractInvokable task, @SuppressWarnings("rawtypes") Class<? extends PactDriver> driver, Class<? extends RichFunction> stubClass) {
+	public void registerTask(AbstractInvokable task, 
+								@SuppressWarnings("rawtypes") Class<? extends PactDriver> driver,
+								Class<? extends RichFunction> stubClass) {
+		
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		config.setDriver(driver);
 		config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
 		
 		task.setEnvironment(this.mockEnv);
 
-		task.registerInputOutput();
+		try {
+			task.registerInputOutput();
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
 	}
 
 	public void registerTask(AbstractInvokable task) {
 		task.setEnvironment(this.mockEnv);
-		task.registerInputOutput();
+		try {
+			task.registerInputOutput();
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
 	}
 
 	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
@@ -118,7 +129,12 @@ public abstract class TaskTestBase {
 
 		outTask.setEnvironment(this.mockEnv);
 
-		outTask.registerInputOutput();
+		try {
+			outTask.registerInputOutput();
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
 	}
 
 	public void registerFileInputTask(AbstractInvokable inTask,
@@ -142,7 +158,12 @@ public abstract class TaskTestBase {
 
 		inTask.setEnvironment(this.mockEnv);
 
-		inTask.registerInputOutput();
+		try {
+			inTask.registerInputOutput();
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
 	}
 
 	public MemoryManager getMemoryManager() {

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index 47d97df..c252870 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -45,7 +45,7 @@ public class StreamEdge implements Serializable {
 	 * A list of output names that the target vertex listens to (if there is
 	 * output selection).
 	 */
-	final private List<String> selectedNames;
+	private final List<String> selectedNames;
 	private StreamPartitioner<?> outputPartitioner;
 
 	public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
@@ -108,11 +108,7 @@ public class StreamEdge implements Serializable {
 
 		StreamEdge that = (StreamEdge) o;
 
-		if (!edgeId.equals(that.edgeId)) {
-			return false;
-		}
-
-		return true;
+		return edgeId.equals(that.edgeId);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 585b4ce..438d529 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -98,7 +98,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	public Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
 			throws Exception {
 		// Get all the states for the operator
-		Map<String, StreamOperatorState> operatorStates = runtimeContext.getOperatorStates();
+		Map<String, StreamOperatorState<?, ?>> operatorStates = runtimeContext.getOperatorStates();
 		
 		Map<String, OperatorStateHandle> operatorStateSnapshots;
 		if (operatorStates.isEmpty()) {
@@ -108,7 +108,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 			// Checkpoint the states and store the handles in a map
 			Map<String, OperatorStateHandle> snapshots = new HashMap<String, OperatorStateHandle>();
 
-			for (Entry<String, StreamOperatorState> state : operatorStates.entrySet()) {
+			for (Entry<String, StreamOperatorState<?, ?>> state : operatorStates.entrySet()) {
 				boolean isPartitioned = state.getValue() instanceof PartitionedStreamOperatorState;
 				snapshots.put(state.getKey(),
 						new OperatorStateHandle(state.getValue().snapshotState(checkpointId, timestamp),

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index 89d5560..b68432604 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -25,7 +25,7 @@ import org.apache.flink.util.Collector;
  * of this interface that can be used to emit elements and other messages, such as barriers
  * and watermarks, from an operator.
  *
- * @param <T> The type of the elments that can be emitted.
+ * @param <T> The type of the elements that can be emitted.
  */
 public interface Output<T> extends Collector<T> {
 
@@ -33,9 +33,8 @@ public interface Output<T> extends Collector<T> {
 	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 	 * operators.
 	 *
-	 * <p>
-	 * A watermark specifies that no element with a timestamp older or equal to the watermark
-	 * timestamp will be emitted in the future.
+	 * <p>A watermark specifies that no element with a timestamp older or equal to the watermark
+	 * timestamp will be emitted in the future.</p>
 	 */
 	void emitWatermark(Watermark mark);
 }
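
To make the watermark contract concrete, a small hedged sketch (a method fragment;
the helper name and the StreamRecord(value, timestamp) constructor usage are
assumptions): all elements with timestamps up to the watermark must be emitted
before the watermark itself.

	// Hypothetical sketch: emit an element, then declare completeness up to its timestamp.
	void emitUpTo(Output<StreamRecord<Long>> output, long watermarkTime) {
		output.collect(new StreamRecord<Long>(42L, watermarkTime));
		output.emitWatermark(new Watermark(watermarkTime));
	}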

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 0cc46f5..7fad295 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -41,24 +41,23 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
-	public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
+	public void run(Object lockingObject, Output<StreamRecord<T>> collector) throws Exception {
 
-		SourceFunction.SourceContext<T> ctx = null;
+		SourceFunction.SourceContext<T> ctx;
 		if (userFunction instanceof EventTimeSourceFunction) {
 			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
 		} else if (executionConfig.getAutoWatermarkInterval() > 0) {
 			ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
 		} else if (executionConfig.areTimestampsEnabled()) {
-			ctx = new NonTimestampContext<T>(lockingObject, collector);
-		} else {
 			ctx = new NonWatermarkContext<T>(lockingObject, collector);
+		} else {
+			ctx = new NonTimestampContext<T>(lockingObject, collector);
 		}
 
 		userFunction.run(ctx);
 	}
 
 	public void cancel() {
-
 		userFunction.cancel();
 	}
 
@@ -69,10 +68,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 	 */
 	public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
 
-
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
-		StreamRecord<T> reuse;
+		private final StreamRecord<T> reuse;
 
 		public NonTimestampContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
 			this.lockingObject = lockingObjectParam;
@@ -105,8 +103,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		}
 
 		@Override
-		public void close() {
-		}
+		public void close() {}
 	}
 
 	/**
@@ -114,10 +111,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 	 */
 	public static class NonWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
-
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
-		StreamRecord<T> reuse;
+		private final StreamRecord<T> reuse;
 
 		public NonWatermarkContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
 			this.lockingObject = lockingObjectParam;
@@ -151,8 +147,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		}
 
 		@Override
-		public void close() {
-		}
+		public void close() {}
 	}
 
 	/**
@@ -161,12 +156,13 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 	 */
 	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
 
-		private transient ScheduledFuture<?> watermarkTimer = null;
+		private final ScheduledExecutorService scheduleExecutor;
+		private final ScheduledFuture<?> watermarkTimer;
 		private final long watermarkInterval;
 
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
-		StreamRecord<T> reuse;
+		private final StreamRecord<T> reuse;
 
 		private volatile long lastWatermarkTime;
 
@@ -179,9 +175,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 			watermarkInterval = executionConfig.getAutoWatermarkInterval();
 
-			ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
+			scheduleExecutor = Executors.newScheduledThreadPool(1);
 
-			watermarkTimer = service.scheduleAtFixedRate(new Runnable() {
+			watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
 				@Override
 				public void run() {
 					long currentTime = System.currentTimeMillis();
@@ -237,9 +233,8 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 		@Override
 		public void close() {
-			if (watermarkTimer != null && !watermarkTimer.isDone()) {
-				watermarkTimer.cancel(true);
-			}
+			watermarkTimer.cancel(true);
+			scheduleExecutor.shutdownNow();
 		}
 	}
 
@@ -251,7 +246,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
-		StreamRecord<T> reuse;
+		private final StreamRecord<T> reuse;
 
 		public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
 			this.lockingObject = lockingObject;
@@ -283,8 +278,6 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		}
 
 		@Override
-		public void close() {
-
-		}
+		public void close() {}
 	}
 }
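
The reordering above fixes the mixup: with timestamps enabled (but no event-time
source and no auto-watermark interval) the source now gets a NonWatermarkContext
rather than a NonTimestampContext. For illustration, a hypothetical event-time
source that exercises the ManualWatermarkContext path (the class name and emission
logic below are assumptions, not from this commit):

	import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
	import org.apache.flink.streaming.api.watermark.Watermark;

	// Hypothetical event-time source: attaches timestamps and emits its own watermarks.
	public class SensorSource implements EventTimeSourceFunction<Long> {

		private volatile boolean running = true;

		@Override
		public void run(SourceContext<Long> ctx) throws Exception {
			long value = 0;
			while (running) {
				long now = System.currentTimeMillis();
				ctx.collectWithTimestamp(value++, now);
				ctx.emitWatermark(new Watermark(now - 1000)); // everything older is complete
				Thread.sleep(10);
			}
		}

		@Override
		public void cancel() {
			running = false;
		}
	}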

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 9bf4eb4..be3c9af 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -20,22 +20,12 @@ package org.apache.flink.streaming.runtime.io;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
-	/**
-	 * Singleton instance
-	 */
-	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+public class BlockingQueueBroker extends Broker<BlockingQueue<?>> {
+	
+	/** Singleton instance */
+	public static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
 
-	private BlockingQueueBroker() {
-	}
-
-	/**
-	 * retrieve singleton instance
-	 */
-	public static Broker<BlockingQueue<StreamRecord>> instance() {
-		return INSTANCE;
-	}
+	/** Cannot instantiate */
+	private BlockingQueueBroker() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
deleted file mode 100644
index 3a7ba3e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterFactory.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RecordWriterFactory {
-	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterFactory.class);
-
-	public static <OUT extends IOReadableWritable> RecordWriter<OUT> createRecordWriter(ResultPartitionWriter bufferWriter, ChannelSelector<OUT> channelSelector, long bufferTimeout) {
-
-		RecordWriter<OUT> output;
-
-		if (bufferTimeout >= 0) {
-			output = new StreamRecordWriter<OUT>(bufferWriter, channelSelector, bufferTimeout);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout.", bufferTimeout);
-			}
-		} else {
-			output = new RecordWriter<OUT>(bufferWriter, channelSelector);
-
-			if (LOG.isTraceEnabled()) {
-				LOG.trace("RecordWriter initiated.");
-			}
-		}
-
-		return output;
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index f0f18b1..7048464 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.runtime.io;
 
 import java.io.IOException;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
@@ -29,35 +28,38 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
  */
 public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
-
-	private RecordWriter<SerializationDelegate<Object>> recordWriter;
+	private StreamRecordWriter<SerializationDelegate<Object>> recordWriter;
 	
 	private SerializationDelegate<Object> serializationDelegate;
 
+	
 	@SuppressWarnings("unchecked")
 	public RecordWriterOutput(
-			RecordWriter<?> recordWriter,
+			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			TypeSerializer<OUT> outSerializer,
 			boolean enableWatermarkMultiplexing) {
-		
-		Preconditions.checkNotNull(recordWriter);
 
-		this.recordWriter = (RecordWriter<SerializationDelegate<Object>>) recordWriter;
+		checkNotNull(recordWriter);
+		
+		// generic hack: cast the writer to generic Object type so we can use it 
+		// with multiplexed records and watermarks
+		this.recordWriter = (StreamRecordWriter<SerializationDelegate<Object>>) 
+				(StreamRecordWriter<?>) recordWriter;
 
 		TypeSerializer<Object> outRecordSerializer;
 		if (enableWatermarkMultiplexing) {
 			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
 		} else {
-			outRecordSerializer = (TypeSerializer<Object>) (TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
+			outRecordSerializer = (TypeSerializer<Object>)
+					(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
 		}
 
 		if (outSerializer != null) {
@@ -71,11 +73,9 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
 		try {
 			recordWriter.emit(serializationDelegate);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Emit failed: {}", e);
-			}
-			throw new RuntimeException("Element emission failed.", e);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
 		}
 	}
 
@@ -85,33 +85,27 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		
 		try {
 			recordWriter.broadcastEmit(serializationDelegate);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Watermark emit failed: {}", e);
-			}
-			throw new RuntimeException(e);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
 		}
 	}
 
+	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
+		recordWriter.broadcastEvent(barrier);
+	}
+	
+	
+	public void flush() throws IOException {
+		recordWriter.flush();
+	}
+	
 	@Override
 	public void close() {
-		try {
-			if (recordWriter instanceof StreamRecordWriter) {
-				((StreamRecordWriter<?>) recordWriter).close();
-			} else {
-				recordWriter.flush();
-			}
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Failed to flush final output", e);
-		}
+		recordWriter.close();
 	}
 
 	public void clearBuffers() {
 		recordWriter.clearBuffers();
 	}
-
-	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
-		recordWriter.broadcastEvent(barrier);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4ad5b45..de021ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
@@ -44,9 +43,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
  *
@@ -55,11 +51,8 @@ import org.slf4j.LoggerFactory;
  * 
  * @param <IN> The type of the record that can be read with this record reader.
  */
-public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader {
-
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
-
+public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
+	
 	private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
 
 	private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
@@ -203,17 +196,17 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 		}
 	}
 
-	public void clearBuffers() {
+	@Override
+	public void cleanup() throws IOException {
+		// clear the buffers first. this part should not ever fail
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
 			if (buffer != null && !buffer.isRecycled()) {
 				buffer.recycle();
 			}
 		}
-	}
-
-	@Override
-	public void cleanup() throws IOException {
+		
+		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 321f3b4..8dcaad8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -58,15 +58,18 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		
 		super(writer, channelSelector);
 		
-		checkArgument(timeout >= 0);
+		checkArgument(timeout >= -1);
 		
-		if (timeout == 0) {
+		if (timeout == -1) {
+			flushAlways = false;
+			outputFlusher = null;
+		}
+		else if (timeout == 0) {
 			flushAlways = true;
 			outputFlusher = null;
 		}
 		else {
 			flushAlways = false;
-
 			String threadName = taskName == null ?
 								DEFAULT_OUTPUT_FLUSH_THREAD_NAME : "Output Timeout Flusher - " + taskName;
 			
@@ -94,34 +97,27 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 	}
 
 	/**
-	 * Closes the writer. This stops the flushing thread (if there is one) and flushes all pending outputs.
-	 * 
-	 * @throws IOException I/O errors may happen during the final flush of the buffers.
+	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */
-	public void close() throws IOException {
-		// propagate exceptions
-		flush();
-		
+	public void close() {
+		// make sure we terminate the thread in any case
 		if (outputFlusher != null) {
+			outputFlusher.terminate();
 			try {
-				outputFlusher.terminate();
 				outputFlusher.join();
 			}
 			catch (InterruptedException e) {
 				// ignore on close
 			}
 		}
-
-		// final check for asynchronous errors, before we exit with a green light
-		checkErroneous();
 	}
 
 	/**
-	 * Notifies the writer that teh output flusher thread encountered an exception.
+	 * Notifies the writer that the output flusher thread encountered an exception.
 	 * 
 	 * @param t The exception to report.
 	 */
-	void notifyFlusherException(Throwable t) {
+	private void notifyFlusherException(Throwable t) {
 		if (this.flusherException == null) {
 			this.flusherException = t;
 		}
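
The constructor above now distinguishes three flushing modes. A hedged summary in
code form (the helper below is illustrative, mirroring the branches in the diff):

	/** Sketch of the buffer timeout semantics implemented above. */
	static String describeFlushMode(long timeout) {
		if (timeout == -1) {
			return "no flusher thread; data is sent when a buffer fills up";
		}
		else if (timeout == 0) {
			return "no flusher thread; flush after every record (flushAlways)";
		}
		else {
			return "dedicated output flusher thread, flushing every " + timeout + " ms";
		}
	}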

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e3d2911..e0af729 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -261,7 +261,6 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 				}
 			}
 		}
-
 	}
 
 	@Override
@@ -271,17 +270,17 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 		}
 	}
 
-	public void clearBuffers() {
+	@Override
+	public void cleanup() throws IOException {
+		// clear the buffers first. this part should not ever fail
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
 			if (buffer != null && !buffer.isRecycled()) {
 				buffer.recycle();
 			}
 		}
-	}
 
-	@Override
-	public void cleanup() throws IOException {
+		// cleanup the barrier handler resources
 		barrierHandler.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
index 9eb9337..793e87e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingReader.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.io.IOException;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 
-public interface StreamingReader {
+import java.io.IOException;
 
-	public void cleanup() throws IOException;
+public interface StreamingReader extends ReaderBase {
 
+	void cleanup() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 6136f24..8ef02b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -23,87 +23,46 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);
-
 	private StreamInputProcessor<IN> inputProcessor;
-
-	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
-			
-			TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-			int numberOfInputs = configuration.getNumberOfInputs();
 	
-			if (numberOfInputs > 0) {
-				InputGate[] inputGates = getEnvironment().getAllInputGates();
-				inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-						getCheckpointBarrierListener(), 
-						configuration.getCheckpointMode(),
-						getEnvironment().getIOManager(),
-						getExecutionConfig().areTimestampsEnabled());
-	
-				// make sure that stream tasks report their I/O statistics
-				AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-				AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-				inputProcessor.setReporter(reporter);
-			}
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
-		}
-	}
+	private volatile boolean running = true;
 
+	
 	@Override
-	public void invoke() throws Exception {
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-			openOperator();
-			operatorOpen = true;
+	public void init() throws Exception {
+		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+		int numberOfInputs = configuration.getNumberOfInputs();
 
-			while (inputProcessor.processInput(streamOperator)) {
-				// nothing to do, just keep processing
-			}
+		if (numberOfInputs > 0) {
+			InputGate[] inputGates = getEnvironment().getAllInputGates();
+			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
+					getCheckpointBarrierListener(), 
+					configuration.getCheckpointMode(),
+					getEnvironment().getIOManager(),
+					getExecutionConfig().areTimestampsEnabled());
 
-			closeOperator();
-			operatorOpen = false;
+			// make sure that stream tasks report their I/O statistics
+			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+			inputProcessor.setReporter(reporter);
+		}
+	}
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
+	@Override
+	protected void run() throws Exception {
+		while (running && inputProcessor.processInput(streamOperator));
+	}
 
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-			
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				} catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-			
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			inputProcessor.clearBuffers();
-			inputProcessor.cleanup();
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
+	@Override
+	protected void cleanup() throws Exception {
+		inputProcessor.cleanup();
+	}
 
+	@Override
+	protected void cancelTask() {
+		running = false;
 	}
 }
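
The task above now only fills in lifecycle hooks; the driver logic (operator opening,
exception reporting, guaranteed cleanup) lives once in StreamTask. A simplified,
schematic sketch of that template (not the actual StreamTask code):

	// Schematic template-method lifecycle, as concrete tasks now see it:
	//   init()       - wire up readers/writers (e.g. the StreamInputProcessor above)
	//   run()        - the processing loop; returns when input ends or the task is canceled
	//   cleanup()    - release task-specific resources, always invoked
	//   cancelTask() - flip a flag so run() exits promptly
	public abstract class StreamTaskSketch {

		protected abstract void init() throws Exception;
		protected abstract void run() throws Exception;
		protected abstract void cleanup() throws Exception;
		protected abstract void cancelTask() throws Exception;

		public final void invoke() throws Exception {
			init();
			try {
				openAllOperators();
				run();
				closeAllOperators();
			}
			finally {
				cleanup();         // task-specific, e.g. input processor resources
				releaseOutputs();  // guaranteed release of output buffers and flusher threads
			}
		}

		private void openAllOperators() throws Exception { /* open chained operators */ }
		private void closeAllOperators() throws Exception { /* close chained operators */ }
		private void releaseOutputs() { /* stop flusher threads, recycle buffers */ }
	}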

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index c8fa9e3..ce659fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -40,7 +39,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.RecordWriterFactory;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -218,37 +217,59 @@ public class OutputHandler<OUT> {
 
 		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut1(vertex.userClassLoader);
 
-
 		@SuppressWarnings("unchecked")
 		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
 
 		ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
 
-		RecordWriter<SerializationDelegate<StreamRecord<T>>> output =
-				RecordWriterFactory.createRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
+		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
+				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
 
 		output.setReporter(reporter);
+		
+		RecordWriterOutput<T> streamOutput = 
+				new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
 
-		@SuppressWarnings("unchecked")
-		RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
-
-		if (LOG.isTraceEnabled()) {
-			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
 					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
 		}
 
 		return streamOutput;
 	}
 
-	public void flushOutputs() throws IOException, InterruptedException {
+	/**
+	 * 
+	 * This method should be called before finishing the record emission, to make sure any data
+	 * that is still buffered will be sent. It also ensures that all data sending related
+	 * exceptions are recognized.
+	 * 
+	 * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
+	 */
+	public void flushOutputs() throws IOException {
 		for (RecordWriterOutput<?> streamOutput : getOutputs()) {
-			streamOutput.close();
+			streamOutput.flush();
 		}
 	}
 
-	public void clearWriters() {
-		for (RecordWriterOutput<?> output : outputMap.values()) {
-			output.clearBuffers();
+	/**
+	 * This method releases all resources of the record writer output. It stops the output
+	 * flushing thread (if there is one) and releases all buffers currently held by the output
+	 * serializers.
+	 *
+	 * This method should never fail.
+	 */
+	public void releaseOutputs() {
+		try {
+			for (RecordWriterOutput<?> streamOutput : getOutputs()) {
+				streamOutput.close();
+			}
+		}
+		finally {
+			// make sure that we release the buffers in any case
+			for (RecordWriterOutput<?> output : getOutputs()) {
+				output.clearBuffers();
+			}
 		}
 	}
 
@@ -265,11 +286,9 @@ public class OutputHandler<OUT> {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(record);
-			} catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Could not forward element to operator.", e);
-				}
-				throw new RuntimeException(e);
+			}
+			catch (Exception e) {
+				throw new RuntimeException("Could not forward element to next operator", e);
 			}
 		}
 
@@ -279,10 +298,7 @@ public class OutputHandler<OUT> {
 				operator.processWatermark(mark);
 			}
 			catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Could not forward element to operator: {}", e);
-				}
-				throw new RuntimeException(e);
+				throw new RuntimeException("Could not forward watermark to next operator", e);
 			}
 		}
 
@@ -292,10 +308,7 @@ public class OutputHandler<OUT> {
 				operator.close();
 			}
 			catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Could not forward close call to operator.", e);
-				}
-				throw new RuntimeException(e);
+				throw new RuntimeException("Could not forward close() call to next operator", e);
 			}
 		}
 	}
@@ -316,10 +329,7 @@ public class OutputHandler<OUT> {
 				operator.processElement(serializer.copy(record));
 			}
 			catch (Exception e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Could not forward element to operator.", e);
-				}
-				throw new RuntimeException(e);
+				throw new RuntimeException("Could not forward element to next operator", e);
 			}
 		}
 	}
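
As a reading aid, here is a minimal plain-Java sketch of the flush-then-release contract that
the two methods above establish (the OutputLike interface and TaskSketch class are illustrative,
not Flink API): flushing is part of the task's main logic and may fail the task, while releasing
runs unconditionally and must never fail.

    import java.io.IOException;

    interface OutputLike {
        void flushOutputs() throws IOException;  // may fail -> fails the task
        void releaseOutputs();                   // must never fail
    }

    class TaskSketch {
        void run(OutputLike outputs) throws Exception {
            try {
                // ... emit records ...
                // surface buffered-send exceptions before declaring success
                outputs.flushOutputs();
            }
            finally {
                // always stop flusher threads and return the buffers
                outputs.releaseOutputs();
            }
        }
    }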

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 4b25577..e704e32 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -22,8 +22,6 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Task for executing streaming sources.
@@ -39,61 +37,40 @@ import org.slf4j.LoggerFactory;
  */
 public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(SourceStreamTask.class);
-
 	@Override
-	public void invoke() throws Exception {
-		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
-
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-			openOperator();
-			operatorOpen = true;
-
-			streamOperator.run(checkpointLock, output);
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
-
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
+	protected void init() {
+		// does not hold any resources, so no initialization needed
+	}
 
+	@Override
+	protected void cleanup() {
+		// does not hold any resources, so no cleanup needed
 	}
+	
 
 	@Override
-	public void cancel() {
-		super.cancel();
+	protected void run() throws Exception {
+		final Object checkpointLock = getCheckpointLock();
+		
+		final SourceOutput<StreamRecord<OUT>> output = 
+				new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+		
+		streamOperator.run(checkpointLock, output);
+	}
+	
+	@Override
+	protected void cancelTask() throws Exception {
 		streamOperator.cancel();
 	}
 
+	// ------------------------------------------------------------------------
+	
+	// TODO:
+	// does this help with anything? The lock should already be held by the source function that
+	// emits. If that one does not hold the lock, then this does not help either.
+	
 	private static class SourceOutput<T> implements Output<T> {
+		
 		private final Output<T> output;
 		private final Object lockObject;
 
@@ -114,7 +91,6 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 			synchronized (lockObject) {
 				output.collect(record);
 			}
-
 		}
 
 		@Override

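Regarding the TODO above, a plain-Java sketch (hypothetical names) of the invariant the shared
checkpoint lock is meant to provide: when emission and checkpointing synchronize on the same
object, a checkpoint can only be taken between records, never in the middle of emitting one.
The wrapper is indeed redundant if the source function already holds the lock while emitting.

    class CheckpointLockSketch {
        private final Object checkpointLock = new Object();

        void emit(Object record) {
            synchronized (checkpointLock) {
                // record emission and state updates happen atomically
                // with respect to triggerCheckpoint()
            }
        }

        void triggerCheckpoint() {
            synchronized (checkpointLock) {
                // snapshot state here; no record emission is in flight
            }
        }
    }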
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 2911f44..2ad2d2d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -18,13 +18,11 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import java.util.Collection;
-import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,76 +34,91 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
+	private volatile boolean running = true;
 
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
-
-	@SuppressWarnings("rawtypes")
-	public StreamIterationHead() {
-		dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
-	}
-
+	// ------------------------------------------------------------------------
+	
 	@Override
-	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-		Map<String, Accumulator<?, ?>> accumulatorMap = registry.getUserMap();
-
-		outputHandler = new OutputHandler<OUT>(this, accumulatorMap, outputHandler.reporter);
-
-		String iterationId = configuration.getIterationId();
-		iterationWaitTime = configuration.getIterationWaitTime();
-		shouldWait = iterationWaitTime > 0;
-
-		try {
-			BlockingQueueBroker.instance().handIn(iterationId+"-" 
-					+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+	protected void run() throws Exception {
+		
+		final String iterationId = configuration.getIterationId();
+		if (iterationId == null || iterationId.length() == 0) {
+			throw new Exception("Missing iteration ID in the task configuration");
 		}
+		
+		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
+				getEnvironment().getIndexInSubtaskGroup());
+		
+		final long iterationWaitTime = configuration.getIterationWaitTime();
+		final boolean shouldWait = iterationWaitTime > 0;
 
-	}
+		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
 
-	@SuppressWarnings("unchecked")
-	@Override
-	public void invoke() throws Exception {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Iteration source {} invoked", getName());
-		}
-
-		Collection<RecordWriterOutput<?>> outputs = outputHandler.getOutputs();
+		// offer the queue for the tail
+		BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
+		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
 
+		// do the work 
 		try {
-			StreamRecord<OUT> nextRecord;
-
-			while (true) {
-				if (shouldWait) {
-					nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-				} else {
-					nextRecord = dataChannel.take();
+			@SuppressWarnings("unchecked")
+			Collection<RecordWriterOutput<OUT>> outputs = 
+					(Collection<RecordWriterOutput<OUT>>) (Collection<?>) outputHandler.getOutputs();
+
+			while (running) {
+				StreamRecord<OUT> nextRecord = shouldWait ?
+					dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS) :
+					dataChannel.take();
+
+				if (nextRecord != null) {
+					for (RecordWriterOutput<OUT> output : outputs) {
+						output.collect(nextRecord);
+					}
 				}
-				if (nextRecord == null) {
+				else {
+					// done
 					break;
 				}
-				for (RecordWriterOutput<?> output : outputs) {
-					((RecordWriterOutput<OUT>) output).collect(nextRecord);
-				}
 			}
-
-		}
-		catch (Exception e) {
-			LOG.error("Iteration Head " + getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-			
-			throw e;
 		}
 		finally {
-			// Cleanup
-			isRunning = false;
-			outputHandler.flushOutputs();
-			clearBuffers();
+			// make sure that we remove the queue from the broker, to prevent a resource leak
+			BlockingQueueBroker.INSTANCE.remove(brokerID);
+			LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID);
 		}
 	}
+
+	@Override
+	protected void cancelTask() {
+		running = false;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void init() {
+		// does not hold any resources, no initialization necessary
+	}
+
+	@Override
+	protected void cleanup() throws Exception {
+		// does not hold any resources, no cleanup necessary
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates the identification string with which head and tail task find the shared blocking
+	 * queue for the back channel. The identification string is unique per parallel head/tail pair
+	 * per iteration per job.
+	 * 
+	 * @param jid The job ID.
+	 * @param iterationID The id of the iteration in the job.
+	 * @param subtaskIndex The parallel subtask number
+	 * @return The identification string.
+	 */
+	public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
+		return jid + "-" + iterationID + "-" + subtaskIndex;
+	}
 }
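
A standalone sketch of the broker ID scheme implemented by createBrokerIdString above; the jid
value below is made up for illustration.

    class BrokerIdSketch {
        // same concatenation as createBrokerIdString(JobID, String, int)
        static String brokerId(String jid, String iterationId, int subtaskIndex) {
            return jid + "-" + iterationId + "-" + subtaskIndex;
        }

        public static void main(String[] args) {
            // -> "7f6b8a30c2b74a1e-iter-0-2": unique per job, per iteration, per parallel subtask
            System.out.println(brokerId("7f6b8a30c2b74a1e", "iter-0", 2));
        }
    }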

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 9fbc3a7..fdce52d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,57 +32,61 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
 
-	private String iterationId;
-
-	@SuppressWarnings("rawtypes")
-	private BlockingQueue<StreamRecord> dataChannel;
-	private long iterationWaitTime;
-	private boolean shouldWait;
+	@Override
+	public void init() throws Exception {
+		super.init();
+		
+		final String iterationId = configuration.getIterationId();
+		if (iterationId == null || iterationId.length() == 0) {
+			throw new Exception("Missing iteration ID in the task configuration");
+		}
 
-	public StreamIterationTail() {
-	}
+		final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
+				getEnvironment().getIndexInSubtaskGroup());
 
-	@Override
-	public void registerInputOutput() {
-		super.registerInputOutput();
+		final long iterationWaitTime = configuration.getIterationWaitTime();
 
-		try {
-			iterationId = configuration.getIterationId();
-			iterationWaitTime = configuration.getIterationWaitTime();
-			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId+"-"
-					+getEnvironment().getIndexInSubtaskGroup());
-		} catch (Exception e) {
-			throw new StreamTaskException(String.format(
-					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
-		}
-		this.streamOperator = new RecordPusher();
+		LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
+		
+		@SuppressWarnings("unchecked")
+		BlockingQueue<StreamRecord<IN>> dataChannel =
+				(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
+		
+		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
+		
+		this.streamOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
 	}
 
-	class RecordPusher extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+		
 		private static final long serialVersionUID = 1L;
 
+		@SuppressWarnings("NonSerializableFieldInSerializableClass")
+		private final BlockingQueue<StreamRecord<IN>> dataChannel;
+		
+		private final long iterationWaitTime;
+		
+		private final boolean shouldWait;
+
+		RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
+			this.dataChannel = dataChannel;
+			this.iterationWaitTime = iterationWaitTime;
+			this.shouldWait = iterationWaitTime > 0;
+		}
+
 		@Override
 		public void processElement(StreamRecord<IN> record) throws Exception {
-			try {
-				if (shouldWait) {
-					dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-				} else {
-					dataChannel.put(record);
-				}
-			} catch (InterruptedException e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
-							StringUtils.stringifyException(e));
-				}
-				throw e;
+			if (shouldWait) {
+				dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+			}
+			else {
+				dataChannel.put(record);
 			}
 		}
 
 		@Override
-		public void processWatermark(Watermark mark) throws Exception {
+		public void processWatermark(Watermark mark) {
 			// ignore
 		}
 	}
-
 }
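
A self-contained sketch (java.util.concurrent only) of the feedback channel semantics the
RecordPusher above relies on: a capacity-one queue, where a positive iteration wait time turns
the push into a timed offer, so a slow head causes records to be dropped instead of blocking
the tail forever.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    class FeedbackChannelSketch {
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> channel = new ArrayBlockingQueue<>(1);
            long waitMillis = 100;  // corresponds to iterationWaitTime > 0

            channel.offer("record-1", waitMillis, TimeUnit.MILLISECONDS);  // accepted
            boolean accepted = channel.offer("record-2", waitMillis, TimeUnit.MILLISECONDS);

            // false: nobody consumed record-1 in time, so record-2 is dropped
            System.out.println("second record accepted: " + accepted);
        }
    }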

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 88813d0..8a5f741 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -25,13 +24,13 @@ import java.util.Map;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
+
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator;
@@ -47,52 +46,96 @@ import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.state.OperatorStateHandle;
 import org.apache.flink.streaming.api.state.WrapperStateHandle;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * Base class for all streaming tasks. The life cycle of a streaming task is:
+ *
+ * <pre>
+ *  -- registerInputOutput()
+ *         |
+ *         +----> Create basic utils (config, etc) and load operators
+ *         +----> operator specific init()
+ *
+ *  -- restoreState()
+ *
+ *  -- invoke()
+ *        |
+ *        +----> open operators()
+ *        +----> run()
+ *        +----> close operators()
+ *        +----> common cleanup
+ *        +----> operator specific cleanup()
+ * </pre>
+ *
+ * @param <OUT> The type of the records produced by this task.
+ * @param <O> The type of the stream operator run by this task.
+ */
 public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements
 		OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointNotificationOperator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
-	protected final Object checkpointLock = new Object();
+	
+	private final Object checkpointLock = new Object();
+
+	private final EventListener<CheckpointBarrier> checkpointBarrierListener;
+	
+	protected final List<StreamingRuntimeContext> contexts;
 
+	protected StreamingRuntimeContext headContext;
+	
 	protected StreamConfig configuration;
 
+	protected ClassLoader userClassLoader;
+	
 	protected OutputHandler<OUT> outputHandler;
 
 	protected O streamOperator;
 
 	protected boolean hasChainedOperators;
 
-	// needs to be initialized to true, so that early cancel() before invoke() behaves correctly
-	protected volatile boolean isRunning = true;
-
-	protected List<StreamingRuntimeContext> contexts;
-
-	protected StreamingRuntimeContext headContext;
-
-	protected ClassLoader userClassLoader;
+	/** Flag to mark the task "in operation": set while run() executes, and checked so that
+	 * checkpoints are only taken while the task is running, and so that an early
+	 * cancel() before invoke() behaves correctly. */
+	private volatile boolean isRunning;
+	
+	// ------------------------------------------------------------------------
 	
-	private EventListener<CheckpointBarrier> checkpointBarrierListener;
-
 	public StreamTask() {
-		streamOperator = null;
 		checkpointBarrierListener = new CheckpointBarrierListener();
 		contexts = new ArrayList<StreamingRuntimeContext>();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Life cycle methods for specific implementations
+	// ------------------------------------------------------------------------
+
+	protected abstract void init() throws Exception;
+	
+	protected abstract void run() throws Exception;
+	
+	protected abstract void cleanup() throws Exception;
+	
+	protected abstract void cancelTask() throws Exception;
+
+	// ------------------------------------------------------------------------
+	//  Core work methods of the Stream Task
+	// ------------------------------------------------------------------------
+	
 	@Override
-	public void registerInputOutput() {
-		this.userClassLoader = getUserCodeClassLoader();
-		this.configuration = new StreamConfig(getTaskConfiguration());
+	public final void registerInputOutput() throws Exception {
+		LOG.debug("Begin initialization for {}", getName());
+		
+		userClassLoader = getUserCodeClassLoader();
+		configuration = new StreamConfig(getTaskConfiguration());
 
 		streamOperator = configuration.getStreamOperator(userClassLoader);
 
 		// Create and register Accumulators
-		Environment env = getEnvironment();
-		AccumulatorRegistry accumulatorRegistry = env.getAccumulatorRegistry();
+		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
 		Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
 		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
 
@@ -108,69 +151,76 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 
 		hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
+		
+		// operator specific initialization
+		init();
+		
+		LOG.debug("Finish initialization for {}", getName());
 	}
+	
+	@Override
+	public final void invoke() throws Exception {
+		LOG.debug("Invoking {}", getName());
+		
+		boolean operatorOpen = false;
+		try {
+			openAllOperators();
+			operatorOpen = true;
+
+			// let the task do its work
+			isRunning = true;
+			run();
+			
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Finished task {}", getName());
+			}
 
-	public String getName() {
-		return getEnvironment().getTaskName();
-	}
-
-	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
-		Environment env = getEnvironment();
-		String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
-
-		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
-
-		return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(),
-				getExecutionConfig(), statePartitioner, getStateHandleProvider(), accumulatorMap);
-	}
-
-	private StateHandleProvider<Serializable> getStateHandleProvider() {
-
-		StateHandleProvider<Serializable> provider = configuration
-				.getStateHandleProvider(userClassLoader);
-
-		// If the user did not specify a provider in the program we try to get it from the config
-		if (provider == null) {
-			String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
-					ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
-
-			StateBackend backend;
-
+			// this is part of the main logic, so if this fails, the task is considered failed
+			closeAllOperators();
+			operatorOpen = false;
+			
+			// make sure all data is flushed
+			outputHandler.flushOutputs();
+		}
+		finally {
+			this.isRunning = false;
+			
 			try {
-				backend = StateBackend.valueOf(backendName);
-			} catch (Exception e) {
-				throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.");
+				if (operatorOpen) {
+					// we came here in a failure
+					closeAllOperators();
+				}
 			}
-
-			switch (backend) {
-				case JOBMANAGER:
-					LOG.info("State backend for state checkpoints is set to jobmanager.");
-					return new LocalStateHandle.LocalStateHandleProvider<Serializable>();
-				case FILESYSTEM:
-					String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
-					if (checkpointDir != null) {
-						LOG.info("State backend for state checkpoints is set to filesystem with directory: "
-								+ checkpointDir);
-						return FileStateHandle.createProvider(checkpointDir);
-					} else {
-						throw new RuntimeException(
-								"For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\"");
-					}
-				default:
-					throw new RuntimeException("Backend " + backend + " is not supported yet.");
+			catch (Throwable t) {
+				LOG.error("Error closing stream operators after an exception.", t);
+			}
+			finally {
+				// we must perform this cleanup in any case
+				
+				// release the output resources
+				if (outputHandler != null) {
+					outputHandler.releaseOutputs();
+				}
 
-		} else {
-			LOG.info("Using user defined state backend for streaming checkpoitns.");
-			return provider;
+				// release this operator's resources
+				try {
+					cleanup();
+				}
+				catch (Throwable t) {
+					LOG.error("Error during cleanup of stream task.", t);
+				}
+			}
 		}
 	}
-
-	private enum StateBackend {
-		JOBMANAGER, FILESYSTEM
+	
+	@Override
+	public final void cancel() throws Exception {
+		isRunning = false;
+		cancelTask();
 	}
-
-	protected void openOperator() throws Exception {
+	
+	private void openAllOperators() throws Exception {
 		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
 			if (operator != null) {
 				operator.open(getTaskConfiguration());
@@ -178,7 +228,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 	}
 
-	protected void closeOperator() throws Exception {
+	private void closeAllOperators() throws Exception {
 		// We need to close them first to last, since upstream operators in the chain might emit
 		// elements in their close methods.
 		for (int i = outputHandler.getChainedOperators().size()-1; i >= 0; i--) {
@@ -189,19 +239,20 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		}
 	}
 
-	protected void clearBuffers() throws IOException {
-		if (outputHandler != null) {
-			outputHandler.clearWriters();
-		}
-	}
+	// ------------------------------------------------------------------------
+	//  Access to properties and utilities
+	// ------------------------------------------------------------------------
 
-	@Override
-	public void cancel() {
-		this.isRunning = false;
+	/**
+	 * Gets the name of the task, in the form "taskname (2/5)".
+	 * @return The name of the task.
+	 */
+	public String getName() {
+		return getEnvironment().getTaskNameWithSubtasks();
 	}
 
-	public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return this.checkpointBarrierListener;
+	public Object getCheckpointLock() {
+		return checkpointLock;
 	}
 
 	// ------------------------------------------------------------------------
@@ -212,8 +263,9 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
 
-		// We retrieve end restore the states for the chained oeprators.
-		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState();
+		// We retrieve and restore the states for the chained operators.
+		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = 
+				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState();
 
 		// We restore all stateful operators
 		for (int i = 0; i < chainedStates.size(); i++) {
@@ -224,7 +276,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 				((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
 			}
 		}
-
 	}
 
 	@Override
@@ -235,10 +286,9 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {
-					
-
-					// We wrap the states of the chained operators in a list, marking non-stateful oeprators with null
-					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();
+					// We wrap the states of the chained operators in a list, marking non-stateful operators with null
+					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates
+							= new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>();
 
 					// A wrapper handle is created for the List of statehandles
 					WrapperStateHandle stateHandle;
@@ -278,35 +328,89 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 				}
 			}
 		}
-
 	}
-
-	@SuppressWarnings("rawtypes")
+	
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (checkpointLock) {
-
 			for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
 				if (chainedOperator instanceof StatefulStreamOperator) {
-					((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId);
+					((StatefulStreamOperator<?>) chainedOperator).notifyCheckpointComplete(checkpointId);
 				}
 			}
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  State backend
+	// ------------------------------------------------------------------------
+	
+	private StateHandleProvider<Serializable> getStateHandleProvider() {
+		StateHandleProvider<Serializable> provider = configuration.getStateHandleProvider(userClassLoader);
+
+		// If the user did not specify a provider in the program we try to get it from the config
+		if (provider == null) {
+			String backendName = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND,
+					ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
+
+			StateBackend backend;
+
+			try {
+				backend = StateBackend.valueOf(backendName);
+			} catch (Exception e) {
+				throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.");
+			}
+
+			switch (backend) {
+				case JOBMANAGER:
+					LOG.info("State backend for state checkpoints is set to jobmanager.");
+					return new LocalStateHandle.LocalStateHandleProvider<Serializable>();
+				case FILESYSTEM:
+					String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
+					if (checkpointDir != null) {
+						LOG.info("State backend for state checkpoints is set to filesystem with directory: "
+								+ checkpointDir);
+						return FileStateHandle.createProvider(checkpointDir);
+					} else {
+						throw new RuntimeException(
+								"For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\"");
+					}
+				default:
+					throw new RuntimeException("Backend " + backend + " is not supported yet.");
+			}
 
+		} else {
+			LOG.info("Using user defined state backend for streaming checkpoints.");
+			return provider;
 		}
 	}
 
+	private enum StateBackend {
+		JOBMANAGER, FILESYSTEM
+	}
 
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
+		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
+
+		return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(),
+				statePartitioner, getStateHandleProvider(), accumulatorMap);
+	}
+	
 	@Override
 	public String toString() {
-		return getEnvironment().getTaskNameWithSubtasks();
+		return getName();
 	}
 
 	// ------------------------------------------------------------------------
 
+	public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
+		return this.checkpointBarrierListener;
+	}
+	
 	private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
 
 		@Override


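To make the life cycle in the class comment above concrete, here is a generic template-method
sketch (plain Java, not the actual StreamTask; registerInputOutput() and invoke() are collapsed
into one method for brevity). Subclasses only fill in the four hooks, while ordering, the
running flag, and cleanup stay in the base class:

    abstract class TaskTemplateSketch {
        private volatile boolean isRunning;

        protected abstract void init() throws Exception;
        protected abstract void run() throws Exception;
        protected abstract void cleanup() throws Exception;
        protected abstract void cancelTask() throws Exception;

        public final void invoke() throws Exception {
            init();                      // operator specific initialization
            try {
                isRunning = true;
                run();                   // the task's main work
            }
            finally {
                isRunning = false;
                cleanup();               // operator specific resource release
            }
        }

        public final void cancel() throws Exception {
            isRunning = false;
            cancelTask();
        }
    }
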
[3/6] flink git commit: [FLINK-2534] [runtime] Some improvements in CompactingHashTable

Posted by se...@apache.org.
[FLINK-2534] [runtime] Some improvements in CompactingHashTable

This closes #1029


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63ee34c5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63ee34c5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63ee34c5

Branch: refs/heads/master
Commit: 63ee34c5b894e2795e74a3c2aa3d5dc9ac2d5b88
Parents: 0ecc563
Author: HuangWHWHW <40...@qq.com>
Authored: Mon Aug 17 11:31:02 2015 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 11:59:39 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/hash/CompactingHashTable.java     | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63ee34c5/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
index d07c7e3..ff6548e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/CompactingHashTable.java
@@ -380,8 +380,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 		int countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 		int numInSegment = 0;
 		int posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
-		
-		long currentForwardPointer = BUCKET_FORWARD_POINTER_NOT_SET;
 
 		// loop over all segments that are involved in the bucket (original bucket plus overflow buckets)
 		while (true) {
@@ -396,7 +394,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 					// get the pointer to the pair
 					final int pointerOffset = bucketInSegmentOffset + BUCKET_POINTER_START_OFFSET + (numInSegment * POINTER_LEN);
 					final long pointer = bucket.getLong(pointerOffset);
-					numInSegment++;
 					
 					// deserialize the key to check whether it is really equal, or whether we had only a hash collision
 					T valueAtPosition = partition.readRecordAt(pointer);
@@ -406,9 +403,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 						return;
 					}
 				}
-				else {
-					numInSegment++;
-				}
+				numInSegment++;
 			}
 			
 			// this segment is done. check if there is another chained bucket
@@ -436,7 +431,6 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 			countInSegment = bucket.getInt(bucketInSegmentOffset + HEADER_COUNT_OFFSET);
 			posInSegment = bucketInSegmentOffset + BUCKET_HEADER_LENGTH;
 			numInSegment = 0;
-			currentForwardPointer = newForwardPointer;
 		}
 	}
 	
@@ -869,9 +863,7 @@ public class CompactingHashTable<T> extends AbstractMutableHashTable<T> {
 		final long numRecordsStorable = totalSize / (recordLenBytes + RECORD_OVERHEAD_BYTES);
 		final long bucketBytes = numRecordsStorable * RECORD_OVERHEAD_BYTES;
 		long numBuckets = bucketBytes / (2 * HASH_BUCKET_SIZE) + 1;
-		while(numBuckets % numPartitions != 0) {
-			numBuckets++;
-		}
+		// round numBuckets up to the next multiple of numPartitions
+		numBuckets += (numPartitions - numBuckets % numPartitions) % numPartitions;
 		return numBuckets > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) numBuckets;
 	}
 	

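The replaced increment loop rounds the bucket count up to a multiple of the partition count;
the closed-form version does the same in one step. A worked example (plain Java):

    class BucketRoundingSketch {
        public static void main(String[] args) {
            long numBuckets = 10;
            int numPartitions = 4;

            // round up to the next multiple of numPartitions:
            // 10 + (4 - 10 % 4) % 4 = 10 + 2 = 12
            numBuckets += (numPartitions - numBuckets % numPartitions) % numPartitions;
            System.out.println(numBuckets);  // 12; values already divisible stay unchanged
        }
    }
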

[2/6] flink git commit: [docs] Minor addition to the cluster execution docs.

Posted by se...@apache.org.
[docs] Minor addition to the cluster execution docs.

This closes #1018


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ecc5634
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ecc5634
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ecc5634

Branch: refs/heads/master
Commit: 0ecc56348d7abaf70f7e47d38159f9042f0917ea
Parents: 0ea0bc1
Author: CHEN LIANG <ch...@sina.cn>
Authored: Fri Aug 14 22:14:46 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 11:51:50 2015 +0200

----------------------------------------------------------------------
 docs/apis/cluster_execution.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ecc5634/docs/apis/cluster_execution.md
----------------------------------------------------------------------
diff --git a/docs/apis/cluster_execution.md b/docs/apis/cluster_execution.md
index f9844d7..80c15a0 100644
--- a/docs/apis/cluster_execution.md
+++ b/docs/apis/cluster_execution.md
@@ -98,7 +98,8 @@ The latter version is recommended as it respects the classloader management in F
 
 To provide these dependencies not included by Flink we suggest two options with Maven.
 
-1. The maven assembly plugin builds a so called fat jar cointaining all your dependencies.
+1. The Maven assembly plugin builds a so-called uber-jar (an executable jar)
+containing all your dependencies.
 Assembly configuration is straight-forward, but the resulting jar might become bulky. See 
 [usage](http://maven.apache.org/plugins/maven-assembly-plugin/usage.html).
 2. The maven unpack plugin, for unpacking the relevant parts of the dependencies and


[4/6] flink git commit: [FLINK-2462] [streaming] Major cleanup of operator structure for exception handling and code simplification

Posted by se...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 8c354be..2ca2862 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -45,28 +45,32 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
  * operators.
  */
-@SuppressWarnings("rawtypes")
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	private final Environment env;
-	private final Map<String, StreamOperatorState> states;
-	private final List<PartitionedStreamOperatorState> partitionedStates;
+	private final Map<String, StreamOperatorState<?, ?>> states;
+	private final List<PartitionedStreamOperatorState<?, ?, ?>> partitionedStates;
 	private final KeySelector<?, ?> statePartitioner;
 	private final StateHandleProvider<Serializable> provider;
-	private final ClassLoader cl;
-
+	
+	
 	@SuppressWarnings("unchecked")
-	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-			ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
-			StateHandleProvider<?> provider, Map<String, Accumulator<?, ?>> accumulatorMap) {
-		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
-				executionConfig, env.getDistributedCacheEntries(), accumulatorMap);
+	public StreamingRuntimeContext(
+			Environment env,
+			ExecutionConfig executionConfig,
+			KeySelector<?, ?> statePartitioner,
+			StateHandleProvider<?> provider,
+			Map<String, Accumulator<?, ?>> accumulatorMap) {
+		
+		super(env.getTaskName(), env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(),
+				env.getUserClassLoader(), executionConfig,
+				env.getDistributedCacheEntries(), accumulatorMap);
+		
 		this.env = env;
 		this.statePartitioner = statePartitioner;
-		this.states = new HashMap<String, StreamOperatorState>();
-		this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
+		this.states = new HashMap<>();
+		this.partitionedStates = new LinkedList<>();
 		this.provider = (StateHandleProvider<Serializable>) provider;
-		this.cl = userCodeClassLoader;
 	}
 
 	/**
@@ -121,14 +125,14 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	public StreamOperatorState<?, ?> getState(String name, boolean partitioned) {
 		// Try fetching state from the map
-		StreamOperatorState state = states.get(name);
+		StreamOperatorState<?, ?> state = states.get(name);
 		if (state == null) {
 			// If not found, create empty state and add to the map
 			state = createRawState(partitioned);
 			states.put(name, state);
 			// We keep a reference to all partitioned states for registering input
 			if (state instanceof PartitionedStreamOperatorState) {
-				partitionedStates.add((PartitionedStreamOperatorState) state);
+				partitionedStates.add((PartitionedStreamOperatorState<?, ?, ?>) state);
 			}
 		}
 		return state;
@@ -139,11 +143,11 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * 
 	 * @return An empty operator state.
 	 */
-	@SuppressWarnings("unchecked")
-	public StreamOperatorState createRawState(boolean partitioned) {
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public StreamOperatorState<?, ?> createRawState(boolean partitioned) {
 		if (partitioned) {
 			if (statePartitioner != null) {
-				return new PartitionedStreamOperatorState(provider, statePartitioner, cl);
+				return new PartitionedStreamOperatorState(provider, statePartitioner, getUserCodeClassLoader());
 			} else {
 				throw new RuntimeException(
 						"Partitioned state can only be used with KeyedDataStreams.");
@@ -158,7 +162,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * 
 	 * @return All the states for the underlying operator.
 	 */
-	public Map<String, StreamOperatorState> getOperatorStates() {
+	public Map<String, StreamOperatorState<?, ?>> getOperatorStates() {
 		return states;
 	}
 
@@ -169,7 +173,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 * @param nextRecord
 	 *            Next input of the operator.
 	 */
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public void setNextInput(StreamRecord<?> nextRecord) {
 		if (statePartitioner != null) {
 			for (PartitionedStreamOperatorState state : partitionedStates) {

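The get-or-create pattern in getState, together with the side registration of partitioned
states, generalizes to a small registry sketch (plain Java, hypothetical types; not the
actual StreamingRuntimeContext API):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Supplier;

    class StateRegistrySketch<S> {
        private final Map<String, S> states = new HashMap<>();
        private final List<S> partitionedStates = new ArrayList<>();

        S getState(String name, boolean partitioned, Supplier<S> factory) {
            S state = states.get(name);
            if (state == null) {
                state = factory.get();        // create empty state on first access
                states.put(name, state);
                if (partitioned) {
                    // keep a reference so inputs can be registered later
                    partitionedStates.add(state);
                }
            }
            return state;
        }
    }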
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 8cf5a40..5d0497d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -27,114 +26,65 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
-
 	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
+	
+	private volatile boolean running = true;
 
 	@Override
-	public void registerInputOutput() {
-		try {
-			super.registerInputOutput();
+	public void init() throws Exception {
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
 	
-			TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-			TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+		int numberOfInputs = configuration.getNumberOfInputs();
 	
-			int numberOfInputs = configuration.getNumberOfInputs();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 	
-			ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-			ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
 	
-			List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-	
-			for (int i = 0; i < numberOfInputs; i++) {
-				int inputType = inEdges.get(i).getTypeNumber();
-				InputGate reader = getEnvironment().getInputGate(i);
-				switch (inputType) {
-					case 1:
-						inputList1.add(reader);
-						break;
-					case 2:
-						inputList2.add(reader);
-						break;
-					default:
-						throw new RuntimeException("Invalid input type number: " + inputType);
-				}
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
+					inputList2.add(reader);
+					break;
+				default:
+					throw new RuntimeException("Invalid input type number: " + inputType);
 			}
-	
-			this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
-					inputDeserializer1, inputDeserializer2,
-					getCheckpointBarrierListener(),
-					configuration.getCheckpointMode(),
-					getEnvironment().getIOManager(),
-					getExecutionConfig().areTimestampsEnabled());
-
-			// make sure that stream tasks report their I/O statistics
-			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-			this.inputProcessor.setReporter(reporter);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to initialize stream operator: " + e.getMessage(), e);
 		}
+	
+		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
+				inputDeserializer1, inputDeserializer2,
+				getCheckpointBarrierListener(),
+				configuration.getCheckpointMode(),
+				getEnvironment().getIOManager(),
+				getExecutionConfig().areTimestampsEnabled());
+
+		// make sure that stream tasks report their I/O statistics
+		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
+		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
+		this.inputProcessor.setReporter(reporter);
 	}
 
 	@Override
-	public void invoke() throws Exception {
-		boolean operatorOpen = false;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Task {} invoked", getName());
-		}
-
-		try {
-
-			openOperator();
-			operatorOpen = true;
-
-			while (inputProcessor.processInput(streamOperator)) {
-				// do nothing, just keep processing
-			}
-
-			closeOperator();
-			operatorOpen = false;
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Task {} invocation finished", getName());
-			}
-
-		}
-		catch (Exception e) {
-			LOG.error(getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-
-			if (operatorOpen) {
-				try {
-					closeOperator();
-				}
-				catch (Throwable t) {
-					LOG.warn("Exception while closing operator.", t);
-				}
-			}
-
-			throw e;
-		}
-		finally {
-			this.isRunning = false;
-			// Cleanup
-			outputHandler.flushOutputs();
-			clearBuffers();
-		}
-
+	protected void run() throws Exception {
+		while (running && inputProcessor.processInput(streamOperator));
 	}
 
 	@Override
-	public void clearBuffers() throws IOException {
-		super.clearBuffers();
-		inputProcessor.clearBuffers();
+	protected void cleanup() throws Exception {
 		inputProcessor.cleanup();
 	}
+
+	@Override
+	protected void cancelTask() {
+		running = false;
+	}
 }
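
The running flag plus cancelTask() pair above is the standard cooperative-cancellation idiom;
a minimal standalone sketch:

    class CancellableLoopSketch {
        private volatile boolean running = true;

        void run() throws Exception {
            // mirrors: while (running && inputProcessor.processInput(streamOperator));
            while (running && processOneInput()) {
                // keep processing
            }
        }

        void cancel() {
            running = false;  // the next loop check observes this write
        }

        private boolean processOneInput() {
            return true;  // stand-in for real input processing
        }
    }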

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 6ca38b7..60db798 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -141,9 +141,12 @@ public class StatefulOperatorTest {
 			KeySelector<Integer, Serializable> partitioner, byte[] serializedState) throws Exception {
 		final List<String> outputList = output;
 
-		StreamingRuntimeContext context = new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024,
-				new MockInputSplitProvider(), 1024), null, new ExecutionConfig(), partitioner,
-				new LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+		StreamingRuntimeContext context = new StreamingRuntimeContext(
+				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024), 
+				new ExecutionConfig(),
+				partitioner,
+				new LocalStateHandleProvider<Serializable>(),
+				new HashMap<String, Accumulator<?, ?>>());
 
 		StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
 
@@ -217,14 +220,15 @@ public class StatefulOperatorTest {
 			}
 		}
 		
-		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@SuppressWarnings("unchecked")
 		@Override
 		public void close() throws Exception {
-			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
 			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
 			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
 				Integer key = (Integer) count.getKey();
 				Integer expected = key < 3 ? 2 : 1;
+				
 				assertEquals(new MutableInt(expected), count.getValue());
 			}
 		}
@@ -257,11 +261,12 @@ public class StatefulOperatorTest {
 			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
 		}
 		
-		@SuppressWarnings({ "rawtypes", "unchecked" })
+		@SuppressWarnings("unchecked")
 		@Override
 		public void close() throws Exception {
-			Map<String, StreamOperatorState> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
-			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = (PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
+			Map<String, StreamOperatorState<?, ?>> states = ((StreamingRuntimeContext) getRuntimeContext()).getOperatorStates();
+			PartitionedStreamOperatorState<Integer, Integer, Integer> groupCounter = 
+					(PartitionedStreamOperatorState<Integer, Integer, Integer>) states.get("groupCounter");
 			for (Entry<Serializable, Integer> count : groupCounter.getPartitionedState().entrySet()) {
 				Integer key = (Integer) count.getKey();
 				Integer expected = key < 3 ? 2 : 1;

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index f07e3a5..7cc1958 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -85,12 +85,7 @@ public class StreamRecordWriterTest {
 		}
 		finally {
 			if (testWriter != null) {
-				try {
-					testWriter.close();
-				}
-				catch (IOException e) {
-					// ignore in tests
-				}
+				testWriter.close();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 296324a..4c6957b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -168,7 +168,6 @@ public class OneInputStreamTaskTest {
 	 * This test verifies that checkpoint barriers are correctly forwarded.
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -226,7 +225,6 @@ public class OneInputStreamTaskTest {
 	 * then all inputs receive barriers from a later checkpoint.
 	 */
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
@@ -282,7 +280,6 @@ public class OneInputStreamTaskTest {
 
 		testHarness.waitForInputProcessing();
 
-
 		testHarness.endInput();
 
 		testHarness.waitForTaskCompletion();

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 4f07fdb..7fb8ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -62,7 +62,6 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes
 	 */
 	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
 			int numInputGates,
-
 			int numInputChannelsPerGate,
 			TypeInformation<IN> inputType,
 			TypeInformation<OUT> outputType) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0f372cb..987d0bc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -83,7 +83,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	private ConcurrentLinkedQueue<Object> outputList;
 
-	protected Thread taskThread;
+	protected TaskThread taskThread;
 
 	// These don't get initialized, the one-input/two-input specific test harnesses
 	// must initialize these if they want to simulate input. We have them here so that all the
@@ -161,32 +161,19 @@ public class StreamTaskTestHarness<OUT> {
 
 		task.registerInputOutput();
 
-		taskThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-
-
-
-				try {
-					task.invoke();
-					shutdownIOManager();
-					shutdownMemoryManager();
-				} catch (Exception e) {
-					throw new RuntimeException(e);
-				}
-
-			}
-		});
-
+		taskThread = new TaskThread(task);
 		taskThread.start();
 	}
 
-	public void waitForTaskCompletion() throws InterruptedException {
+	public void waitForTaskCompletion() throws Exception {
 		if (taskThread == null) {
 			throw new IllegalStateException("Task thread was not started.");
 		}
 
 		taskThread.join();
+		if (taskThread.getError() != null) {
+			throw new Exception("error in task", taskThread.getError());
+		}
 	}
 
 	/**
@@ -300,5 +287,36 @@ public class StreamTaskTestHarness<OUT> {
 			inputGates[i].endInput();
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	private class TaskThread extends Thread {
+		
+		private final AbstractInvokable task;
+		
+		private volatile Throwable error;
+
+
+		TaskThread(AbstractInvokable task) {
+			super("Task Thread");
+			this.task = task;
+		}
+
+		@Override
+		public void run() {
+			try {
+				task.invoke();
+				shutdownIOManager();
+				shutdownMemoryManager();
+			}
+			catch (Throwable t) {
+				this.error = t;
+			}
+		}
+
+		public Throwable getError() {
+			return error;
+		}
+	}
 }
 

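The TaskThread introduced above works around a classic JUnit pitfall: an exception thrown on a spawned thread never reaches the test thread, so a failing task would previously die as an unobserved RuntimeException. Recording the Throwable in a volatile field and rethrowing it after join() turns the task failure into a test failure. A self-contained sketch of the same pattern, with hypothetical names:

// Sketch of the error-capturing thread pattern used by StreamTaskTestHarness;
// WorkerThread and the Runnable body are hypothetical stand-ins.
public class ErrorCapturingThreadSketch {

    static class WorkerThread extends Thread {

        private final Runnable work;

        // written on the worker thread, read on the test thread after join()
        private volatile Throwable error;

        WorkerThread(Runnable work) {
            super("Worker Thread");
            this.work = work;
        }

        @Override
        public void run() {
            try {
                work.run();
            }
            catch (Throwable t) {
                this.error = t;   // would otherwise be lost to the default handler
            }
        }

        Throwable getError() {
            return error;
        }
    }

    public static void main(String[] args) throws Exception {
        WorkerThread worker = new WorkerThread(new Runnable() {
            @Override
            public void run() {
                throw new RuntimeException("boom");
            }
        });
        worker.start();
        worker.join();
        if (worker.getError() != null) {
            // surfaces the failure on the calling thread, as waitForTaskCompletion() does
            throw new Exception("error in task", worker.getError());
        }
    }
}

Strictly speaking, join() alone would establish the happens-before ordering for the field read; the volatile additionally keeps the class safe if getError() is ever called before the thread has been joined.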
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 45ae88f..f9b0b09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -57,9 +57,11 @@ public class MockContext<IN, OUT> {
 
 	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext("MockTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-				new ExecutionConfig(), null, null, new HashMap<String, Accumulator<?, ?>>());
+		StreamingRuntimeContext runtimeContext = new StreamingRuntimeContext(
+				new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+				new ExecutionConfig(), 
+				null, null, 
+				new HashMap<String, Accumulator<?, ?>>());
 
 		operator.setup(mockContext.output, runtimeContext);
 		try {

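This hunk, and the matching ones in OneInputStreamOperatorTestHarness, SourceFunctionUtil, and TwoInputStreamOperatorTestHarness below, all apply the same migration: the task name (and, where it was passed, the class loader) now travels with the MockEnvironment instead of being handed to StreamingRuntimeContext directly. A hypothetical helper that would centralize the repeated construction; the constructor calls mirror the hunks, but the helper itself is not part of the code base and assumes the same imports as the surrounding test classes:

// Hypothetical test utility mirroring the call sites in this commit.
private static StreamingRuntimeContext mockRuntimeContext(String taskName) {
    return new StreamingRuntimeContext(
            new MockEnvironment(taskName, 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
            new ExecutionConfig(),
            null, null,
            new HashMap<String, Accumulator<?, ?>>());
}

// usage: StreamingRuntimeContext ctx = mockRuntimeContext("MockTask");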
http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 6652fde..f404d01 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -60,9 +60,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		executionConfig = new ExecutionConfig();
 
 		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				"MockTwoInputTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				getClass().getClassLoader(),
+				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 				executionConfig,
 				null,
 				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 2d7f6b5..711dd41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -42,8 +42,13 @@ public class SourceFunctionUtil<T> {
 	public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
 		List<T> outputs = new ArrayList<T>();
 		if (sourceFunction instanceof RichFunction) {
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext("MockTask", new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024), null,
-					new ExecutionConfig(), null, new LocalStateHandle.LocalStateHandleProvider<Serializable>(), new HashMap<String, Accumulator<?, ?>>());
+			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
+					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
+					new ExecutionConfig(), 
+					null, 
+					new LocalStateHandle.LocalStateHandleProvider<Serializable>(),
+					new HashMap<String, Accumulator<?, ?>>());
+			
 			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
 
 			((RichFunction) sourceFunction).open(new Configuration());

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
index a9ebd0b..09db1f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
@@ -66,7 +66,7 @@ public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
 	// ------------------------------------------------------------------------
 	
 	public StreamingMultipleProgramsTestBase() {
-		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, 4);
+		TestStreamEnvironment clusterEnv = new TestStreamEnvironment(cluster, DEFAULT_PARALLELISM);
 		clusterEnv.setAsContext();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92b1e471/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 1e8b5c6..21e2e1e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -33,7 +34,6 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -60,9 +60,7 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 		executionConfig = new ExecutionConfig();
 
 		StreamingRuntimeContext runtimeContext =  new StreamingRuntimeContext(
-				"MockTwoInputTask",
-				new MockEnvironment(3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-				getClass().getClassLoader(),
+				new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
 				new ExecutionConfig(),
 				null,
 				new LocalStateHandle.LocalStateHandleProvider<Serializable>(),


[6/6] flink git commit: [tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure.

Posted by se...@apache.org.
[tests] Reinforce StateCheckpoinedITCase to make sure actual checkpointing has happened before a failure.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fcc04ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fcc04ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fcc04ab

Branch: refs/heads/master
Commit: 3fcc04ab3583e14d9d0acd1e29adee900738ffde
Parents: 92b1e47
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 16 16:52:16 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 17 12:35:19 2015 +0200

----------------------------------------------------------------------
 .../checkpointing/StateCheckpoinedITCase.java   | 62 ++++++++++++--------
 .../StreamFaultToleranceTestBase.java           | 20 +------
 2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 072086b..2c2f2b4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -22,30 +22,24 @@ import org.apache.flink.api.common.functions.RichFilterFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.Collector;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * A simple test that runs a streaming topology with checkpointing enabled.
@@ -53,6 +47,10 @@ import static org.junit.Assert.fail;
  * The test triggers a failure after a while and verifies that, after completion, the
  * state defined with either the {@link OperatorState} or the {@link Checkpointed}
  * interface reflects the "exactly once" semantics.
+ * 
+ * The test throttles the input until at least two checkpoints are completed, to make sure that
+ * the recovery does not fall back to "square one" (which would naturally lead to correct
+ * results without testing the checkpointing).
  */
 @SuppressWarnings("serial")
 public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
@@ -63,17 +61,24 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 	 * Runs the following program:
 	 *
 	 * <pre>
-	 *     [ (source)->(filter)->(map) ] -> [ (map) ] -> [ (groupBy/reduce)->(sink) ]
+	 *     [ (source)->(filter)] -> [ (map) -> (map) ] -> [ (groupBy/reduce)->(sink) ]
 	 * </pre>
 	 */
 	@Override
 	public void testProgram(StreamExecutionEnvironment env) {
 		assertTrue("Broken test setup", NUM_STRINGS % 40 == 0);
 
+		final long failurePosMin = (long) (0.4 * NUM_STRINGS / PARALLELISM);
+		final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
+
+		final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+		
 		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 
 		stream
-				// -------------- first vertex, chained to the source ----------------
+				// first vertex, chained to the source
+				// this filter throttles the flow until at least one checkpoint
+				// is complete, to make sure this program does not run without any completed checkpoint
 				.filter(new StringRichFilterFunction())
 
 						// -------------- seconds vertex - one-to-one connected ----------------
@@ -83,12 +88,16 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 
 						// -------------- third vertex - reducer and the sink ----------------
 				.partitionByHash("prefix")
-				.flatMap(new OnceFailingAggregator(NUM_STRINGS))
+				.flatMap(new OnceFailingAggregator(failurePos))
 				.addSink(new ValidatingSink());
 	}
 
 	@Override
 	public void postSubmit() {
+		
+		assertTrue("Test inconclusive: failure occurred before first checkpoint",
+				OnceFailingAggregator.wasCheckpointedBeforeFailure);
+		
 		long filterSum = 0;
 		for (long l : StringRichFilterFunction.counts) {
 			filterSum += l;
@@ -189,14 +198,15 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> 
+			implements Checkpointed<Long> {
 
 		static final long[] counts = new long[PARALLELISM];
-
+		
 		private long count;
-
+		
 		@Override
-		public boolean filter(String value) {
+		public boolean filter(String value) throws Exception {
 			count++;
 			return value.length() < 100; // should always be true
 		}
@@ -271,35 +281,34 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 	}
 	
 	private static class OnceFailingAggregator extends RichFlatMapFunction<PrefixCount, PrefixCount> 
-		implements Checkpointed<HashMap<String, PrefixCount>> {
+		implements Checkpointed<HashMap<String, PrefixCount>>, CheckpointNotifier {
 
+		static boolean wasCheckpointedBeforeFailure = false;
+		
 		private static volatile boolean hasFailed = false;
 
 		private final HashMap<String, PrefixCount> aggregationMap = new HashMap<String, PrefixCount>();
 		
-		private final long numElements;
-		
 		private long failurePos;
 		private long count;
 		
+		private boolean wasCheckpointed;
+		
 
-		OnceFailingAggregator(long numElements) {
-			this.numElements = numElements;
+		OnceFailingAggregator(long failurePos) {
+			this.failurePos = failurePos;
 		}
 		
 		@Override
 		public void open(Configuration parameters) {
-			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
 			count = 0;
 		}
 
 		@Override
 		public void flatMap(PrefixCount value, Collector<PrefixCount> out) throws Exception {
 			count++;
-			if (!hasFailed && count >= failurePos) {
+			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				wasCheckpointedBeforeFailure = wasCheckpointed;
 				hasFailed = true;
 				throw new Exception("Test Failure");
 			}
@@ -324,6 +333,11 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		public void restoreState(HashMap<String, PrefixCount> state) {
 			aggregationMap.putAll(state);
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) {
+			this.wasCheckpointed = true;
+		}
 	}
 
 	private static class ValidatingSink extends RichSinkFunction<PrefixCount> 

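The key reinforcement is the CheckpointNotifier wiring above: the failure position is now fixed up front (instead of being re-randomized in open() on every restart), the failure fires on exactly one subtask, and wasCheckpointedBeforeFailure lets postSubmit() reject runs where the failure raced ahead of the first completed checkpoint. Distilled into a sketch; OnceFailingMapper is a hypothetical stand-in, while the interfaces are the ones imported in the hunk:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

// Sketch of the "fail only after a completed checkpoint" gate used above.
public class OnceFailingMapper extends RichMapFunction<Long, Long>
        implements Checkpointed<Long>, CheckpointNotifier {

    // read by postSubmit() to reject inconclusive runs
    public static volatile boolean wasCheckpointedBeforeFailure;

    private static volatile boolean hasFailed;

    private final long failurePos;
    private long count;

    // flipped only once a checkpoint has fully completed
    private boolean wasCheckpointed;

    public OnceFailingMapper(long failurePos) {
        this.failurePos = failurePos;
    }

    @Override
    public Long map(Long value) throws Exception {
        count++;
        if (!hasFailed && count >= failurePos
                && getRuntimeContext().getIndexOfThisSubtask() == 1) {
            // record whether a checkpoint completed before we kill the task
            wasCheckpointedBeforeFailure = wasCheckpointed;
            hasFailed = true;
            throw new Exception("Test Failure");
        }
        return value;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return count;
    }

    @Override
    public void restoreState(Long state) {
        this.count = state;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.wasCheckpointed = true;
    }
}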
http://git-wip-us.apache.org/repos/asf/flink/blob/3fcc04ab/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 2993315..8920cf2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -18,38 +18,22 @@
 
 package org.apache.flink.test.checkpointing;
 
-
-import org.apache.flink.api.common.functions.RichFilterFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 /**
  * Test base for fault tolerant streaming programs
  */
-@SuppressWarnings("serial")
 public abstract class StreamFaultToleranceTestBase {
 
 	protected static final int NUM_TASK_MANAGERS = 2;
@@ -127,6 +111,7 @@ public abstract class StreamFaultToleranceTestBase {
 	//  Frequently used utilities
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("serial")
 	public static class PrefixCount implements Serializable {
 
 		public String prefix;
@@ -146,5 +131,4 @@ public abstract class StreamFaultToleranceTestBase {
 			return prefix + " / " + value;
 		}
 	}
-
 }
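
For context, StreamFaultToleranceTestBase drives subclasses such as StateCheckpoinedITCase through a template-method contract: testProgram(env) builds the checkpointed topology, and postSubmit() validates the exactly-once invariants after the intentionally failing job has recovered and finished. A reduced sketch of that contract, inferred from the overrides above; the real base class also manages a ForkableFlinkMiniCluster, and the checkpoint interval here is illustrative only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

// Simplified stand-in for StreamFaultToleranceTestBase (cluster setup elided).
public abstract class FaultToleranceTestSketch {

    /** Builds the checkpointed topology under test. */
    public abstract void testProgram(StreamExecutionEnvironment env);

    /** Validates state after the job has failed, recovered, and completed. */
    public abstract void postSubmit() throws Exception;

    @Test
    public void runCheckpointedProgram() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500);   // assumption: interval chosen for illustration
        testProgram(env);
        env.execute();
        postSubmit();
    }
}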