Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:33 UTC

[06/24] flink git commit: [FLINK-2808] [streaming] Refactor and extend state backend abstraction

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 80563b8..e131cda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordS
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -125,7 +124,8 @@ public class StreamInputProcessor<IN> {
 		lastEmittedWatermark = Long.MIN_VALUE;
 	}
 
-	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, Object lock) throws Exception {
+	@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
 		if (isFinished) {
 			return false;
 		}
@@ -161,11 +161,8 @@ public class StreamInputProcessor<IN> {
 					} else {
 						// now we can do the actual processing
 						StreamRecord<IN> record = recordOrWatermark.asRecord();
-						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
 						synchronized (lock) {
-							if (ctx != null) {
-								ctx.setNextInput(record);
-							}
+							streamOperator.setKeyContextElement(record);
 							streamOperator.processElement(record);
 						}
 						return true;

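For reference, the net effect of this hunk: the per-record path no longer reaches through a nullable StreamingRuntimeContext but calls the operator directly. A minimal sketch of what now runs under the checkpoint lock (the wrapper method itself is hypothetical):

    // Hypothetical helper isolating the refactored hot path: establish
    // the key context first, then process the element, under the lock.
    static <IN> void emitRecord(StreamRecord<IN> record,
            OneInputStreamOperator<IN, ?> operator,
            Object checkpointLock) throws Exception {
        synchronized (checkpointLock) {
            operator.setKeyContextElement(record);
            operator.processElement(record);
        }
    }
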
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
index 017c8ea..7020758 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -17,17 +17,16 @@
  */
 package org.apache.flink.streaming.runtime.operators;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -57,10 +56,9 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		buckets = Maps.newHashMap();
+	public void open() throws Exception {
+		super.open();
+		buckets = new HashMap<>();
 
 	}
 
@@ -70,7 +68,7 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
 		long bucketId = record.getTimestamp() - (record.getTimestamp() % granularity);
 		List<StreamRecord<T>> bucket = buckets.get(bucketId);
 		if (bucket == null) {
-			bucket = Lists.newArrayList();
+			bucket = new ArrayList<>();
 			buckets.put(bucketId, bucket);
 		}
 		bucket.add(record);
@@ -79,7 +77,7 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
 		long maxBucketId = mark.getTimestamp() - (mark.getTimestamp() % granularity);
-		Set<Long> toRemove = Sets.newHashSet();
+		Set<Long> toRemove = new HashSet<>();
 		for (Map.Entry<Long, List<StreamRecord<T>>> bucket: buckets.entrySet()) {
 			if (bucket.getKey() < maxBucketId) {
 				Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>() {

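The bucket assignment above floors each timestamp to the start of its bucket, and a watermark flushes every bucket strictly below the watermark's own bucket. A tiny self-contained illustration of the arithmetic (class name made up):

    public class BucketIdExample {
        public static void main(String[] args) {
            long granularity = 1_000;
            for (long ts : new long[] { 1_250, 1_999, 2_001 }) {
                // floor ts to the start of its bucket
                System.out.println(ts + " -> bucket " + (ts - (ts % granularity)));
            }
            // prints: 1250 -> bucket 1000, 1999 -> bucket 1000, 2001 -> bucket 2000
            // A watermark at 2_500 yields maxBucketId 2_000, so bucket 1000
            // would be sorted and emitted, while bucket 2000 is retained.
        }
    }
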
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index ddfc6a1..6e51a49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -46,11 +46,11 @@ public class ExtractTimestampsOperator<T>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
+	public void open() throws Exception {
+		super.open();
+		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
 		if (watermarkInterval > 0) {
-			getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+			registerTimer(System.currentTimeMillis() + watermarkInterval, this);
 		}
 
 		currentWatermark = Long.MIN_VALUE;
@@ -78,7 +78,7 @@ public class ExtractTimestampsOperator<T>
 	@Override
 	public void trigger(long timestamp) throws Exception {
 		// register next timer
-		getRuntimeContext().registerTimer(System.currentTimeMillis() + watermarkInterval, this);
+		registerTimer(System.currentTimeMillis() + watermarkInterval, this);
 		long lastWatermark = currentWatermark;
 		currentWatermark = userFunction.getCurrentWatermark();
 
@@ -90,6 +90,6 @@ public class ExtractTimestampsOperator<T>
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		// ingore them, since we are basically a watermark source
+		// ignore them, since we are basically a watermark source
 	}
 }

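The hunk cuts off before the rest of trigger(); judging from the lastWatermark/currentWatermark fields visible above, the remainder presumably emits a watermark only when the extractor's watermark has actually advanced. A hedged reconstruction:

    // Hedged reconstruction of the full method; the emission guard is an
    // assumption based on the lastWatermark/currentWatermark comparison.
    @Override
    public void trigger(long timestamp) throws Exception {
        // register next timer
        registerTimer(System.currentTimeMillis() + watermarkInterval, this);
        long lastWatermark = currentWatermark;
        currentWatermark = userFunction.getCurrentWatermark();
        if (currentWatermark > lastWatermark) {
            output.emitWatermark(new Watermark(currentWatermark));
        }
    }
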
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 4fcfb2c..227de49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -22,7 +22,6 @@ import org.apache.commons.math3.util.ArithmeticUtils;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -105,8 +104,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
+	public void open() throws Exception {
+		super.open();
 		
 		out = new TimestampedCollector<>(output);
 		
@@ -119,7 +118,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		nextEvaluationTime = now + windowSlide - (now % windowSlide);
 		nextSlideTime = now + paneSize - (now % paneSize);
 		
-		getRuntimeContext().registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
+		registerTimer(Math.min(nextEvaluationTime, nextSlideTime), this);
 	}
 
 	@Override
@@ -188,7 +187,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 		}
 
 		long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
-		getRuntimeContext().registerTimer(nextTriggerTime, this);
+		registerTimer(nextTriggerTime, this);
 	}
 	
 	private void computeWindow(long timestamp) throws Exception {

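The alignment arithmetic rounds the current time up to the next multiple of the slide and pane intervals, and the first timer fires at whichever comes sooner. A self-contained check of the formula (all values made up):

    public class AlignmentExample {
        public static void main(String[] args) {
            long now = 10_250;
            long windowSlide = 1_000;
            long paneSize = 500;
            long nextEvaluationTime = now + windowSlide - (now % windowSlide); // 11_000
            long nextSlideTime = now + paneSize - (now % paneSize);            // 10_500
            // the first timer fires at the earlier of the two
            System.out.println(Math.min(nextEvaluationTime, nextSlideTime));   // 10_500
        }
    }
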
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index f35ffca..aecfd5d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -18,16 +18,15 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -39,10 +38,13 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -104,11 +106,11 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		windows = Maps.newHashMap();
-		watermarkTimers = Maps.newHashMap();
-		processingTimeTimers = Maps.newHashMap();
+	public void open() throws Exception {
+		super.open();
+		windows = new HashMap<>();
+		watermarkTimers = new HashMap<>();
+		processingTimeTimers = new HashMap<>();
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -116,7 +118,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		}
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
-		windowBufferFactory.open(parameters);
+		windowBufferFactory.open(getUserFunctionParameters());
 	}
 
 	@Override
@@ -192,7 +194,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = Sets.newHashSet();
+		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
@@ -212,7 +214,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	public void trigger(long time) throws Exception {
-		Set<Long> toRemove = Sets.newHashSet();
+		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
 			if (triggers.getKey() < time) {
@@ -243,7 +245,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			Set<TriggerContext> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, NonKeyedWindowOperator.this);
-				triggers = Sets.newHashSet();
+				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
 			triggers.add(this);
@@ -253,7 +255,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		public void registerWatermarkTimer(long time) {
 			Set<TriggerContext> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
-				triggers = Sets.newHashSet();
+				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
 			triggers.add(this);

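Both timer maps here (and the identical code in WindowOperator below) follow the same fire-then-remove pattern: expired keys are collected into a temporary set so the map is not mutated while its entry set is being iterated. In outline:

    // Outline of the pattern; the trigger-firing body is elided.
    Set<Long> toRemove = new HashSet<>();
    for (Map.Entry<Long, Set<TriggerContext>> entry : watermarkTimers.entrySet()) {
        if (entry.getKey() <= mark.getTimestamp()) {
            // fire every TriggerContext registered for this timestamp ...
            toRemove.add(entry.getKey());
        }
    }
    for (Long expired : toRemove) {
        watermarkTimers.remove(expired);
    }
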
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index da36db1..82a3f9a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -18,17 +18,16 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -44,6 +43,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -149,11 +150,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		windows = Maps.newHashMap();
-		watermarkTimers = Maps.newHashMap();
-		processingTimeTimers = Maps.newHashMap();
+	public void open() throws Exception {
+		super.open();
+		windows = new HashMap<>();
+		watermarkTimers = new HashMap<>();
+		processingTimeTimers = new HashMap<>();
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		if (inputSerializer == null) {
@@ -161,7 +162,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		}
 
 		windowBufferFactory.setRuntimeContext(getRuntimeContext());
-		windowBufferFactory.open(parameters);
+		windowBufferFactory.open(getUserFunctionParameters());
 	}
 
 	@Override
@@ -191,7 +192,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>> keyWindows = windows.get(key);
 		if (keyWindows == null) {
-			keyWindows = Maps.newHashMap();
+			keyWindows = new HashMap<>();
 			windows.put(key, keyWindows);
 		}
 
@@ -260,7 +261,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = Sets.newHashSet();
+		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<TriggerContext>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
@@ -280,7 +281,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public void trigger(long time) throws Exception {
-		Set<Long> toRemove = Sets.newHashSet();
+		Set<Long> toRemove = new HashSet<>();
 
 		for (Map.Entry<Long, Set<TriggerContext>> triggers: processingTimeTimers.entrySet()) {
 			if (triggers.getKey() < time) {
@@ -317,7 +318,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 			Set<TriggerContext> triggers = processingTimeTimers.get(time);
 			if (triggers == null) {
 				getRuntimeContext().registerTimer(time, WindowOperator.this);
-				triggers = Sets.newHashSet();
+				triggers = new HashSet<>();
 				processingTimeTimers.put(time, triggers);
 			}
 			triggers.add(this);
@@ -327,7 +328,7 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		public void registerWatermarkTimer(long time) {
 			Set<TriggerContext> triggers = watermarkTimers.get(time);
 			if (triggers == null) {
-				triggers = Sets.newHashSet();
+				triggers = new HashSet<>();
 				watermarkTimers.put(time, triggers);
 			}
 			triggers.add(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
new file mode 100644
index 0000000..ec90bff
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A special exception that signifies that the cause exception came from a chained operator.
+ */
+public class ExceptionInChainedOperatorException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public ExceptionInChainedOperatorException(Throwable cause) {
+		this("Could not forward element to next operator", cause);
+	}
+
+	public ExceptionInChainedOperatorException(String message, Throwable cause) {
+		super(message, requireNonNull(cause));
+	}
+	
+	public Throwable getOriginalCause() {
+		Throwable ex = this;
+		do {
+			ex = ex.getCause();
+		} while (ex instanceof ExceptionInChainedOperatorException);
+		return ex; 
+	}
+}

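Because ChainingOutput (in OperatorChain below) wraps once per hop, a failure deep in an operator chain arrives as nested ExceptionInChainedOperatorExceptions; getOriginalCause() unwinds them. A small illustration:

    // Illustration: a failure two hops down the chain, wrapped once per hop.
    Exception root = new IllegalStateException("failure in last chained operator");
    ExceptionInChainedOperatorException e =
            new ExceptionInChainedOperatorException(
                    new ExceptionInChainedOperatorException(root));
    assert e.getOriginalCause() == root;  // unwraps both wrapper layers
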
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 89eac92..5316ae4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 
@@ -32,6 +33,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	public void init() throws Exception {
+		StreamConfig configuration = getConfiguration();
+		
 		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
 		int numberOfInputs = configuration.getNumberOfInputs();
 
@@ -52,10 +55,13 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	@Override
 	protected void run() throws Exception {
-		while (running && inputProcessor.processInput(streamOperator, lock)) {
-			if (timerException != null) {
-				throw timerException;
-			}
+		// cache some references on the stack, to make the code more JIT friendly
+		final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
+		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
+		final Object lock = getCheckpointLock();
+		
+		while (running && inputProcessor.processInput(operator, lock)) {
+			checkTimerException();
 		}
 	}
 

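checkTimerException() is a StreamTask helper not shown in this diff; based on the inlined checks it replaces here and in SourceStreamTask below, it presumably reduces to:

    // Presumed shape of the helper; the declared type of the
    // timerException field is an assumption.
    private void checkTimerException() throws Exception {
        if (timerException != null) {
            throw timerException;
        }
    }
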
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
new file mode 100644
index 0000000..9df3a5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.CollectorWrapper;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OperatorChain<OUT> {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
+	
+	private final StreamOperator<?>[] allOperators;
+	
+	private final RecordWriterOutput<?>[] streamOutputs;
+	
+	private final Output<StreamRecord<OUT>> chainEntryPoint;
+	
+
+	public OperatorChain(StreamTask<OUT, ?> containingTask,
+							StreamOperator<OUT> headOperator,
+							AccumulatorRegistry.Reporter reporter) {
+		
+		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+		final StreamConfig configuration = containingTask.getConfiguration();
+		final boolean enableTimestamps = containingTask.getExecutionConfig().areTimestampsEnabled();
+
+		// we read the chained configs, and the order of record writer registrations by output name
+		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
+		chainedConfigs.put(configuration.getVertexID(), configuration);
+
+		// create the final output stream writers
+		// we iterate through all the out edges from this job vertex and create a stream output
+		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
+		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
+		this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
+		
+		for (int i = 0; i < outEdgesInOrder.size(); i++) {
+			StreamEdge outEdge = outEdgesInOrder.get(i);
+			
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
+					outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
+					containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
+
+			streamOutputMap.put(outEdge, streamOutput);
+			this.streamOutputs[i] = streamOutput;
+		}
+
+		// we create the chain of operators and grab the collector that leads into the chain
+		List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
+		this.chainEntryPoint = createOutputCollector(containingTask, configuration,
+				chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
+		
+		this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
+		
+		// add the head operator to the end of the list
+		this.allOperators[this.allOperators.length - 1] = headOperator;
+	}
+
+	// 
+	
+	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+			streamOutput.broadcastEvent(barrier);
+		}
+	}
+	
+	public RecordWriterOutput<?>[] getStreamOutputs() {
+		return streamOutputs;
+	}
+	
+	public StreamOperator<?>[] getAllOperators() {
+		return allOperators;
+	}
+
+	public Output<StreamRecord<OUT>> getChainEntryPoint() {
+		return chainEntryPoint;
+	}
+
+	/**
+	 *
+	 * This method should be called before finishing the record emission, to make sure any data
+	 * that is still buffered will be sent. It also ensures that all data sending related
+	 * exceptions are recognized.
+	 *
+	 * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
+	 */
+	public void flushOutputs() throws IOException {
+		for (RecordWriterOutput<?> streamOutput : getStreamOutputs()) {
+			streamOutput.flush();
+		}
+	}
+
+	/**
+	 * This method releases all resources of the record writer output. It stops the output
+	 * flushing thread (if there is one) and releases all buffers currently held by the output
+	 * serializers.
+	 *
+	 * <p>This method should never fail.
+	 */
+	public void releaseOutputs() {
+		try {
+			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+				streamOutput.close();
+			}
+		}
+		finally {
+			// make sure that we release the buffers in any case
+			for (RecordWriterOutput<?> output : streamOutputs) {
+				output.clearBuffers();
+			}
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  initialization utilities
+	// ------------------------------------------------------------------------
+	
+	private static <T> Output<StreamRecord<T>> createOutputCollector(
+			StreamTask<?, ?> containingTask,
+			StreamConfig operatorConfig,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+			List<StreamOperator<?>> allOperators)
+	{
+		// We create a wrapper that will encapsulate the chained operators and network outputs
+		OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
+		CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
+
+		// create collectors for the network outputs
+		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
+			@SuppressWarnings("unchecked")
+			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
+			wrapper.addCollector(output, outputEdge);
+		}
+
+		// Create collectors for the chained outputs
+		for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
+			int outputId = outputEdge.getTargetId();
+			StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
+
+			Output<StreamRecord<T>> output = createChainedOperator(
+					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+			wrapper.addCollector(output, outputEdge);
+		}
+		return wrapper;
+	}
+	
+	private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
+			StreamTask<?, ?> containingTask,
+			StreamConfig operatorConfig,
+			Map<Integer, StreamConfig> chainedConfigs,
+			ClassLoader userCodeClassloader,
+			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
+			List<StreamOperator<?>> allOperators)
+	{
+		// create the output that the operator writes to first. this may recursively create more operators
+		Output<StreamRecord<OUT>> output = createOutputCollector(
+				containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+
+		// now create the operator and give it the output collector to write its output to
+		OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
+		chainedOperator.setup(containingTask, operatorConfig, output);
+
+		allOperators.add(chainedOperator);
+
+		if (containingTask.getExecutionConfig().isObjectReuseEnabled() || chainedOperator.isInputCopyingDisabled()) {
+			return new ChainingOutput<IN>(chainedOperator);
+		}
+		else {
+			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
+			return new CopyingChainingOutput<IN>(chainedOperator, inSerializer);
+		}
+	}
+	
+	private static <T> RecordWriterOutput<T> createStreamOutput(
+			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
+			Environment taskEnvironment, boolean withTimestamps,
+			AccumulatorRegistry.Reporter reporter, String taskName)
+	{
+		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
+
+		@SuppressWarnings("unchecked")
+		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
+
+		LOG.debug("Using partitioner {} for output {} of task {}", outputPartitioner, outputIndex, taskName);
+		
+		ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
+
+		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
+				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
+		output.setReporter(reporter);
+		
+		return new RecordWriterOutput<T>(output, outSerializer, withTimestamps);
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Collectors for output chaining
+	// ------------------------------------------------------------------------ 
+
+	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
+		
+		protected final OneInputStreamOperator<T, ?> operator;
+
+		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
+			this.operator = operator;
+		}
+
+		@Override
+		public void collect(StreamRecord<T> record) {
+			try {
+				operator.setKeyContextElement(record);
+				operator.processElement(record);
+			}
+			catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			try {
+				operator.processWatermark(mark);
+			}
+			catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+
+		@Override
+		public void close() {
+			try {
+				operator.close();
+			}
+			catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+	}
+
+	private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
+		
+		private final TypeSerializer<T> serializer;
+		
+		private final StreamRecord<T> copyRecord;
+
+		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
+			super(operator);
+			this.serializer = serializer;
+			this.copyRecord = new StreamRecord<T>(null, 0L);
+		}
+
+		@Override
+		public void collect(StreamRecord<T> record) {
+			try {
+				T copy = serializer.copy(record.getValue());
+				copyRecord.replace(copy, record.getTimestamp());
+				
+				operator.setKeyContextElement(copyRecord);
+				operator.processElement(copyRecord);
+			}
+			catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+	}
+}

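Taken together, the class above gives the task a single handle on the whole chain. A hedged sketch of how the owning StreamTask might drive it over a task's lifecycle (plumbing and generics simplified; method names taken from the class above):

    OperatorChain<OUT> chain = new OperatorChain<>(task, headOperator, reporter);
    Output<StreamRecord<OUT>> out = chain.getChainEntryPoint();  // head operator writes here
    try {
        // ... main loop: the head operator pushes records into 'out' ...
        chain.broadcastCheckpointBarrier(checkpointId, timestamp);  // on each checkpoint
        chain.flushOutputs();       // before finishing record emission
    }
    finally {
        chain.releaseOutputs();     // always: stop flushers, return buffers
    }
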
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
deleted file mode 100644
index ce659fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutputHandler<OUT> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
-
-	private final StreamTask<OUT, ?> vertex;
-	
-	/** The classloader used to access all user code */
-	private final ClassLoader userCodeClassloader;
-	
-	
-	private final Output<StreamRecord<OUT>> outerOutput;
-
-	public final List<StreamOperator<?>> chainedOperators;
-
-	private final Map<StreamEdge, RecordWriterOutput<?>> outputMap;
-
-	private final Map<Integer, StreamConfig> chainedConfigs;
-
-	/** Counters for the number of records emitted and bytes written. */
-	protected final AccumulatorRegistry.Reporter reporter;
-
-
-	public OutputHandler(StreamTask<OUT, ?> vertex, Map<String, Accumulator<?,?>> accumulatorMap,
-						AccumulatorRegistry.Reporter reporter) {
-
-		// Initialize some fields
-		this.vertex = vertex;
-		StreamConfig configuration = new StreamConfig(vertex.getTaskConfiguration());
-		this.chainedOperators = new ArrayList<StreamOperator<?>>();
-		this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
-		this.userCodeClassloader = vertex.getUserCodeClassLoader();
-
-		// We read the chained configs, and the order of record writer
-		// registrations by output name
-		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
-		this.chainedConfigs.put(configuration.getVertexID(), configuration);
-
-		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
-
-		this.reporter = reporter;
-
-		// We iterate through all the out edges from this job vertex and create
-		// a stream output
-		for (StreamEdge outEdge : outEdgesInOrder) {
-			RecordWriterOutput<?> streamOutput = createStreamOutput(
-					outEdge,
-					outEdge.getTargetId(),
-					chainedConfigs.get(outEdge.getSourceId()),
-					outEdgesInOrder.indexOf(outEdge),
-					reporter);
-			outputMap.put(outEdge, streamOutput);
-		}
-
-		// We create the outer output that will be passed to the first task
-		// in the chain
-		this.outerOutput = createChainedCollector(configuration, accumulatorMap);
-		
-		// Add the head operator to the end of the list
-		this.chainedOperators.add(vertex.streamOperator);
-	}
-
-	public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
-		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
-		for (RecordWriterOutput<?> streamOutput : outputMap.values()) {
-			streamOutput.broadcastEvent(barrier);
-		}
-	}
-
-	public Collection<RecordWriterOutput<?>> getOutputs() {
-		return outputMap.values();
-	}
-	
-	public List<StreamOperator<?>> getChainedOperators(){
-		return chainedOperators;
-	}
-
-	/**
-	 * This method builds up a nested output which encapsulates all the
-	 * chained operators and their network output. The result of this recursive
-	 * call will be passed as output to the first operator in the chain.
-	 *
-	 * @param chainedTaskConfig
-	 * 		The configuration of the starting operator of the chain, we
-	 * 		use this paramater to recursively build the whole chain
-	 * @return Returns the output for the chain starting from the given
-	 * config
-	 */
-	@SuppressWarnings("unchecked")
-	private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-		
-		// We create a wrapper that will encapsulate the chained operators and
-		// network outputs
-
-		OutputSelectorWrapper<?> outputSelectorWrapper = chainedTaskConfig.getOutputSelectorWrapper(userCodeClassloader);
-		CollectorWrapper wrapper = new CollectorWrapper(outputSelectorWrapper);
-
-		// Create collectors for the network outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(userCodeClassloader)) {
-			Output<?> output = outputMap.get(outputEdge);
-
-			wrapper.addCollector(output, outputEdge);
-		}
-
-		// Create collectors for the chained outputs
-		for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(userCodeClassloader)) {
-			Integer outputId = outputEdge.getTargetId();
-
-			Output<?> output = createChainedCollector(chainedConfigs.get(outputId), accumulatorMap);
-
-			wrapper.addCollector(output, outputEdge);
-		}
-
-		if (chainedTaskConfig.isChainStart()) {
-			// The current task is the first chained task at this vertex so we
-			// return the wrapper
-			return (Output<StreamRecord<X>>) wrapper;
-		}
-		else {
-			// The current task is a part of the chain so we get the chainable
-			// operator which will be returned and set it up using the wrapper
-			OneInputStreamOperator chainableOperator =
-					chainedTaskConfig.getStreamOperator(userCodeClassloader);
-
-			StreamingRuntimeContext chainedContext = vertex.createRuntimeContext(chainedTaskConfig, accumulatorMap);
-			vertex.contexts.add(chainedContext);
-			
-			chainableOperator.setup(wrapper, chainedContext);
-
-			chainedOperators.add(chainableOperator);
-			if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
-				return new ChainingOutput<X>(chainableOperator);
-			}
-			else {
-				TypeSerializer<X> typeSer = chainedTaskConfig.getTypeSerializerIn1(userCodeClassloader);
-				TypeSerializer<StreamRecord<X>> inSerializer;
-				
-				if (vertex.getExecutionConfig().areTimestampsEnabled()) {
-					inSerializer = (TypeSerializer<StreamRecord<X>>) 
-							(TypeSerializer<?>) new MultiplexingStreamRecordSerializer<X>(typeSer);
-				}
-				else {
-					inSerializer = new StreamRecordSerializer<X>(typeSer);
-				}
-				
-				return new CopyingChainingOutput<X>(chainableOperator, inSerializer);
-			}
-		}
-
-	}
-
-	public Output<StreamRecord<OUT>> getOutput() {
-		return outerOutput;
-	}
-
-	/**
-	 * We create the StreamOutput for the specific output given by the id, and
-	 * the configuration of its source task
-	 *
-	 * @param outputVertex
-	 * 		Name of the output to which the stream output will be set up
-	 * @param upStreamConfig
-	 * 		The config of upStream task
-	 * @return The created StreamOutput
-	 */
-	private <T> RecordWriterOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
-			StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) {
-
-		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut1(vertex.userClassLoader);
-
-		@SuppressWarnings("unchecked")
-		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
-		ResultPartitionWriter bufferWriter = vertex.getEnvironment().getWriter(outputIndex);
-
-		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
-				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-
-		output.setReporter(reporter);
-		
-		RecordWriterOutput<T> streamOutput = 
-				new RecordWriterOutput<T>(output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
-					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
-		}
-
-		return streamOutput;
-	}
-
-	/**
-	 * 
-	 * This method should be called before finishing the record emission, to make sure any data
-	 * that is still buffered will be sent. It also ensures that all data sending related
-	 * exceptions are recognized.
-	 * 
-	 * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
-	 */
-	public void flushOutputs() throws IOException {
-		for (RecordWriterOutput<?> streamOutput : getOutputs()) {
-			streamOutput.flush();
-		}
-	}
-
-	/**
-	 * This method releases all resources of the record writer output. It stops the output
-	 * flushing thread (if there is one) and releases all buffers currently held by the output
-	 * serializers.
-	 *
-	 * This method should never fail.
-	 */
-	public void releaseOutputs() {
-		try {
-			for (RecordWriterOutput<?> streamOutput : getOutputs()) {
-				streamOutput.close();
-			}
-		}
-		finally {
-			// make sure that we release the buffers in any case
-			for (RecordWriterOutput<?> output : getOutputs()) {
-				output.clearBuffers();
-			}
-		}
-	}
-
-	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		
-		protected final OneInputStreamOperator<T, ?> operator;
-
-		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
-			this.operator = operator;
-		}
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			try {
-				operator.getRuntimeContext().setNextInput(record);
-				operator.processElement(record);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Could not forward element to next operator", e);
-			}
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			try {
-				operator.processWatermark(mark);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Could not forward watermark to next operator", e);
-			}
-		}
-
-		@Override
-		public void close() {
-			try {
-				operator.close();
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Could not close() call to next operator", e);
-			}
-		}
-	}
-
-	private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
-		private final TypeSerializer<StreamRecord<T>> serializer;
-
-		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator,
-				TypeSerializer<StreamRecord<T>> serializer) {
-			super(operator);
-			this.serializer = serializer;
-		}
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			try {
-				operator.getRuntimeContext().setNextInput(record);
-				operator.processElement(serializer.copy(record));
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Could not forward element to next operator", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fc221f8..3d82275 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -51,15 +51,13 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 	@Override
 	protected void run() throws Exception {
 		final Object checkpointLock = getCheckpointLock();
-		
-		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(outputHandler.getOutput(), checkpointLock);
-		
-		streamOperator.run(checkpointLock, output);
+		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(getHeadOutput(), checkpointLock);
+		headOperator.run(checkpointLock, output);
 	}
 	
 	@Override
 	protected void cancelTask() throws Exception {
-		streamOperator.cancel();
+		headOperator.cancel();
 	}
 
 	// ------------------------------------------------------------------------
@@ -95,9 +93,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 		@Override
 		public void collect(T record) {
 			synchronized (lockObject) {
-				if (timerException != null) {
-					throw timerException;
-				}
+				checkTimerException();
 				output.collect(record);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index c937e51..2125df1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.util.Collection;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +40,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	@Override
 	protected void run() throws Exception {
 		
-		final String iterationId = configuration.getIterationId();
+		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
 		}
@@ -49,7 +48,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 		final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
 				getEnvironment().getIndexInSubtaskGroup());
 		
-		final long iterationWaitTime = configuration.getIterationWaitTime();
+		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 		final boolean shouldWait = iterationWaitTime > 0;
 
 		final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
@@ -61,8 +60,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 		// do the work 
 		try {
 			@SuppressWarnings("unchecked")
-			Collection<RecordWriterOutput<OUT>> outputs = 
-					(Collection<RecordWriterOutput<OUT>>) (Collection<?>) outputHandler.getOutputs();
+			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
 
 			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
 			if (getExecutionConfig().areTimestampsEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index fdce52d..9bb5311 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -36,7 +36,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 	public void init() throws Exception {
 		super.init();
 		
-		final String iterationId = configuration.getIterationId();
+		final String iterationId = getConfiguration().getIterationId();
 		if (iterationId == null || iterationId.length() == 0) {
 			throw new Exception("Missing iteration ID in the task configuration");
 		}
@@ -44,7 +44,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		final String brokerID = StreamIterationHead.createBrokerIdString(getEnvironment().getJobID(), iterationId,
 				getEnvironment().getIndexInSubtaskGroup());
 
-		final long iterationWaitTime = configuration.getIterationWaitTime();
+		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
 
 		LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
 		
@@ -54,7 +54,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		
 		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
 		
-		this.streamOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
+		this.headOperator = new RecordPusher<>(dataChannel, iterationWaitTime);
 	}
 
 	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {

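The RecordPusher installed above as the tail's head operator is the producing side of the same channel: every element the tail receives goes into the feedback queue for the head to pick up. A hedged sketch of what its processElement amounts to (the offer/put split mirrors the wait-time semantics above; the field names are assumptions):

    // Illustrative only: forward each element into the shared feedback queue.
    @Override
    public void processElement(StreamRecord<IN> record) throws Exception {
        if (iterationWaitTime > 0) {
            // give up (dropping the record) if the head does not take it in time
            dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
        } else {
            // block until the head consumes the record
            dataChannel.put(record);
        }
    }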
http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 16b8f55..bbfd233 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -17,121 +17,125 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.functors.NotNullPredicate;
-
 import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.state.OperatorStateHandle;
-import org.apache.flink.streaming.api.state.WrapperStateHandle;
-
+import org.apache.flink.streaming.api.state.StateBackend;
+import org.apache.flink.streaming.api.state.StateBackendFactory;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.streaming.api.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Base Invokable for all {@code StreamTasks}. A {@code StreamTask} processes input and forwards
- * elements and watermarks to a {@link StreamOperator}.
+ * Base class for all streaming tasks. A task is the unit of local processing that is deployed
+ * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
+ * the Task's operator chain. Operators that are chained together execute synchronously in the
+ * same thread and hence on the same stream partition. A common case for such chains
+ * is a sequence of map/flatmap/filter operations.
+ * 
+ * <p>The task chain contains one "head" operator and multiple chained operators. 
+ * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
+ * as well as for sources, iteration heads and iteration tails.
+ * 
+ * <p>The Task class deals with the setup of the streams read by the head operator, and the streams 
+ * produced by the operators at the ends of the operator chain. Note that the chain may fork and
+ * thus have multiple ends.
  *
+ * The life cycle of the task is set up as follows: 
  * <pre>
- *     
  *  -- registerInputOutput()
  *         |
- *         +----> Create basic utils (config, etc) and load operators
- *         +----> operator specific init()
+ *         +----> Create basic utils (config, etc) and load the chain of operators
+ *         +----> operators.setup()
+ *         +----> task specific init()
  *  
- *  -- restoreState()
+ *  -- restoreState() -> restores state of all operators in the chain
  *  
  *  -- invoke()
  *        |
- *        +----> open operators()
+ *        +----> open-operators()
  *        +----> run()
- *        +----> close operators()
+ *        +----> close-operators()
+ *        +----> dispose-operators()
  *        +----> common cleanup
- *        +----> operator specific cleanup()
+ *        +----> task specific cleanup()
  * </pre>
  *
- * <p>
- * {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
+ * <p> The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
  * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
  * are called concurrently.
  * 
  * @param <OUT>
- * @param <O>
+ * @param <Operator>
  */
-public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
+public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
+		extends AbstractInvokable
+		implements StatefulTask<StreamTaskStateList> {
 
 	/** The thread group that holds all trigger timer threads */
 	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
 	
-	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
-
+	/** The logger used by the StreamTask and its subclasses */
+	protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+	
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
-	 * we don't have concurrent method calls.
+	 * we don't have concurrent method calls that void consistent checkpoints.
 	 */
-	protected final Object lock = new Object();
-
-	private final EventListener<CheckpointBarrier> checkpointBarrierListener;
+	private final Object lock = new Object();
 	
-	protected final List<StreamingRuntimeContext> contexts;
+	/** The head operator that consumes the input streams of this task */
+	protected Operator headOperator;
 
-	protected StreamingRuntimeContext headContext;
+	/** The chain of operators executed by this task */
+	private OperatorChain<OUT> operatorChain;
 	
-	protected StreamConfig configuration;
-
-	protected ClassLoader userClassLoader;
+	/** The configuration of this streaming task */
+	private StreamConfig configuration;
 
-	/** The executor service that */
+	/** The class loader used to load dynamic classes of a job */
+	private ClassLoader userClassLoader;
+	
+	/** The state backend that stores the state and checkpoints for this task */
+	private StateBackend<?> stateBackend;
+	
+	/** The executor service that schedules and calls the triggers of this task */
 	private ScheduledExecutorService timerService;
+	
+	/** The map of user-defined accumulators of this task */
+	private Map<String, Accumulator<?, ?>> accumulatorMap;
 
-	/**
-	 * This field is used to forward an exception that is caught in the timer thread. Subclasses
-	 * must ensure that exceptions stored here get thrown on the actual execution Thread.
-	 */
-	protected volatile TimerException timerException = null;
-
-	protected OutputHandler<OUT> outputHandler;
-
-	protected O streamOperator;
-
-	protected boolean hasChainedOperators;
-
+	/** This field is used to forward an exception that is caught in the timer thread. Subclasses
+	 * must ensure that exceptions stored here get thrown on the actual execution Thread. */
+	private volatile TimerException timerException;
+	
 	/** Flag to mark the task "in operation", in which case check
 	 * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
 	private volatile boolean isRunning;
 	
-	// ------------------------------------------------------------------------
-	
-	public StreamTask() {
-		checkpointBarrierListener = new CheckpointBarrierListener();
-		contexts = new ArrayList<>();
-	}
 
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
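To make the chaining contract from the Javadoc concrete: a chained operator's "output" is a direct method call into the next operator, so the entire chain runs in one thread and on one stream partition, with no serialization in between. A self-contained sketch of that call pattern in plain Java (deliberately not the runtime's own Output/OperatorChain types):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.Function;

    public final class ChainSketch {

        // Push one element through a chain of map-style operators. Every step
        // is a plain synchronous call: no thread hop, no network, no copies.
        static <T> T pushThroughChain(T element, List<Function<T, T>> chain) {
            T current = element;
            for (Function<T, T> operator : chain) {
                current = operator.apply(current);
            }
            return current;
        }

        public static void main(String[] args) {
            List<Function<String, String>> mapChain = Arrays.asList(
                    String::trim,
                    String::toUpperCase);
            System.out.println(pushThroughChain("  hello  ", mapChain)); // HELLO
        }
    }

Network boundaries (and with them repartitioning and serialization) only reappear at the RecordWriterOutputs that terminate the chain.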
@@ -152,34 +156,27 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@Override
 	public final void registerInputOutput() throws Exception {
 		LOG.debug("Begin initialization for {}", getName());
+
+		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
 		
 		userClassLoader = getUserCodeClassLoader();
 		configuration = new StreamConfig(getTaskConfiguration());
-
-		streamOperator = configuration.getStreamOperator(userClassLoader);
-
-		// Create and register Accumulators
-		AccumulatorRegistry accumulatorRegistry = getEnvironment().getAccumulatorRegistry();
-		Map<String, Accumulator<?, ?>> accumulatorMap = accumulatorRegistry.getUserMap();
-		AccumulatorRegistry.Reporter reporter = accumulatorRegistry.getReadWriteReporter();
-
-		outputHandler = new OutputHandler<>(this, accumulatorMap, reporter);
-
-		if (streamOperator != null) {
-			// IterationHead and IterationTail don't have an Operator...
-
-			//Create context of the head operator
-			headContext = createRuntimeContext(configuration, accumulatorMap);
-			this.contexts.add(headContext);
-			streamOperator.setup(outputHandler.getOutput(), headContext);
+		accumulatorMap = accumulatorRegistry.getUserMap();
+		
+		stateBackend = createStateBackend();
+		stateBackend.initializeForJob(getEnvironment().getJobID());
+		
+		headOperator = configuration.getStreamOperator(userClassLoader);
+		operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
+		
+		if (headOperator != null) {
+			headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
 		}
 
-		hasChainedOperators = outputHandler.getChainedOperators().size() != 1;
-
-		this.timerService = Executors.newSingleThreadScheduledExecutor(
+		timerService = Executors.newSingleThreadScheduledExecutor(
 				new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
 
-		// operator specific initialization
+		// task specific initialization
 		init();
 		
 		LOG.debug("Finish initialization for {}", getName());
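Read together with the life cycle diagram in the class Javadoc, invoke() is expected to sequence the phases roughly as follows. This is a schematic sketch, not the actual method body; cleanup() stands in for the task-specific cleanup hook named in the diagram:

    // Schematic phase ordering, mirroring the Javadoc diagram above.
    public void invokeSketch() throws Exception {
        boolean disposed = false;
        try {
            openAllOperators();           // open-operators()
            run();                        // the task's main processing loop
            closeAllOperators();          // close-operators(), may emit final elements
            operatorChain.flushOutputs(); // make sure all data is flushed
            tryDisposeAllOperators();     // dispose-operators()
            disposed = true;
        }
        finally {
            timerService.shutdown();      // common cleanup
            if (operatorChain != null) {
                operatorChain.releaseOutputs(); // release output resources
            }
            if (!disposed) {
                disposeAllOperators();    // best effort after a failure
            }
            cleanup();                    // task specific cleanup()
        }
    }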
@@ -211,7 +208,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			closeAllOperators();
 			
 			// make sure all data is flushed
-			outputHandler.flushOutputs();
+			operatorChain.flushOutputs();
 
 			// make an attempt to dispose the operators such that failures in the dispose call
 			// still let the computation fail
@@ -224,8 +221,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			timerService.shutdown();
 			
 			// release the output resources. this method should never fail.
-			if (outputHandler != null) {
-				outputHandler.releaseOutputs();
+			if (operatorChain != null) {
+				operatorChain.releaseOutputs();
 			}
 
 			// we must! perform this cleanup
@@ -252,41 +249,33 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	}
 	
 	private void openAllOperators() throws Exception {
-		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
-			if (operator != null) {
-				operator.open(getTaskConfiguration());
-			}
+		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+			operator.open();
 		}
 	}
 
 	private void closeAllOperators() throws Exception {
 		// We need to close them first to last, since upstream operators in the chain might emit
 		// elements in their close methods.
-		for (int i = outputHandler.getChainedOperators().size() - 1; i >= 0; i--) {
-			StreamOperator<?> operator = outputHandler.getChainedOperators().get(i);
-			if (operator != null) {
-				operator.close();
-			}
+		StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+		for (int i = allOperators.length - 1; i >= 0; i--) {
+			allOperators[i].close();
 		}
 	}
 
 	private void tryDisposeAllOperators() throws Exception {
-		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
-			if (operator != null) {
-				operator.dispose();
-			}
+		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+			operator.dispose();
 		}
 	}
 	
 	private void disposeAllOperators() {
-		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
-			if (operator != null) {
-				try {
-					operator.dispose();
-				}
-				catch (Throwable t) {
-					LOG.error("Error during disposal of stream operator.", t);
-				}
+		for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+			try {
+				operator.dispose();
+			}
+			catch (Throwable t) {
+				LOG.error("Error during disposal of stream operator.", t);
 			}
 		}
 	}
@@ -300,8 +289,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	 * shutdown is attempted, and cause threads to linger for longer than needed.
 	 */
 	@Override
-	@SuppressWarnings("FinalizeDoesntCallSuperFinalize")
-	protected void finalize() {
+	protected void finalize() throws Throwable {
+		super.finalize();
 		if (timerService != null) {
 			if (!timerService.isTerminated()) {
 				LOG.warn("Timer service was not shut down. Shutting down in finalize().");
@@ -322,73 +311,84 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		return getEnvironment().getTaskNameWithSubtasks();
 	}
 
+	/**
+	 * Gets the lock object on which all operations that mutate data and state must synchronize.
+	 *
+	 * @return The checkpoint lock object.
+	 */
 	public Object getCheckpointLock() {
 		return lock;
 	}
+	
+	public StreamConfig getConfiguration() {
+		return configuration;
+	}
+
+	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
+		return accumulatorMap;
+	}
+	
+	public Output<StreamRecord<OUT>> getHeadOutput() {
+		return operatorChain.getChainEntryPoint();
+	}
+	
+	public RecordWriterOutput<?>[] getStreamOutputs() {
+		return operatorChain.getStreamOutputs();
+	}
 
 	// ------------------------------------------------------------------------
 	//  Checkpoint and Restore
 	// ------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
+	
 	@Override
-	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
-
-		// We retrieve end restore the states for the chained operators.
-		List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = 
-				(List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>>) stateHandle.getState(this.userClassLoader);
-
-		// We restore all stateful operators
-		for (int i = 0; i < chainedStates.size(); i++) {
-			Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> state = chainedStates.get(i);
-			// If state is not null we need to restore it
-			if (state != null) {
-				StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
-				((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
+	public void setInitialState(StreamTaskStateList initialState) throws Exception {
+		LOG.info("Restoring checkpointed state to task {}", getName());
+		
+		final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+		final StreamTaskState[] states = initialState.getState(userClassLoader);
+		
+		for (int i = 0; i < states.length; i++) {
+			StreamTaskState state = states[i];
+			StreamOperator<?> operator = allOperators[i];
+			
+			if (state != null && operator != null) {
+				LOG.debug("Operator {} in chain of task {} has checkpointed state", i, getName());
+				operator.restoreState(state);
+			}
+			else if (operator != null) {
+				LOG.debug("Operator {} in chain of task {} does not have checkpointed state", i, getName());
 			}
 		}
 	}
 
 	@Override
 	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 		
 		synchronized (lock) {
 			if (isRunning) {
-				try {
-					// We wrap the states of the chained operators in a list, marking non-stateful operators with null
-					List<Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>>> chainedStates = new ArrayList<>();
 
-					// A wrapper handle is created for the List of statehandles
-					WrapperStateHandle stateHandle;
-					try {
-
-						// We construct a list of states for chained tasks
-						for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
-							if (chainedOperator instanceof StatefulStreamOperator) {
-								chainedStates.add(((StatefulStreamOperator<?>) chainedOperator)
-										.getStateSnapshotFromFunction(checkpointId, timestamp));
-							}else{
-								chainedStates.add(null);
-							}
-						}
-
-						stateHandle = CollectionUtils.exists(chainedStates,
-								NotNullPredicate.INSTANCE) ? new WrapperStateHandle(chainedStates) : null;
-					}
-					catch (Exception e) {
-						throw new Exception("Error while drawing snapshot of the user state.", e);
+				// since both state checkpointing and downstream barrier emission occur in this
+				// lock scope, they are an atomic operation regardless of the order in which they occur
+				// we immediately emit the checkpoint barriers, so the downstream operators can start
+				// their checkpoint work as soon as possible
+				operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
+				
+				// now draw the state snapshot
+				try {
+					final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
+					final StreamTaskState[] states = new StreamTaskState[allOperators.length];
+					
+					for (int i = 0; i < states.length; i++) {
+						StreamTaskState state = allOperators[i].snapshotOperatorState(checkpointId, timestamp);
+						states[i] = state.isEmpty() ? null : state;
 					}
 
-					// now emit the checkpoint barriers
-					outputHandler.broadcastBarrier(checkpointId, timestamp);
-
-					// now confirm the checkpoint
-					if (stateHandle == null) {
+					StreamTaskStateList allStates = new StreamTaskStateList(states);
+					if (allStates.isEmpty()) {
 						getEnvironment().acknowledgeCheckpoint(checkpointId);
 					} else {
-						getEnvironment().acknowledgeCheckpoint(checkpointId, stateHandle);
+						getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
 					}
 				}
 				catch (Exception e) {
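The atomicity argument in the comment above only holds because the element-processing path takes the same lock (see the StreamInputProcessor change at the top of this commit). Schematically, the two critical sections pair up like this; snapshotAllOperators() is a hypothetical shorthand for the snapshot loop above:

    // Thread A: element processing (cf. StreamInputProcessor.processInput)
    synchronized (lock) {
        operator.setKeyContextElement(record);
        operator.processElement(record);
    }

    // Thread B: checkpointing (cf. triggerCheckpoint above)
    synchronized (lock) {
        operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
        snapshotAllOperators(); // hypothetical shorthand for the snapshot loop
    }

Because both blocks guard on the same lock, a snapshot can never observe a half-processed element, and the barrier's position in the output stream is always consistent with the snapshotted state.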
@@ -404,64 +404,85 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (lock) {
 			if (isRunning) {
-				for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) {
-					if (chainedOperator instanceof StatefulStreamOperator) {
-						((StatefulStreamOperator<?>) chainedOperator).notifyCheckpointComplete(checkpointId);
-					}
+				LOG.debug("Notification of complete checkpoint for task {}", getName());
+				
+				for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
+					operator.notifyOfCompletedCheckpoint(checkpointId);
 				}
 			}
+			else {
+				LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
+			}
 		}
 	}
 	
 	// ------------------------------------------------------------------------
 	//  State backend
 	// ------------------------------------------------------------------------
-	
-	private StateHandleProvider<Serializable> getStateHandleProvider() {
-		StateHandleProvider<Serializable> provider = configuration.getStateHandleProvider(userClassLoader);
 
-		// If the user did not specify a provider in the program we try to get it from the config
-		if (provider == null) {
+	/**
+	 * Gets the state backend used by this task. The state backend defines how to maintain the
+	 * key/value state and how and where to store state snapshots.
+	 * 
+	 * @return The state backend used by this task.
+	 */
+	public StateBackend<?> getStateBackend() {
+		return stateBackend;
+	}
+	
+	private StateBackend<?> createStateBackend() throws Exception {
+		StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
+		
+		if (configuredBackend != null) {
+			// backend has been configured on the environment
+			LOG.info("Using user-defined state backend: " + configuredBackend);
+			return configuredBackend;
+		}
+		else {
+			// see if we have a backend specified in the configuration
 			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
-			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND,
-					ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
-
-			StateBackend backend;
-
-			try {
-				backend = StateBackend.valueOf(backendName);
-			} catch (Exception e) {
-				throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.");
+			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
+			
+			if (backendName == null) {
+				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
+				backendName = "jobmanager";
 			}
+			
+			backendName = backendName.toLowerCase();
+			switch (backendName) {
+				case "jobmanager":
+					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
+					return MemoryStateBackend.defaultInstance();
+				
+				case "filesystem":
+					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
+					LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
+							+ backend.getBasePath() + "\")");
+					return backend;
+				
+				default:
+					try {
+						@SuppressWarnings("rawtypes")
+						Class<? extends StateBackendFactory> clazz =
+								Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
 
-			switch (backend) {
-				case JOBMANAGER:
-					LOG.info("State backend for state checkpoints is set to jobmanager.");
-					return new LocalStateHandle.LocalStateHandleProvider<>();
-				case FILESYSTEM:
-					String checkpointDir = flinkConfig.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
-					if (checkpointDir != null) {
-						LOG.info("State backend for state checkpoints is set to filesystem with directory: "
-								+ checkpointDir);
-						return FileStateHandle.createProvider(checkpointDir);
-					} else {
-						throw new RuntimeException(
-								"For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\"");
+						return (StateBackend<?>) clazz.newInstance().createFromConfig(flinkConfig);
+					}
+					catch (ClassNotFoundException e) {
+						throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
+					}
+					catch (ClassCastException e) {
+						throw new IllegalConfigurationException("The class configured under '" +
+								ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
+								backendName + ')');
+					}
+					catch (Throwable t) {
+						throw new IllegalConfigurationException("Cannot create configured state backend", t);
 					}
-				default:
-					throw new RuntimeException("Backend " + backend + " is not supported yet.");
 			}
-
-		} else {
-			LOG.info("Using user defined state backend for streaming checkpoitns.");
-			return provider;
 		}
 	}
 
-	private enum StateBackend {
-		JOBMANAGER, FILESYSTEM
-	}
-
 	/**
 	 * Registers a timer.
 	 */
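For the default branch of createStateBackend() above, the class named under ConfigConstants.STATE_BACKEND must implement StateBackendFactory and be loadable through the user classloader. A hedged sketch of such a factory: the class name and config key are invented, the raw type mirrors the @SuppressWarnings("rawtypes") handling above, and createFromConfig(Configuration) is the method used with FsStateBackendFactory:

    // Hypothetical custom factory, selected by setting the
    // ConfigConstants.STATE_BACKEND entry to this class name.
    @SuppressWarnings("rawtypes")
    public class MyStateBackendFactory implements StateBackendFactory {

        public StateBackend<?> createFromConfig(Configuration config) throws Exception {
            // read backend-specific keys from the cluster configuration
            String dir = config.getString("my.backend.checkpointdir", "file:///tmp/checkpoints");
            return new FsStateBackend(dir); // assumes the URI-string constructor
        }
    }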
@@ -473,41 +494,37 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 				delay,
 				TimeUnit.MILLISECONDS);
 	}
+	
+	public void checkTimerException() throws TimerException {
+		if (timerException != null) {
+			throw timerException;
+		}
+	}
 
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-
-	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf, Map<String, Accumulator<?,?>> accumulatorMap) {
-		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
-
-		return new StreamingRuntimeContext(getEnvironment(), getExecutionConfig(),
-				statePartitioner, getStateHandleProvider(), accumulatorMap, this);
-	}
 	
 	@Override
 	public String toString() {
 		return getName();
 	}
 
-	// ------------------------------------------------------------------------
-
-	public EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return this.checkpointBarrierListener;
-	}
-	
-	private class CheckpointBarrierListener implements EventListener<CheckpointBarrier> {
-
-		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			try {
-				triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+	protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
+		return new EventListener<CheckpointBarrier>() {
+			@Override
+			public void onEvent(CheckpointBarrier barrier) {
+				try {
+					triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
+				}
+				catch (Exception e) {
+					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
+				}
 			}
-		}
+		};
 	}
+	
+	// ------------------------------------------------------------------------
 
 	/**
 	 * Internal task that is invoked by the timer service and triggers the target.

http://git-wip-us.apache.org/repos/asf/flink/blob/479bec0b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
new file mode 100644
index 0000000..2fce7af
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.state.KvStateSnapshot;
+
+import java.io.Serializable;
+
+/**
+ * The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
+ * This state consists of any combination of the following three parts:
+ * <ul>
+ *     <li>The state of the stream operator, if it implements the Checkpointed interface.</li>
+ *     <li>The state of the user function, if it implements the Checkpointed interface.</li>
+ *     <li>The key/value state of the operator, if it executes on a KeyedDataStream.</li>
+ * </ul>
+ */
+public class StreamTaskState implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	
+	private StateHandle<?> operatorState;
+
+	private StateHandle<Serializable> functionState;
+
+	private KvStateSnapshot<?, ?, ?> kvState;
+
+	// ------------------------------------------------------------------------
+
+	public StateHandle<?> getOperatorState() {
+		return operatorState;
+	}
+
+	public void setOperatorState(StateHandle<?> operatorState) {
+		this.operatorState = operatorState;
+	}
+
+	public StateHandle<Serializable> getFunctionState() {
+		return functionState;
+	}
+
+	public void setFunctionState(StateHandle<Serializable> functionState) {
+		this.functionState = functionState;
+	}
+
+	public KvStateSnapshot<?, ?, ?> getKvState() {
+		return kvState;
+	}
+
+	public void setKvState(KvStateSnapshot<?, ?, ?> kvState) {
+		this.kvState = kvState;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks if this state object actually contains any state, or if all of the state
+	 * fields are null.
+	 * 
+	 * @return True, if all state is null, false if at least one state is not null.
+	 */
+	public boolean isEmpty() {
+		return operatorState == null && functionState == null && kvState == null;
+	}
+
+	/**
+	 * Discards all the contained states and sets them to null.
+	 * 
+	 * @throws Exception Forwards exceptions that occur when releasing the
+	 *                   state handles and snapshots.
+	 */
+	public void discardState() throws Exception {
+		StateHandle<?> operatorState = this.operatorState;
+		StateHandle<?> functionState = this.functionState;
+		KvStateSnapshot<?, ?, ?> kvState = this.kvState;
+		
+		if (operatorState != null) {
+			operatorState.discardState();
+		}
+		if (functionState != null) {
+			functionState.discardState();
+		}
+		if (kvState != null) {
+			kvState.discardState();
+		}
+
+		this.operatorState = null;
+		this.functionState = null;
+		this.kvState = null;
+	}
+}
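A short usage sketch tying this class back to triggerCheckpoint above: empty snapshots are replaced by null so they are never shipped, and contained handles are discarded if persisting them fails. Here store() is a hypothetical placeholder for whatever the checkpoint coordinator does with the state:

    // Sketch of how a caller is expected to treat an operator snapshot.
    StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);

    if (state.isEmpty()) {
        // stateless for this checkpoint: record null instead of the snapshot
    }
    else {
        try {
            store(state); // hypothetical persistence step
        }
        catch (Exception e) {
            state.discardState(); // release operator, function, and k/v handles
            throw e;
        }
    }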