You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:46 UTC

[30/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
deleted file mode 100644
index 80df72e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-
-/**
- * An element in a data stream. Can be a record or a Watermark.
- */
-public abstract class StreamElement {
-	
-	/**
-	 * Checks whether this element is a watermark.
-	 * @return True, if this element is a watermark, false otherwise.
-	 */
-	public final boolean isWatermark() {
-		return getClass() == Watermark.class;
-	}
-
-	/**
-	 * Checks whether this element is a record.
-	 * @return True, if this element is a record, false otherwise.
-	 */
-	public final boolean isRecord() {
-		return getClass() == StreamRecord.class;
-	}
-
-	/**
-	 * Casts this element into a StreamRecord.
-	 * @return This element as a stream record.
-	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a stream record.
-	 */
-	@SuppressWarnings("unchecked")
-	public final <E> StreamRecord<E> asRecord() {
-		return (StreamRecord<E>) this;
-	}
-
-	/**
-	 * Casts this element into a Watermark.
-	 * @return This element as a Watermark.
-	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a Watermark.
-	 */
-	public final Watermark asWatermark() {
-		return (Watermark) this;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
deleted file mode 100644
index 348b974..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-/**
- * One value in a data stream. This stores the value and the associated timestamp.
- * 
- * @param <T> The type encapsulated with the stream record.
- */
-public class StreamRecord<T> extends StreamElement {
-	
-	/** The actual value held by this record */
-	private T value;
-	
-	/** The timestamp of the record */
-	private long timestamp;
-
-	/**
-	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to
-	 * {@code Long.MIN_VALUE + 1}, the smallest value not reserved for tagging Watermarks.
-	 */
-	public StreamRecord(T value) {
-		this(value, Long.MIN_VALUE + 1);
-		// be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the
-		// special tag to signify that a transmitted element is a Watermark in StreamRecordSerializer
-	}
-
-	/**
-	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
-	 * given timestamp.
-	 *
-	 * @param value The value to wrap in this {@link StreamRecord}
-	 * @param timestamp The timestamp in milliseconds
-	 */
-	public StreamRecord(T value, long timestamp) {
-		this.value = value;
-		this.timestamp = timestamp;
-	}
-
-	/**
-	 * Returns the value wrapped in this stream record.
-	 */
-	public T getValue() {
-		return value;
-	}
-
-	/**
-	 * Returns the timestamp associated with this stream record in milliseconds.
-	 */
-	public long getTimestamp() {
-		return timestamp;
-	}
-
-	/**
-	 * Replace the currently stored value by the given new value. This returns this same
-	 * StreamRecord, cast to the generic type parameter that matches the new value, while
-	 * keeping the old timestamp.
-	 *
-	 * @param element Element to set in this stream record
-	 * @return This StreamRecord with the replaced value
-	 */
-	@SuppressWarnings("unchecked")
-	public <X> StreamRecord<X> replace(X element) {
-		this.value = (T) element;
-		return (StreamRecord<X>) this;
-	}
-
-	/**
-	 * Replace the currently stored value by the given new value and the currently stored
-	 * timestamp with the new timestamp. This returns this same StreamRecord, cast to the
-	 * generic type parameter that matches the new value.
-	 *
-	 * @param value The new value to wrap in this {@link StreamRecord}
-	 * @param timestamp The new timestamp in milliseconds
-	 * @return This StreamRecord with the replaced value and timestamp
-	 */
-	@SuppressWarnings("unchecked")
-	public <X> StreamRecord<X> replace(X value, long timestamp) {
-		this.timestamp = timestamp;
-		this.value = (T) value;
-		return (StreamRecord<X>) this;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		StreamRecord<?> that = (StreamRecord<?>) o;
-
-		return value.equals(that.value) && timestamp == that.timestamp;
-	}
-
-	@Override
-	public int hashCode() {
-		int result = value != null ? value.hashCode() : 0;
-		result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return "Record{" + value + "; " + timestamp + '}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
deleted file mode 100644
index d47da50..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import java.io.IOException;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
- * the element.
- *
- * <p>
- * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
- * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
- * stream with {@link StreamRecord StreamRecords}.
- *
- * @see MultiplexingStreamRecordSerializer
- *
- * @param <T> The type of value in the {@link StreamRecord}
- */
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private final TypeSerializer<T> typeSerializer;
-	
-
-	public StreamRecordSerializer(TypeSerializer<T> serializer) {
-		if (serializer instanceof StreamRecordSerializer) {
-			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
-		}
-		this.typeSerializer = Preconditions.checkNotNull(serializer);
-	}
-
-	public TypeSerializer<T> getContainedTypeSerializer() {
-		return this.typeSerializer;
-	}
-	
-	// ------------------------------------------------------------------------
-	//  General serializer and type utils
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamRecordSerializer<T> duplicate() {
-		TypeSerializer<T> serializerCopy = typeSerializer.duplicate();
-		return serializerCopy == typeSerializer ? this : new StreamRecordSerializer<T>(serializerCopy);
-	}
-
-	@Override
-	public boolean isImmutableType() {
-		return false;
-	}
-
-	@Override
-	public int getLength() {
-		return typeSerializer.getLength();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Type serialization, copying, instantiation
-	// ------------------------------------------------------------------------
-
-	@Override
-	public StreamRecord<T> createInstance() {
-		try {
-			return new StreamRecord<T>(typeSerializer.createInstance());
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
-		}
-	}
-	
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from) {
-		return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
-	}
-
-	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-		reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
-		return reuse;
-	}
-
-	@Override
-	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
-		typeSerializer.serialize(value.getValue(), target);
-	}
-	
-	@Override
-	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-		T element = typeSerializer.deserialize(source);
-		return new StreamRecord<T>(element, 0);
-	}
-
-	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
-		T element = typeSerializer.deserialize(reuse.getValue(), source);
-		reuse.replace(element, 0);
-		return reuse;
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		typeSerializer.copy(source, target);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof StreamRecordSerializer) {
-			StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj;
-
-			return other.canEqual(this) && typeSerializer.equals(other.typeSerializer);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public boolean canEqual(Object obj) {
-		return obj instanceof StreamRecordSerializer;
-	}
-
-	@Override
-	public int hashCode() {
-		return typeSerializer.hashCode();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
deleted file mode 100644
index ec90bff..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/ExceptionInChainedOperatorException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A special exception that signifies that the cause exception came from a chained operator.
- */
-public class ExceptionInChainedOperatorException extends RuntimeException {
-
-	private static final long serialVersionUID = 1L;
-
-	public ExceptionInChainedOperatorException(Throwable cause) {
-		this("Could not forward element to next operator", cause);
-	}
-
-	public ExceptionInChainedOperatorException(String message, Throwable cause) {
-		super(message, requireNonNull(cause));
-	}
-	
-	public Throwable getOriginalCause() {
-		Throwable ex = this;
-		do {
-			ex = ex.getCause();
-		} while (ex instanceof ExceptionInChainedOperatorException);
-		return ex; 
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
deleted file mode 100644
index 5316ae4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
-
-public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
-
-	private StreamInputProcessor<IN> inputProcessor;
-	
-	private volatile boolean running = true;
-
-	@Override
-	public void init() throws Exception {
-		StreamConfig configuration = getConfiguration();
-		
-		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		if (numberOfInputs > 0) {
-			InputGate[] inputGates = getEnvironment().getAllInputGates();
-			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-					getCheckpointBarrierListener(), 
-					configuration.getCheckpointMode(),
-					getEnvironment().getIOManager(),
-					getExecutionConfig().areTimestampsEnabled());
-
-			// make sure that stream tasks report their I/O statistics
-			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-			inputProcessor.setReporter(reporter);
-		}
-	}
-
-	@Override
-	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
-		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
-			checkTimerException();
-		}
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		inputProcessor.cleanup();
-	}
-
-	@Override
-	protected void cancelTask() {
-		running = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
deleted file mode 100644
index b42b888..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OperatorChain<OUT> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
-	
-	private final StreamOperator<?>[] allOperators;
-	
-	private final RecordWriterOutput<?>[] streamOutputs;
-	
-	private final Output<StreamRecord<OUT>> chainEntryPoint;
-	
-
-	public OperatorChain(StreamTask<OUT, ?> containingTask,
-							StreamOperator<OUT> headOperator,
-							AccumulatorRegistry.Reporter reporter) {
-		
-		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
-		final StreamConfig configuration = containingTask.getConfiguration();
-		final boolean enableTimestamps = containingTask.getExecutionConfig().areTimestampsEnabled();
-
-		// we read the chained configs, and the order of record writer registrations by output name
-		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
-		chainedConfigs.put(configuration.getVertexID(), configuration);
-
-		// create the final output stream writers
-		// we iterate through all the out edges from this job vertex and create a stream output
-		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
-		Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap = new HashMap<>(outEdgesInOrder.size());
-		this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
-		
-		// from here on, we need to make sure that the output writers are shut down again on failure
-		boolean success = false;
-		try {
-			for (int i = 0; i < outEdgesInOrder.size(); i++) {
-				StreamEdge outEdge = outEdgesInOrder.get(i);
-				
-				RecordWriterOutput<?> streamOutput = createStreamOutput(
-						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
-						containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
-	
-				this.streamOutputs[i] = streamOutput;
-				streamOutputMap.put(outEdge, streamOutput);
-			}
-	
-			// we create the chain of operators and grab the collector that leads into the chain
-			List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
-			this.chainEntryPoint = createOutputCollector(containingTask, configuration,
-					chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
-			
-			this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size() + 1]);
-			
-			// add the head operator to the end of the list
-			this.allOperators[this.allOperators.length - 1] = headOperator;
-			
-			success = true;
-		}
-		finally {
-			// make sure we clean up after ourselves in case of a failure after acquiring
-			// the first resources
-			if (!success) {
-				for (RecordWriterOutput<?> output : this.streamOutputs) {
-					if (output != null) {
-						output.close();
-						output.clearBuffers();
-					}
-				}
-			}
-		}
-		
-	}
-	
-	
-	public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
-		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
-		for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-			streamOutput.broadcastEvent(barrier);
-		}
-	}
-	
-	public RecordWriterOutput<?>[] getStreamOutputs() {
-		return streamOutputs;
-	}
-	
-	public StreamOperator<?>[] getAllOperators() {
-		return allOperators;
-	}
-
-	public Output<StreamRecord<OUT>> getChainEntryPoint() {
-		return chainEntryPoint;
-	}
-
-	/**
-	 *
-	 * This method should be called before finishing the record emission, to make sure any data
-	 * that is still buffered will be sent. It also ensures that all data sending related
-	 * exceptions are recognized.
-	 *
-	 * @throws IOException Thrown, if the buffered data cannot be pushed into the output streams.
-	 */
-	public void flushOutputs() throws IOException {
-		for (RecordWriterOutput<?> streamOutput : getStreamOutputs()) {
-			streamOutput.flush();
-		}
-	}
-
-	/**
-	 * This method releases all resources of the record writer output. It stops the output
-	 * flushing thread (if there is one) and releases all buffers currently held by the output
-	 * serializers.
-	 *
-	 * <p>This method should never fail.
-	 */
-	public void releaseOutputs() {
-		try {
-			for (RecordWriterOutput<?> streamOutput : streamOutputs) {
-				streamOutput.close();
-			}
-		}
-		finally {
-			// make sure that we release the buffers in any case
-			for (RecordWriterOutput<?> output : streamOutputs) {
-				output.clearBuffers();
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  initialization utilities
-	// ------------------------------------------------------------------------
-	
-	private static <T> Output<StreamRecord<T>> createOutputCollector(
-			StreamTask<?, ?> containingTask,
-			StreamConfig operatorConfig,
-			Map<Integer, StreamConfig> chainedConfigs,
-			ClassLoader userCodeClassloader,
-			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
-			List<StreamOperator<?>> allOperators)
-	{
-		// We create a wrapper that will encapsulate the chained operators and network outputs
-		OutputSelectorWrapper<T> outputSelectorWrapper = operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
-		CollectorWrapper<T> wrapper = new CollectorWrapper<T>(outputSelectorWrapper);
-
-		// create collectors for the network outputs
-		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
-			@SuppressWarnings("unchecked")
-			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
-			wrapper.addCollector(output, outputEdge);
-		}
-
-		// Create collectors for the chained outputs
-		for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
-			int outputId = outputEdge.getTargetId();
-			StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
-
-			Output<StreamRecord<T>> output = createChainedOperator(
-					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-			wrapper.addCollector(output, outputEdge);
-		}
-		return wrapper;
-	}
-	
-	private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
-			StreamTask<?, ?> containingTask,
-			StreamConfig operatorConfig,
-			Map<Integer, StreamConfig> chainedConfigs,
-			ClassLoader userCodeClassloader,
-			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
-			List<StreamOperator<?>> allOperators)
-	{
-		// create the output that the operator writes to first. this may recursively create more operators
-		Output<StreamRecord<OUT>> output = createOutputCollector(
-				containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-
-		// now create the operator and give it the output collector to write its output to
-		OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
-		chainedOperator.setup(containingTask, operatorConfig, output);
-
-		allOperators.add(chainedOperator);
-
-		if (containingTask.getExecutionConfig().isObjectReuseEnabled() || chainedOperator.isInputCopyingDisabled()) {
-			return new ChainingOutput<IN>(chainedOperator);
-		}
-		else {
-			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
-			return new CopyingChainingOutput<IN>(chainedOperator, inSerializer);
-		}
-	}
-	
-	private static <T> RecordWriterOutput<T> createStreamOutput(
-			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
-			Environment taskEnvironment, boolean withTimestamps,
-			AccumulatorRegistry.Reporter reporter, String taskName)
-	{
-		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
-
-		@SuppressWarnings("unchecked")
-		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
-
-		LOG.debug("Using partitioner {} for output {} of task ", outputPartitioner, outputIndex, taskName);
-		
-		ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
-
-		StreamRecordWriter<SerializationDelegate<StreamRecord<T>>> output = 
-				new StreamRecordWriter<>(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
-		output.setReporter(reporter);
-		
-		return new RecordWriterOutput<T>(output, outSerializer, withTimestamps);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Collectors for output chaining
-	// ------------------------------------------------------------------------ 
-
-	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
-		
-		protected final OneInputStreamOperator<T, ?> operator;
-
-		public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
-			this.operator = operator;
-		}
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			try {
-				operator.setKeyContextElement(record);
-				operator.processElement(record);
-			}
-			catch (Exception e) {
-				throw new ExceptionInChainedOperatorException(e);
-			}
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			try {
-				operator.processWatermark(mark);
-			}
-			catch (Exception e) {
-				throw new ExceptionInChainedOperatorException(e);
-			}
-		}
-
-		@Override
-		public void close() {
-			try {
-				operator.close();
-			}
-			catch (Exception e) {
-				throw new ExceptionInChainedOperatorException(e);
-			}
-		}
-	}
-
-	private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
-		
-		private final TypeSerializer<T> serializer;
-		
-		private final StreamRecord<T> copyRecord;
-
-		public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
-			super(operator);
-			this.serializer = serializer;
-			this.copyRecord = new StreamRecord<T>(null, 0L);
-		}
-
-		@Override
-		public void collect(StreamRecord<T> record) {
-			try {
-				T copy = serializer.copy(record.getValue());
-				copyRecord.replace(copy, record.getTimestamp());
-				
-				operator.setKeyContextElement(copyRecord);
-				operator.processElement(copyRecord);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Could not forward element to next operator", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
deleted file mode 100644
index 3d82275..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-/**
- * Stream task that runs a {@link StreamSource} as its head operator.
- *
- * <p>Checkpointing and the emission of elements must never happen at the same time; the
- * execution has to be serial. This is guaranteed by a contract with the source function: it may
- * only modify its state or emit elements inside a synchronized block that locks on the
- * checkpoint lock object, and the state modification and the emission must happen inside the
- * same synchronized block.
- *
- * @param <OUT> Type of the output elements of this source.
- */
-public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
-
-	@Override
-	protected void init() {
-		// nothing to set up: this task holds no resources of its own
-	}
-
-	@Override
-	protected void cleanup() {
-		// nothing to release: this task holds no resources of its own
-	}
-
-	/**
-	 * Runs the source operator. The operator gets the checkpoint lock and an output that
-	 * acquires the same lock before it forwards anything into the operator chain.
-	 */
-	@Override
-	protected void run() throws Exception {
-		final Object lock = getCheckpointLock();
-		final SourceOutput<StreamRecord<OUT>> lockingOutput = new SourceOutput<>(getHeadOutput(), lock);
-		headOperator.run(lock, lockingOutput);
-	}
-
-	/**
-	 * Cancels the source operator.
-	 */
-	@Override
-	protected void cancelTask() throws Exception {
-		headOperator.cancel();
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Output wrapper for sources that takes the lock object before it forwards a record or a
-	 * watermark to the wrapped output.
-	 *
-	 * <p>
-	 * This is required to ensure that no concurrent method calls on operators later in the chain
-	 * can occur. When operators register a timer, the timer callback is synchronized on the
-	 * same lock object.
-	 *
-	 * <p>
-	 * The class is deliberately not static: {@link #collect(Object)} calls
-	 * {@code checkTimerException()} on the enclosing task.
-	 *
-	 * @param <T> The type of elements emitted by the source.
-	 */
-	private class SourceOutput<T> implements Output<T> {
-
-		/** The output that actually receives the elements. */
-		private final Output<T> delegate;
-
-		/** The object that is locked around every emission. */
-		private final Object emissionLock;
-
-		public SourceOutput(Output<T> output, Object lockObject) {
-			this.delegate = output;
-			this.emissionLock = lockObject;
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			synchronized (emissionLock) {
-				delegate.emitWatermark(mark);
-			}
-		}
-
-		@Override
-		public void collect(T record) {
-			synchronized (emissionLock) {
-				// surface failures of timer callbacks before emitting the record
-				checkTimerException();
-				delegate.collect(record);
-			}
-		}
-
-		@Override
-		public void close() {
-			// forwarded without taking the lock
-			delegate.close();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
deleted file mode 100644
index 2125df1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Task at the head of a streaming iteration. It registers a feedback queue at the
- * {@link BlockingQueueBroker} and forwards every record taken from that queue to all of its
- * stream outputs.
- *
- * @param <OUT> The type of the records that flow through the feedback queue.
- */
-public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
-
-	/** Cleared by {@link #cancelTask()} to make the forwarding loop stop. */
-	private volatile boolean running = true;
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates the feedback queue, hands it to the broker and forwards the records that arrive
-	 * through it until the task is cancelled or, if an iteration wait time is configured, no
-	 * record arrives within that time. The queue is removed from the broker in any case.
-	 */
-	@Override
-	protected void run() throws Exception {
-
-		final String iterationId = getConfiguration().getIterationId();
-		if (iterationId == null || iterationId.length() == 0) {
-			throw new Exception("Missing iteration ID in the task configuration");
-		}
-
-		final String brokerID = createBrokerIdString(
-				getEnvironment().getJobID(), iterationId, getEnvironment().getIndexInSubtaskGroup());
-
-		// a wait time of zero or less means "block until a record arrives"
-		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
-		final boolean shouldWait = iterationWaitTime > 0;
-
-		final BlockingQueue<StreamRecord<OUT>> feedbackQueue = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
-
-		// make the queue available to the iteration tail
-		BlockingQueueBroker.INSTANCE.handIn(brokerID, feedbackQueue);
-		LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
-
-		try {
-			@SuppressWarnings("unchecked")
-			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
-
-			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
-			if (getExecutionConfig().areTimestampsEnabled()) {
-				for (RecordWriterOutput<OUT> output : outputs) {
-					output.emitWatermark(new Watermark(Long.MAX_VALUE));
-				}
-			}
-
-			while (running) {
-				final StreamRecord<OUT> fedBack;
-				if (shouldWait) {
-					fedBack = feedbackQueue.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-				}
-				else {
-					fedBack = feedbackQueue.take();
-				}
-
-				if (fedBack == null) {
-					// the wait time elapsed without a record: done
-					break;
-				}
-
-				for (RecordWriterOutput<OUT> output : outputs) {
-					output.collect(fedBack);
-				}
-			}
-		}
-		finally {
-			// always remove the queue from the broker again, to prevent a resource leak
-			BlockingQueueBroker.INSTANCE.remove(brokerID);
-			LOG.info("Iteration head {} removed feedback queue under {}", getName(), brokerID);
-		}
-	}
-
-	@Override
-	protected void cancelTask() {
-		running = false;
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void init() {
-		// no resources are held, so there is nothing to initialize
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		// no resources are held, so there is nothing to clean up
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Builds the identification string under which the head and tail task look up the shared
-	 * blocking queue for the back channel. The identification string is unique per parallel
-	 * head/tail pair per iteration per job.
-	 *
-	 * @param jid The job ID.
-	 * @param iterationID The id of the iteration in the job.
-	 * @param subtaskIndex The parallel subtask number
-	 * @return The identification string, in the form {@code <jid>-<iterationID>-<subtaskIndex>}.
-	 */
-	public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
-		return jid + "-" + iterationID + "-" + subtaskIndex;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
deleted file mode 100644
index 9bb5311..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Task at the tail of a streaming iteration. It looks up the feedback queue at the
- * {@link BlockingQueueBroker} and uses a head operator that puts every incoming record into
- * that queue.
- *
- * @param <IN> The type of the records that are fed back.
- */
-public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
-
-	/**
-	 * Performs the regular one-input initialization, then fetches the feedback queue from the
-	 * broker and installs a {@link RecordPusher} on that queue as the head operator.
-	 */
-	@Override
-	public void init() throws Exception {
-		super.init();
-
-		final String iterationId = getConfiguration().getIterationId();
-		if (iterationId == null || iterationId.length() == 0) {
-			throw new Exception("Missing iteration ID in the task configuration");
-		}
-
-		// the same ID string that the iteration head uses to register the queue
-		final String brokerID = StreamIterationHead.createBrokerIdString(
-				getEnvironment().getJobID(), iterationId, getEnvironment().getIndexInSubtaskGroup());
-
-		final long iterationWaitTime = getConfiguration().getIterationWaitTime();
-
-		LOG.info("Iteration tail {} trying to acquire feedback queue under {}", getName(), brokerID);
-
-		@SuppressWarnings("unchecked")
-		BlockingQueue<StreamRecord<IN>> feedbackQueue =
-				(BlockingQueue<StreamRecord<IN>>) BlockingQueueBroker.INSTANCE.get(brokerID);
-
-		LOG.info("Iteration tail {} acquired feedback queue {}", getName(), brokerID);
-
-		this.headOperator = new RecordPusher<>(feedbackQueue, iterationWaitTime);
-	}
-
-	/**
-	 * Operator that writes every record it receives into the feedback queue and ignores
-	 * watermarks.
-	 *
-	 * @param <IN> The type of the records that are pushed into the queue.
-	 */
-	private static class RecordPusher<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
-
-		private static final long serialVersionUID = 1L;
-
-		/** The queue through which the records are passed to the iteration head. */
-		@SuppressWarnings("NonSerializableFieldInSerializableClass")
-		private final BlockingQueue<StreamRecord<IN>> dataChannel;
-
-		/** Maximum time in milliseconds to wait for free space in the queue. */
-		private final long iterationWaitTime;
-
-		/** True if the wait time is positive, i.e. if insertion is bounded by a timeout. */
-		private final boolean shouldWait;
-
-		RecordPusher(BlockingQueue<StreamRecord<IN>> dataChannel, long iterationWaitTime) {
-			this.dataChannel = dataChannel;
-			this.iterationWaitTime = iterationWaitTime;
-			this.shouldWait = iterationWaitTime > 0;
-		}
-
-		@Override
-		public void processElement(StreamRecord<IN> record) throws Exception {
-			if (!shouldWait) {
-				// no timeout configured: block until the queue has room
-				dataChannel.put(record);
-				return;
-			}
-
-			// the result of offer() is not checked, so a record that cannot be inserted
-			// within the wait time is not enqueued
-			dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) {
-			// watermarks are not fed back
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
deleted file mode 100644
index 8c58e29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ /dev/null
@@ -1,616 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateBackendFactory;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for all streaming tasks. A task is the unit of local processing that is deployed
- * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
- * the Task's operator chain. Operators that are chained together execute synchronously in the
- * same thread and hence on the same stream partition. A common case for these chains
- * are successive map/flatmap/filter tasks.
- * 
- * <p>The task chain contains one "head" operator and multiple chained operators. 
- * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
- * as well as for sources, iteration heads and iteration tails.
- * 
- * <p>The Task class deals with the setup of the streams read by the head operator, and the streams 
- * produced by the operators at the ends of the operator chain. Note that the chain may fork and
- * thus have multiple ends.
- *
- * The life cycle of the task is set up as follows: 
- * <pre>
- *  -- registerInputOutput()
- *         |
- *         +----> Create basic utils (config, etc) and load the chain of operators
- *         +----> operators.setup()
- *         +----> task specific init()
- *  
- *  -- restoreState() -> restores state of all operators in the chain
- *  
- *  -- invoke()
- *        |
- *        +----> open-operators()
- *        +----> run()
- *        +----> close-operators()
- *        +----> dispose-operators()
- *        +----> common cleanup
- *        +----> task specific cleanup()
- * </pre>
- *
- * <p> The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
- * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
- * are called concurrently.
- * 
- * @param <OUT>
- * @param <Operator>
- */
-public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
-		extends AbstractInvokable
-		implements StatefulTask<StreamTaskStateList> {
-
-	/** The thread group that holds all trigger timer threads */
-	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-	
-	/** The logger used by the StreamTask and its subclasses */
-	protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
-	
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * All interaction with the {@code StreamOperator} must be synchronized on this lock object to ensure that
-	 * we don't have concurrent method calls that void consistent checkpoints.
-	 */
-	private final Object lock = new Object();
-	
-	/** the head operator that consumes the input streams of this task */
-	protected Operator headOperator;
-
-	/** The chain of operators executed by this task */
-	private OperatorChain<OUT> operatorChain;
-	
-	/** The configuration of this streaming task */
-	private StreamConfig configuration;
-
-	/** The class loader used to load dynamic classes of a job */
-	private ClassLoader userClassLoader;
-	
-	/** The state backend that stores the state and checkpoints for this task */
-	private StateBackend<?> stateBackend;
-	
-	/** The executor service that schedules and calls the triggers of this task*/
-	private ScheduledExecutorService timerService;
-	
-	/** The map of user-defined accumulators of this task */
-	private Map<String, Accumulator<?, ?>> accumulatorMap;
-	
-	/** The state to be restored once the initialization is done */
-	private StreamTaskStateList lazyRestoreState;
-
-	/** This field is used to forward an exception that is caught in the timer thread. Subclasses
-	 * must ensure that exceptions stored here get thrown on the actual execution Thread. */
-	private volatile TimerException timerException;
-	
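-	/** Flag to mark the task "in operation". It is set to true right before run() is called and
-	 * is reset to false when run() returns, when cancel() is called, or when invoke() exits.
-	 * Checkpoints and checkpoint notifications are only processed while it is true. */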
-	private volatile boolean isRunning;
-	
-
-	// ------------------------------------------------------------------------
-	//  Life cycle methods for specific implementations
-	// ------------------------------------------------------------------------
-
-	protected abstract void init() throws Exception;
-	
-	protected abstract void run() throws Exception;
-	
-	protected abstract void cleanup() throws Exception;
-	
-	protected abstract void cancelTask() throws Exception;
-
-	// ------------------------------------------------------------------------
-	//  Core work methods of the Stream Task
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Sets up the task: reads the configuration, creates and initializes the state backend,
-	 * loads the head operator, builds the operator chain, starts the timer service and finally
-	 * runs the task specific {@link #init()}.
-	 *
-	 * <p>If any of these steps fails, the timer service and the outputs of the operator chain
-	 * that were already created are released again before the exception is propagated.
-	 */
-	@Override
-	public final void registerInputOutput() throws Exception {
-		LOG.debug("registerInputOutput for {}", getName());
-
-		boolean setupSucceeded = false;
-		try {
-			final AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-
-			userClassLoader = getUserCodeClassLoader();
-			configuration = new StreamConfig(getTaskConfiguration());
-			accumulatorMap = registry.getUserMap();
-
-			stateBackend = createStateBackend();
-			stateBackend.initializeForJob(getEnvironment().getJobID());
-
-			headOperator = configuration.getStreamOperator(userClassLoader);
-			operatorChain = new OperatorChain<>(this, headOperator, registry.getReadWriteReporter());
-
-			// the head operator may be absent, in which case there is nothing to set up
-			if (headOperator != null) {
-				headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
-			}
-
-			final DispatcherThreadFactory timerThreads =
-					new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
-			timerService = Executors.newSingleThreadScheduledExecutor(timerThreads);
-
-			// task specific initialization
-			init();
-
-			setupSucceeded = true;
-		}
-		finally {
-			// undo the partial setup if an exception cut the initialization short
-			if (!setupSucceeded) {
-				if (timerService != null) {
-					timerService.shutdownNow();
-				}
-				if (operatorChain != null) {
-					operatorChain.releaseOutputs();
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Executes the task: restores the operator state, opens all operators, runs the task
-	 * specific {@link #run()} logic, and then closes the operators, flushes the outputs and
-	 * disposes the operators.
-	 *
-	 * <p>Regardless of success or failure, the timer service is shut down, the outputs are
-	 * released, the task specific {@link #cleanup()} is called, operators that were not yet
-	 * disposed are disposed, and the state backend is closed. Failures during that cleanup are
-	 * logged and not rethrown, so that they do not replace an exception of the main logic.
-	 *
-	 * @throws Exception If restoring state, opening, running, closing, flushing or the regular
-	 *         disposal of the operators fails.
-	 */
-	@Override
-	public final void invoke() throws Exception {
-		LOG.debug("Invoking {}", getName());
-		
-		boolean disposed = false;
-		try {
-			// first order of business is to give operators back their state
-			restoreStateLazy();
-			
-			// we need to make sure that any triggers scheduled in open() cannot be
-			// executed before all operators are opened
-			synchronized (lock) {
-				openAllOperators();
-			}
-
-			// let the task do its work
-			isRunning = true;
-			run();
-			isRunning = false;
-			
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Finished task {}", getName());
-			}
-			
-			// make sure no further checkpoint and notification actions happen.
-			// we make sure that no other thread is currently in the locked scope before
-			// we close the operators by trying to acquire the checkpoint scope lock
-			// we also need to make sure that no triggers fire concurrently with the close logic
-			synchronized (lock) {
-				// this is part of the main logic, so if this fails, the task is considered failed
-				closeAllOperators();
-			}
-			
-			// make sure all buffered data is flushed
-			operatorChain.flushOutputs();
-
-			// make an attempt to dispose the operators such that failures in the dispose call
-			// still let the computation fail
-			tryDisposeAllOperators();
-			disposed = true;
-		}
-		finally {
-			isRunning = false;
-
-			timerService.shutdownNow();
-			
-			// release the output resources. this method should never fail.
-			if (operatorChain != null) {
-				operatorChain.releaseOutputs();
-			}
-
-			// we must! perform this cleanup
-
-			try {
-				cleanup();
-			}
-			catch (Throwable t) {
-				// catch and log the exception to not replace the original exception;
-				// the throwable is passed to the logger so that its stack trace is not lost
-				LOG.error("Error during cleanup of stream task.", t);
-			}
-			
-			// if the operators were not disposed before, do a hard dispose
-			if (!disposed) {
-				disposeAllOperators();
-			}
-
-			try {
-				if (stateBackend != null) {
-					stateBackend.close();
-				}
-			} catch (Throwable t) {
-				LOG.error("Error while closing the state backend", t);
-			}
-		}
-	}
-	
-	/**
-	 * Cancels the task: marks it as no longer running and then calls the task specific
-	 * {@link #cancelTask()}.
-	 */
-	@Override
-	public final void cancel() throws Exception {
-		// cleared first, so checkpoint calls are already ignored while cancelTask() runs
-		isRunning = false;
-		cancelTask();
-	}
-	
-	/**
-	 * Opens every operator of the chain, in the order in which the chain returns them.
-	 * Empty (null) slots are skipped.
-	 */
-	private void openAllOperators() throws Exception {
-		for (StreamOperator<?> chained : operatorChain.getAllOperators()) {
-			if (chained == null) {
-				continue;
-			}
-			chained.open();
-		}
-	}
-
-	private void closeAllOperators() throws Exception {
-		// We need to close them first to last, since upstream operators in the chain might emit
-		// elements in their close methods.
-		StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
-		for (int i = allOperators.length - 1; i >= 0; i--) {
-			StreamOperator<?> operator = allOperators[i];
-			if (operator != null) {
-				operator.close();
-			}
-		}
-	}
-
-	/**
-	 * Disposes every operator of the chain and lets the first failure propagate to the caller.
-	 * Empty (null) slots are skipped.
-	 */
-	private void tryDisposeAllOperators() throws Exception {
-		for (StreamOperator<?> chained : operatorChain.getAllOperators()) {
-			if (chained == null) {
-				continue;
-			}
-			chained.dispose();
-		}
-	}
-	
-	/**
-	 * Disposes every operator of the chain on a best-effort basis: a failure of one operator
-	 * is logged and does not prevent the disposal of the remaining operators.
-	 */
-	private void disposeAllOperators() {
-		for (StreamOperator<?> chained : operatorChain.getAllOperators()) {
-			if (chained == null) {
-				continue;
-			}
-			try {
-				chained.dispose();
-			}
-			catch (Throwable failure) {
-				LOG.error("Error during disposal of stream operator.", failure);
-			}
-		}
-	}
-
-	/**
-	 * The finalize method shuts down the timer. This is a fail-safe shutdown, in case the original
-	 * shutdown method was never called.
-	 *
-	 * <p>
-	 * This should not be relied upon! It will cause shutdown to happen much later than if manual
-	 * shutdown is attempted, and cause threads to linger for longer than needed.
-	 */
-	@Override
-	protected void finalize() throws Throwable {
-		super.finalize();
-		if (timerService != null) {
-			if (!timerService.isTerminated()) {
-				LOG.warn("Timer service was not shut down. Shutting down in finalize().");
-			}
-			timerService.shutdown();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Access to properties and utilities
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the name of the task including its subtask information, in the form
-	 * "taskname (2/5)". The name is taken from the task's environment.
-	 *
-	 * @return The name of the task.
-	 */
-	public String getName() {
-		final String nameWithSubtasks = getEnvironment().getTaskNameWithSubtasks();
-		return nameWithSubtasks;
-	}
-
-	/**
-	 * Returns the lock object that every operation involving data and state mutation has to
-	 * hold. It is the same object that this task synchronizes on when it opens and closes the
-	 * operators and when it triggers checkpoints.
-	 *
-	 * @return The checkpoint lock object.
-	 */
-	public Object getCheckpointLock() {
-		return lock;
-	}
-	
-	/**
-	 * Returns the configuration of this streaming task, which is created in
-	 * {@link #registerInputOutput()}.
-	 *
-	 * @return The stream configuration, or null if the task has not been set up yet.
-	 */
-	public StreamConfig getConfiguration() {
-		return configuration;
-	}
-
-	/**
-	 * Returns the map of user-defined accumulators of this task, as obtained from the
-	 * accumulator registry in {@link #registerInputOutput()}.
-	 *
-	 * @return The accumulator map, or null if the task has not been set up yet.
-	 */
-	public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
-		return accumulatorMap;
-	}
-	
-	/**
-	 * Returns the entry point of the operator chain. This is the same output that the head
-	 * operator is set up with.
-	 *
-	 * @return The output into which the head operator emits its records.
-	 */
-	public Output<StreamRecord<OUT>> getHeadOutput() {
-		return operatorChain.getChainEntryPoint();
-	}
-	
-	/**
-	 * Returns the stream outputs of the operator chain of this task.
-	 *
-	 * @return The record writer outputs, exactly as returned by the operator chain.
-	 */
-	public RecordWriterOutput<?>[] getStreamOutputs() {
-		return operatorChain.getStreamOutputs();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Checkpoint and Restore
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Stores the state that the operators should be restored with. The state is only
-	 * remembered here; it is handed to the operators later, in {@link #restoreStateLazy()}.
-	 *
-	 * @param initialState The checkpointed state of the operators of this task.
-	 */
-	@Override
-	public void setInitialState(StreamTaskStateList initialState) {
-		this.lazyRestoreState = initialState;
-	}
-	
-	/**
-	 * Hands the state that was set through {@link #setInitialState(StreamTaskStateList)} to
-	 * the operators of the chain. The state at position {@code i} goes to the operator at
-	 * position {@code i}. Does nothing if no state was set.
-	 *
-	 * @throws Exception If the state cannot be restored; the original failure is attached as
-	 *         the cause.
-	 */
-	public void restoreStateLazy() throws Exception {
-		if (lazyRestoreState == null) {
-			// nothing to restore
-			return;
-		}
-
-		LOG.info("Restoring checkpointed state to task {}", getName());
-
-		try {
-			final StreamOperator<?>[] operators = operatorChain.getAllOperators();
-			final StreamTaskState[] restored = lazyRestoreState.getState(userClassLoader);
-
-			// drop the reference to the state list early, to be GC friendly
-			lazyRestoreState = null;
-
-			for (int pos = 0; pos < restored.length; pos++) {
-				final StreamTaskState operatorState = restored[pos];
-				final StreamOperator<?> operator = operators[pos];
-
-				if (operator == null) {
-					continue;
-				}
-
-				if (operatorState != null) {
-					LOG.debug("Task {} in chain ({}) has checkpointed state", pos, getName());
-					operator.restoreState(operatorState);
-				}
-				else {
-					LOG.debug("Task {} in chain ({}) does not have checkpointed state", pos, getName());
-				}
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Could not restore checkpointed state to operators and functions", e);
-		}
-	}
-
-	/**
-	 * Performs a checkpoint. While holding the checkpoint lock, it broadcasts the checkpoint
-	 * barrier to the outputs, snapshots the state of every operator in the chain and
-	 * acknowledges the checkpoint. Nothing happens if the task is not running.
-	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @throws Exception If drawing or acknowledging the snapshot fails while the task is still
-	 *         running. Such failures are dropped if the task is no longer running.
-	 */
-	@Override
-	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
-
-		synchronized (lock) {
-			if (!isRunning) {
-				return;
-			}
-
-			// Barrier emission and state snapshot both happen inside this lock scope, which
-			// makes them one atomic operation regardless of their order. The barriers go out
-			// first, so the downstream operators can start their checkpoint work as soon as
-			// possible.
-			operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
-
-			// now draw the state snapshot
-			try {
-				final StreamOperator<?>[] operators = operatorChain.getAllOperators();
-				final StreamTaskState[] snapshots = new StreamTaskState[operators.length];
-
-				for (int pos = 0; pos < snapshots.length; pos++) {
-					final StreamOperator<?> operator = operators[pos];
-					if (operator == null) {
-						continue;
-					}
-
-					// empty snapshots are represented as null entries
-					final StreamTaskState snapshot = operator.snapshotOperatorState(checkpointId, timestamp);
-					snapshots[pos] = snapshot.isEmpty() ? null : snapshot;
-				}
-
-				final StreamTaskStateList allStates = new StreamTaskStateList(snapshots);
-				if (allStates.isEmpty()) {
-					getEnvironment().acknowledgeCheckpoint(checkpointId);
-				}
-				else {
-					getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
-				}
-			}
-			catch (Exception e) {
-				// a failure only counts if the task was not stopped in the meantime
-				if (isRunning) {
-					throw e;
-				}
-			}
-		}
-	}
-	
-	@Override
-	public void notifyCheckpointComplete(long checkpointId) throws Exception {
-		synchronized (lock) {
-			if (isRunning) {
-				LOG.debug("Notification of complete checkpoint for task {}", getName());
-				
-				for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
-					if (operator != null) {
-						operator.notifyOfCompletedCheckpoint(checkpointId);
-					}
-				}
-			}
-			else {
-				LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  State backend
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Returns the state backend of this task. The state backend defines how the key/value
-	 * state is maintained and how and where state snapshots are stored. It is created in
-	 * {@link #registerInputOutput()}.
-	 *
-	 * @return The state backend used by this task.
-	 */
-	public StateBackend<?> getStateBackend() {
-		return this.stateBackend;
-	}
-	
-	/**
-	 * Creates the state backend for this task.
-	 *
-	 * <p>A backend that is set in the stream configuration takes precedence. Otherwise the
-	 * value configured under {@link ConfigConstants#STATE_BACKEND} decides:
-	 * <ul>
-	 *   <li>no value or "jobmanager": the default {@link MemoryStateBackend}</li>
-	 *   <li>"filesystem": an {@link FsStateBackend} created from the configuration</li>
-	 *   <li>anything else: interpreted as the class name of a {@link StateBackendFactory},
-	 *       which is loaded with the user code class loader, instantiated, and asked to create
-	 *       the backend from the configuration</li>
-	 * </ul>
-	 * The two shortcut names are matched case-insensitively.
-	 *
-	 * @return The state backend to use for this task.
-	 * @throws IllegalConfigurationException If the configured factory class cannot be found,
-	 *         is not a {@link StateBackendFactory}, or fails to create the backend.
-	 * @throws Exception If the file system state backend cannot be created.
-	 */
-	private StateBackend<?> createStateBackend() throws Exception {
-		StateBackend<?> configuredBackend = configuration.getStateBackend(userClassLoader);
-
-		if (configuredBackend != null) {
-			// backend has been configured on the environment
-			LOG.info("Using user-defined state backend: " + configuredBackend);
-			return configuredBackend;
-		} else {
-			// see if we have a backend specified in the configuration
-			Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
-			String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
-
-			if (backendName == null) {
-				LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
-				backendName = "jobmanager";
-			}
-
-			// Only the shortcut names are compared in lower case. The configured value itself
-			// must stay untouched, because it may be a (case sensitive) class name. The root
-			// locale keeps the comparison independent of the default locale of the JVM.
-			switch (backendName.toLowerCase(java.util.Locale.ROOT)) {
-				case "jobmanager":
-					LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
-					return MemoryStateBackend.defaultInstance();
-
-				case "filesystem":
-					FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
-					LOG.info("State backend is set to heap memory (checkpoints to filesystem \""
-						+ backend.getBasePath() + "\")");
-					return backend;
-
-				default:
-					try {
-						@SuppressWarnings("rawtypes")
-						Class<? extends StateBackendFactory> clazz =
-							Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
-
-						// the configured class is the factory, not the backend itself, so the
-						// backend has to be obtained from the factory
-						return clazz.newInstance().createFromConfig(flinkConfig);
-					} catch (ClassNotFoundException e) {
-						throw new IllegalConfigurationException(
-							"Cannot find configured state backend: " + backendName, e);
-					} catch (ClassCastException e) {
-						throw new IllegalConfigurationException("The class configured under '" +
-							ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
-							backendName + ')');
-					} catch (Throwable t) {
-						throw new IllegalConfigurationException("Cannot create configured state backend", t);
-					}
-			}
-		}
-	}
-
-	/**
-	 * Registers a timer that triggers the given target once the given timestamp is reached.
-	 * The callback runs on the timer service and holds the checkpoint lock while it calls the
-	 * target. A timestamp in the past results in a delay of zero.
-	 *
-	 * @param timestamp The time, in milliseconds as returned by
-	 *                  {@link System#currentTimeMillis()}, at which the target is triggered.
-	 * @param target The object to trigger.
-	 */
-	public void registerTimer(final long timestamp, final Triggerable target) {
-		final long now = System.currentTimeMillis();
-		final long delayMillis = Math.max(timestamp - now, 0);
-
-		final TriggerTask callback = new TriggerTask(this, lock, target, timestamp);
-		timerService.schedule(callback, delayMillis, TimeUnit.MILLISECONDS);
-	}
-	
-	/**
-	 * Throws the exception that was recorded by a timer callback, if there is one. Calling
-	 * this method lets a failure of the timer thread surface on the calling thread.
-	 *
-	 * @throws TimerException The exception recorded by a failed timer callback.
-	 */
-	public void checkTimerException() throws TimerException {
-		final TimerException recorded = timerException;
-		if (recorded != null) {
-			throw recorded;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Returns the name of the task, as given by {@link #getName()}.
-	 */
-	@Override
-	public String toString() {
-		return getName();
-	}
-
-	/**
-	 * Creates a listener that triggers a checkpoint on this task for every checkpoint barrier
-	 * it receives. A failure to trigger the checkpoint is rethrown as a
-	 * {@link RuntimeException} with the original exception as its cause.
-	 *
-	 * @return A new listener for checkpoint barriers.
-	 */
-	protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		final EventListener<CheckpointBarrier> barrierListener = new EventListener<CheckpointBarrier>() {
-			@Override
-			public void onEvent(CheckpointBarrier barrier) {
-				final long checkpointId = barrier.getId();
-				final long checkpointTimestamp = barrier.getTimestamp();
-				try {
-					triggerCheckpoint(checkpointId, checkpointTimestamp);
-				}
-				catch (Exception e) {
-					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
-				}
-			}
-		};
-		return barrierListener;
-	}
-	
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Runnable that the timer service executes in order to trigger a {@link Triggerable}.
-	 * The target is called while holding the given lock. The first exception thrown by a
-	 * target is stored in the owning task.
-	 */
-	private static final class TriggerTask implements Runnable {
-
-		/** The task that registered the timer and that receives a failure of the callback. */
-		private final StreamTask<?, ?> task;
-
-		/** The lock that is held while the target is triggered. */
-		private final Object lock;
-
-		/** The object to trigger. */
-		private final Triggerable target;
-
-		/** The timestamp that is passed to the target. */
-		private final long timestamp;
-
-		TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp) {
-			this.task = task;
-			this.lock = lock;
-			this.target = target;
-			this.timestamp = timestamp;
-		}
-
-		@Override
-		public void run() {
-			synchronized (lock) {
-				try {
-					target.trigger(timestamp);
-				} catch (Throwable t) {
-					LOG.error("Caught exception while processing timer.", t);
-					rememberFirstFailure(t);
-				}
-			}
-		}
-
-		/** Stores the failure in the task, unless an earlier failure is already stored there. */
-		private void rememberFirstFailure(Throwable failure) {
-			if (task.timerException != null) {
-				return;
-			}
-			task.timerException = new TimerException(failure);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
deleted file mode 100644
index 5680810..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskException.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-/**
- * An exception that is thrown by the stream vertices when encountering an
- * illegal condition.
- */
-public class StreamTaskException extends RuntimeException {
-
-	/**
-	 * Serial version UID for serialization interoperability.
-	 */
-	private static final long serialVersionUID = 8392043527067472439L;
-
-	/**
-	 * Creates a compiler exception with no message and no cause.
-	 */
-	public StreamTaskException() {
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and no cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 */
-	public StreamTaskException(String message) {
-		super(message);
-	}
-
-	/**
-	 * Creates a compiler exception with the given cause and no message.
-	 * 
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamTaskException(Throwable cause) {
-		super(cause);
-	}
-
-	/**
-	 * Creates a compiler exception with the given message and cause.
-	 * 
-	 * @param message
-	 *            The message for the exception.
-	 * @param cause
-	 *            The <tt>Throwable</tt> that caused this exception.
-	 */
-	public StreamTaskException(String message, Throwable cause) {
-		super(message, cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
deleted file mode 100644
index afeabd9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskState.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-
-import java.io.Serializable;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.Iterator;
-
-/**
- * The state checkpointed by a {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}.
- * This state consists of any combination of those three:
- * <ul>
- *     <li>The state of the stream operator, if it implements the Checkpointed interface.</li>
- *     <li>The state of the user function, if it implements the Checkpointed interface.</li>
- *     <li>The key/value state of the operator, if it executes on a KeyedDataStream.</li>
- * </ul>
- */
-public class StreamTaskState implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	
-	private StateHandle<?> operatorState;
-
-	private StateHandle<Serializable> functionState;
-
-	private HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates;
-
-	// ------------------------------------------------------------------------
-
-	public StateHandle<?> getOperatorState() {
-		return operatorState;
-	}
-
-	public void setOperatorState(StateHandle<?> operatorState) {
-		this.operatorState = operatorState;
-	}
-
-	public StateHandle<Serializable> getFunctionState() {
-		return functionState;
-	}
-
-	public void setFunctionState(StateHandle<Serializable> functionState) {
-		this.functionState = functionState;
-	}
-
-	public HashMap<String, KvStateSnapshot<?, ?, ?>> getKvStates() {
-		return kvStates;
-	}
-
-	public void setKvStates(HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates) {
-		this.kvStates = kvStates;
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Checks if this state object actually contains any state, or if all of the state
-	 * fields are null.
-	 * 
-	 * @return True, if all state is null, false if at least one state is not null.
-	 */
-	public boolean isEmpty() {
-		return operatorState == null & functionState == null & kvStates == null;
-	}
-
-	/**
-	 * Discards all the contained states and sets them to null.
-	 * 
-	 * @throws Exception Forwards exceptions that occur when releasing the
-	 *                   state handles and snapshots.
-	 */
-	public void discardState() throws Exception {
-		StateHandle<?> operatorState = this.operatorState;
-		StateHandle<?> functionState = this.functionState;
-		HashMap<String, KvStateSnapshot<?, ?, ?>> kvStates = this.kvStates;
-		
-		if (operatorState != null) {
-			operatorState.discardState();
-		}
-		if (functionState != null) {
-			functionState.discardState();
-		}
-		if (kvStates != null) {
-			while (kvStates.size() > 0) {
-				try {
-					Iterator<KvStateSnapshot<?, ?, ?>> values = kvStates.values().iterator();
-					while (values.hasNext()) {
-						KvStateSnapshot<?, ?, ?> s = values.next();
-						s.discardState();
-						values.remove();
-					}
-				}
-				catch (ConcurrentModificationException e) {
-					// fall through the loop
-				}
-			}
-		}
-
-		this.operatorState = null;
-		this.functionState = null;
-		this.kvStates = null;
-	}
-}
- 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
deleted file mode 100644
index 7b8dbd5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskStateList.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-/**
- * List of task states for a chain of streaming tasks.
- */
-public class StreamTaskStateList implements StateHandle<StreamTaskState[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	/** The states for all operator */
-	private final StreamTaskState[] states;
-
-	
-	public StreamTaskStateList(StreamTaskState[] states) {
-		this.states = states;
-	}
-
-	public boolean isEmpty() {
-		for (StreamTaskState state : states) {
-			if (state != null) {
-				return false;
-			}
-		}
-		return true;
-	}
-	
-	@Override
-	public StreamTaskState[] getState(ClassLoader userCodeClassLoader) {
-		return states;
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		for (StreamTaskState state : states) {
-			if (state != null) {
-				state.discardState();
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
deleted file mode 100644
index 3e1c1e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TimerException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-/**
- * {@code RuntimeException} for wrapping exceptions that are thrown in the timer callback of
- * the timer service in {@link StreamTask}.
- */
-public class TimerException extends RuntimeException {
-	private static final long serialVersionUID = 1L;
-
-	public TimerException(Throwable cause) {
-		super(cause);
-	}
-
-	@Override
-	public String toString() {
-		return "TimerException{" + getCause() + "}";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
deleted file mode 100644
index d2d8a2e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-
-public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
-
-	private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
-	
-	private volatile boolean running = true;
-
-	@Override
-	public void init() throws Exception {
-		StreamConfig configuration = getConfiguration();
-		ClassLoader userClassLoader = getUserCodeClassLoader();
-		
-		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-	
-		int numberOfInputs = configuration.getNumberOfInputs();
-	
-		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-	
-		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-	
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = inEdges.get(i).getTypeNumber();
-			InputGate reader = getEnvironment().getInputGate(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
-			}
-		}
-	
-		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
-				inputDeserializer1, inputDeserializer2,
-				getCheckpointBarrierListener(),
-				configuration.getCheckpointMode(),
-				getEnvironment().getIOManager(),
-				getExecutionConfig().areTimestampsEnabled());
-
-		// make sure that stream tasks report their I/O statistics
-		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
-		AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
-		this.inputProcessor.setReporter(reporter);
-	}
-
-	@Override
-	protected void run() throws Exception {
-		// cache some references on the stack, to make the code more JIT friendly
-		final TwoInputStreamOperator<IN1, IN2, OUT> operator = this.headOperator;
-		final StreamTwoInputProcessor<IN1, IN2> inputProcessor = this.inputProcessor;
-		final Object lock = getCheckpointLock();
-		
-		while (running && inputProcessor.processInput(operator, lock)) {
-			checkTimerException();
-		}
-	}
-
-	@Override
-	protected void cleanup() throws Exception {
-		inputProcessor.cleanup();
-	}
-
-	@Override
-	protected void cancelTask() {
-		running = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
deleted file mode 100644
index a40ae3a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains classes that realize streaming tasks. These tasks are
- * executable stream consumers and producers that are scheduled by the distributed
- * dataflow runtime. Each task occupies one execution slot and is run with by an
- * executing thread.
- * <p>
- * The tasks merely set up the distributed stream coordination and the checkpointing.
- * Internally, the tasks create one or more operators, perform the stream transformations.
- */
-package org.apache.flink.streaming.runtime.tasks;
\ No newline at end of file