You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:16 UTC

[6/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 247fe25..9bf4eb4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -1,41 +1,41 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
  * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.flink.runtime.iterative.concurrent.Broker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-@SuppressWarnings("rawtypes")
-public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
-	/**
-	 * Singleton instance
-	 */
-	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
-
-	private BlockingQueueBroker() {
-	}
-
-	/**
-	 * retrieve singleton instance
-	 */
-	public static Broker<BlockingQueue<StreamRecord>> instance() {
-		return INSTANCE;
-	}
-}
+
+@SuppressWarnings("rawtypes")
+public class BlockingQueueBroker extends Broker<BlockingQueue<StreamRecord>> {
+	/**
+	 * Singleton instance
+	 */
+	private static final BlockingQueueBroker INSTANCE = new BlockingQueueBroker();
+
+	private BlockingQueueBroker() {
+	}
+
+	/**
+	 * retrieve singleton instance
+	 */
+	public static Broker<BlockingQueue<StreamRecord>> instance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
deleted file mode 100644
index 4358810..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoReaderIterator.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate;
-
-/**
- * A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
- * input types.
- */
-public class CoReaderIterator<T1, T2> {
-
-	private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
-																									// source
-
-	protected final ReusingDeserializationDelegate<T1> delegate1;
-	protected final ReusingDeserializationDelegate<T2> delegate2;
-
-	public CoReaderIterator(
-			CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
-			TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
-		this.reader = reader;
-		this.delegate1 = new ReusingDeserializationDelegate<T1>(serializer1);
-		this.delegate2 = new ReusingDeserializationDelegate<T2>(serializer2);
-	}
-
-	public int next(T1 target1, T2 target2) throws IOException {
-		this.delegate1.setInstance(target1);
-		this.delegate2.setInstance(target2);
-
-		try {
-			return this.reader.getNextRecord(this.delegate1, this.delegate2);
-
-		} catch (InterruptedException e) {
-			throw new IOException("Reader interrupted.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
deleted file mode 100644
index a7139b6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CoRecordReader.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-
-/**
- * A CoRecordReader wraps {@link MutableRecordReader}s of two different input
- * types to read records effectively.
- */
-@SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
-		AbstractReader implements EventListener<InputGate>, StreamingReader {
-
-	private final InputGate bufferReader1;
-
-	private final InputGate bufferReader2;
-
-	private final LinkedBlockingDeque<Integer> availableRecordReaders = new LinkedBlockingDeque<Integer>();
-
-	private LinkedList<Integer> processed = new LinkedList<Integer>();
-
-	private AdaptiveSpanningRecordDeserializer[] reader1RecordDeserializers;
-
-	private RecordDeserializer<T1> reader1currentRecordDeserializer;
-
-	private AdaptiveSpanningRecordDeserializer[] reader2RecordDeserializers;
-
-	private RecordDeserializer<T2> reader2currentRecordDeserializer;
-
-	// 0 => none, 1 => reader (T1), 2 => reader (T2)
-	private int currentReaderIndex;
-
-	private boolean hasRequestedPartitions;
-
-	protected CoBarrierBuffer barrierBuffer1;
-	protected CoBarrierBuffer barrierBuffer2;
-
-	public CoRecordReader(InputGate inputgate1, InputGate inputgate2) {
-		super(new UnionInputGate(inputgate1, inputgate2));
-
-		this.bufferReader1 = inputgate1;
-		this.bufferReader2 = inputgate2;
-
-		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate1
-				.getNumberOfInputChannels()];
-		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[inputgate2
-				.getNumberOfInputChannels()];
-
-		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
-			reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
-		}
-
-		for (int i = 0; i < reader2RecordDeserializers.length; i++) {
-			reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
-		}
-
-		inputgate1.registerListener(this);
-		inputgate2.registerListener(this);
-
-		barrierBuffer1 = new CoBarrierBuffer(inputgate1, this);
-		barrierBuffer2 = new CoBarrierBuffer(inputgate2, this);
-
-		barrierBuffer1.setOtherBarrierBuffer(barrierBuffer2);
-		barrierBuffer2.setOtherBarrierBuffer(barrierBuffer1);
-	}
-
-	public void requestPartitionsOnce() throws IOException, InterruptedException {
-		if (!hasRequestedPartitions) {
-			bufferReader1.requestPartitions();
-			bufferReader2.requestPartitions();
-
-			hasRequestedPartitions = true;
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
-
-		requestPartitionsOnce();
-
-		while (true) {
-			if (currentReaderIndex == 0) {
-				if ((bufferReader1.isFinished() && bufferReader2.isFinished())) {
-					return 0;
-				}
-
-				currentReaderIndex = getNextReaderIndexBlocking();
-
-			}
-
-			if (currentReaderIndex == 1) {
-				while (true) {
-					if (reader1currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
-								.getNextRecord(target1);
-
-						if (result.isBufferConsumed()) {
-							reader1currentRecordDeserializer.getCurrentBuffer().recycle();
-							reader1currentRecordDeserializer = null;
-
-							currentReaderIndex = 0;
-						}
-
-						if (result.isFullRecord()) {
-							return 1;
-						}
-					} else {
-
-						final BufferOrEvent boe = barrierBuffer1.getNextNonBlocked();
-
-						if (boe.isBuffer()) {
-							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
-									.getChannelIndex()];
-							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						} else if (boe.getEvent() instanceof StreamingSuperstep) {
-							barrierBuffer1.processSuperstep(boe);
-							currentReaderIndex = 0;
-
-							break;
-						} else if (handleEvent(boe.getEvent())) {
-							currentReaderIndex = 0;
-
-							break;
-						}
-					}
-				}
-			} else if (currentReaderIndex == 2) {
-				while (true) {
-					if (reader2currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
-								.getNextRecord(target2);
-
-						if (result.isBufferConsumed()) {
-							reader2currentRecordDeserializer.getCurrentBuffer().recycle();
-							reader2currentRecordDeserializer = null;
-
-							currentReaderIndex = 0;
-						}
-
-						if (result.isFullRecord()) {
-							return 2;
-						}
-					} else {
-						final BufferOrEvent boe = barrierBuffer2.getNextNonBlocked();
-
-						if (boe.isBuffer()) {
-							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
-									.getChannelIndex()];
-							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						} else if (boe.getEvent() instanceof StreamingSuperstep) {
-							barrierBuffer2.processSuperstep(boe);
-							currentReaderIndex = 0;
-
-							break;
-						} else if (handleEvent(boe.getEvent())) {
-							currentReaderIndex = 0;
-
-							break;
-						}
-					}
-				}
-			} else {
-				throw new IllegalStateException("Bug: unexpected current reader index.");
-			}
-		}
-	}
-
-	protected int getNextReaderIndexBlocking() throws InterruptedException {
-
-		Integer nextIndex = 0;
-
-		while (processed.contains(nextIndex = availableRecordReaders.take())) {
-			processed.remove(nextIndex);
-		}
-
-		if (nextIndex == 1) {
-			if (barrierBuffer1.isAllBlocked()) {
-				availableRecordReaders.addFirst(1);
-				processed.add(2);
-				return 2;
-			} else {
-				return 1;
-			}
-		} else {
-			if (barrierBuffer2.isAllBlocked()) {
-				availableRecordReaders.addFirst(2);
-				processed.add(1);
-				return 1;
-			} else {
-				return 2;
-			}
-
-		}
-
-	}
-
-	// ------------------------------------------------------------------------
-	// Data availability notifications
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void onEvent(InputGate bufferReader) {
-		addToAvailable(bufferReader);
-	}
-
-	protected void addToAvailable(InputGate bufferReader) {
-		if (bufferReader == bufferReader1) {
-			availableRecordReaders.add(1);
-		} else if (bufferReader == bufferReader2) {
-			availableRecordReaders.add(2);
-		}
-	}
-
-	public void clearBuffers() {
-		for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-		for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-	}
-
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for (AdaptiveSpanningRecordDeserializer serializer : reader1RecordDeserializers) {
-			serializer.setReporter(reporter);
-		}
-		for (AdaptiveSpanningRecordDeserializer serializer : reader2RecordDeserializers) {
-			serializer.setReporter(reporter);
-		}
-	}
-
-	private class CoBarrierBuffer extends BarrierBuffer {
-
-		private CoBarrierBuffer otherBuffer;
-
-		public CoBarrierBuffer(InputGate inputGate, AbstractReader reader) {
-			super(inputGate, reader);
-		}
-
-		public void setOtherBarrierBuffer(CoBarrierBuffer other) {
-			this.otherBuffer = other;
-		}
-
-		@Override
-		protected void actOnAllBlocked() {
-			if (otherBuffer.isAllBlocked()) {
-				super.actOnAllBlocked();
-				otherBuffer.releaseBlocks();
-			}
-		}
-
-	}
-
-	public void cleanup() throws IOException {
-		try {
-			barrierBuffer1.cleanup();
-		} finally {
-			barrierBuffer2.cleanup();
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
new file mode 100644
index 0000000..2f9d1d6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
+
+	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
+
+	private List<Output<OUT>> allOutputs;
+
+	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
+		this.outputSelectorWrapper = outputSelectorWrapper;
+		allOutputs = new ArrayList<Output<OUT>>();
+	}
+
+	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+		outputSelectorWrapper.addCollector(output, edge);
+		allOutputs.add((Output) output);
+	}
+
+	@Override
+	public void collect(StreamRecord<OUT> record) {
+		for (Collector<StreamRecord<OUT>> output : outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
+			output.collect(record);
+		}
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		for (Output<OUT> output : allOutputs) {
+			output.emitWatermark(mark);
+		}
+	}
+
+	@Override
+	public void close() {
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
deleted file mode 100644
index 7f2a9c5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedMutableReader.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class IndexedMutableReader<T extends IOReadableWritable> extends
-		StreamingMutableRecordReader<T> {
-
-	InputGate reader;
-
-	public IndexedMutableReader(InputGate reader) {
-		super(reader);
-		this.reader = reader;
-	}
-
-	public int getNumberOfInputChannels() {
-		return reader.getNumberOfInputChannels();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
deleted file mode 100644
index 2050e27..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/IndexedReaderIterator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.operators.util.ReaderIterator;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-
-public class IndexedReaderIterator<T> extends ReaderIterator<T> {
-
-	public IndexedReaderIterator(
-			IndexedMutableReader<DeserializationDelegate<T>> reader,
-			TypeSerializer<T> serializer) {
-
-		super(reader, serializer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
deleted file mode 100644
index 7883251..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.util.Collection;
-
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
-
-public class InputGateFactory {
-
-	public static InputGate createInputGate(Collection<InputGate> inputGates) {
-		return createInputGate(inputGates.toArray(new InputGate[inputGates.size()]));
-	}
-
-	public static InputGate createInputGate(InputGate[] inputGates) {
-		if (inputGates.length <= 0) {
-			throw new RuntimeException("No such input gate.");
-		}
-
-		if (inputGates.length < 2) {
-			return inputGates[0];
-		} else {
-			return new UnionInputGate(inputGates);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
new file mode 100644
index 0000000..01e16fb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/InputGateUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+
+/**
+ * Utility for dealing with input gates. This will either just return
+ * the single {@link InputGate} that was passed in or create a {@link UnionInputGate} if several
+ * {@link InputGate input gates} are given.
+ */
+public class InputGateUtil {
+
+	public static InputGate createInputGate(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2) {
+		List<InputGate> gates = new ArrayList<InputGate>(inputGates1.size() + inputGates2.size());
+		gates.addAll(inputGates1);
+		gates.addAll(inputGates2);
+		return createInputGate(gates.toArray(new InputGate[gates.size()]));
+	}
+
+	public static InputGate createInputGate(InputGate[] inputGates) {
+		if (inputGates.length <= 0) {
+			throw new RuntimeException("No such input gate.");
+		}
+
+		if (inputGates.length < 2) {
+			return inputGates[0];
+		} else {
+			return new UnionInputGate(inputGates);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
new file mode 100644
index 0000000..e9cbb7d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
+ */
+public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RecordWriterOutput.class);
+
+	private RecordWriter<SerializationDelegate> recordWriter;
+	private SerializationDelegate serializationDelegate;
+
+	@SuppressWarnings("unchecked")
+	public RecordWriterOutput(
+			RecordWriter<SerializationDelegate> recordWriter,
+			TypeSerializer<OUT> outSerializer,
+			boolean enableWatermarkMultiplexing) {
+		Preconditions.checkNotNull(recordWriter);
+
+		this.recordWriter = recordWriter;
+
+		StreamRecordSerializer<OUT> outRecordSerializer;
+		if (enableWatermarkMultiplexing) {
+			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
+		} else {
+			outRecordSerializer = new StreamRecordSerializer<OUT>(outSerializer);
+		}
+
+		if (outSerializer != null) {
+			serializationDelegate = new SerializationDelegate(outRecordSerializer);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void collect(StreamRecord<OUT> record) {
+		serializationDelegate.setInstance(record);
+
+		try {
+			recordWriter.emit(serializationDelegate);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Emit failed: {}", e);
+			}
+			throw new RuntimeException("Element emission failed.", e);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void emitWatermark(Watermark mark) {
+		serializationDelegate.setInstance(mark);
+		try {
+			recordWriter.broadcastEmit(serializationDelegate);
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Watermark emit failed: {}", e);
+			}
+			throw new RuntimeException(e);
+		}
+	}
+
+	@Override
+	public void close() {
+		if (recordWriter instanceof StreamRecordWriter) {
+			((StreamRecordWriter) recordWriter).close();
+		} else {
+			try {
+				recordWriter.flush();
+			} catch (IOException e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+	public void clearBuffers() {
+		recordWriter.clearBuffers();
+	}
+
+	public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
+		recordWriter.broadcastEvent(barrier);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
new file mode 100644
index 0000000..e665710
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * This also keeps track of {@link Watermark} events and forwards them to event subscribers
+ * once the {@link Watermark} from all inputs advances.
+ * 
+ * @param <IN> The type of the record that can be read with this record reader.
+ */
public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader {

	@SuppressWarnings("unused")
	private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);

	// one deserializer per input channel, so that partially received records can be
	// accumulated independently per channel
	private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;

	// deserializer of the channel whose buffer is currently being consumed;
	// null when no buffer is in progress
	private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;

	// We need to keep track of the channel from which a buffer came, so that we can
	// appropriately map the watermarks to input channels
	int currentChannel = -1;

	private boolean isFinished;

	// buffers in-flight data while checkpoint barriers are being aligned
	private final BarrierBuffer barrierBuffer;

	// highest watermark received so far, per input channel
	private long[] watermarks;
	// last watermark forwarded to the operator; only advances when the minimum
	// over all channels advances
	private long lastEmittedWatermark;

	private DeserializationDelegate deserializationDelegate;

	/**
	 * Creates an input processor reading from the given gates (unioned into a single
	 * gate if several are given).
	 *
	 * @param inputGates the input gates to read from
	 * @param inputSerializer serializer for the element type
	 * @param enableWatermarkMultiplexing if true, a multiplexing serializer is used so
	 *        that watermarks and records can share the same channels
	 */
	@SuppressWarnings("unchecked")
	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer, boolean enableWatermarkMultiplexing) {
		super(InputGateUtil.createInputGate(inputGates));

		barrierBuffer = new BarrierBuffer(inputGate, this);

		StreamRecordSerializer<IN> inputRecordSerializer;
		if (enableWatermarkMultiplexing) {
			inputRecordSerializer = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
		} else {
			inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
		}
		this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);

		// Initialize one deserializer per input channel
		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
		for (int i = 0; i < recordDeserializers.length; i++) {
			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
		}

		// no watermark has been seen yet on any channel
		watermarks = new long[inputGate.getNumberOfInputChannels()];
		for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
			watermarks[i] = Long.MIN_VALUE;
		}
		lastEmittedWatermark = Long.MIN_VALUE;
	}

	/**
	 * Reads input until one element has been handed to the given operator, or the
	 * input is exhausted.
	 *
	 * <p>Watermarks are consumed internally: the per-channel watermark is updated and
	 * {@link OneInputStreamOperator#processWatermark(Watermark)} is called whenever
	 * the minimum watermark over all channels advances.
	 *
	 * @param streamOperator the operator receiving elements and watermarks
	 * @return true if an element was processed; false if the input is finished or the
	 *         end of a superstep was reached
	 * @throws Exception if deserialization or operator processing fails
	 */
	@SuppressWarnings("unchecked")
	public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
		if (isFinished) {
			return false;
		}

		while (true) {
			if (currentRecordDeserializer != null) {
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

				if (result.isBufferConsumed()) {
					// the current buffer is fully read; recycle it and fetch a new one
					currentRecordDeserializer.getCurrentBuffer().recycle();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					Object recordOrWatermark = deserializationDelegate.getInstance();

					if (recordOrWatermark instanceof Watermark) {
						Watermark mark = (Watermark) recordOrWatermark;
						long watermarkMillis = mark.getTimestamp();
						if (watermarkMillis > watermarks[currentChannel]) {
							watermarks[currentChannel] = watermarkMillis;
							// recompute the minimum watermark over all input channels
							long newMinWatermark = Long.MAX_VALUE;
							for (long watermark : watermarks) {
								if (watermark < newMinWatermark) {
									newMinWatermark = watermark;
								}
							}
							// only forward a watermark when the overall minimum advanced
							if (newMinWatermark > lastEmittedWatermark) {
								lastEmittedWatermark = newMinWatermark;
								streamOperator.processWatermark(new Watermark(lastEmittedWatermark));
							}
						}
						continue;
					} else {
						// now we can do the actual processing
						StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
						if (ctx != null) {
							ctx.setNextInput(record);
						}
						streamOperator.processElement(record);
						return true;
					}
				}
			}

			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();

			if (bufferOrEvent.isBuffer()) {
				// remember the channel so watermarks can be attributed to it
				currentChannel = bufferOrEvent.getChannelIndex();
				currentRecordDeserializer = recordDeserializers[currentChannel];
				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
			} else {
				// Event received
				final AbstractEvent event = bufferOrEvent.getEvent();

				if (event instanceof CheckpointBarrier) {
					barrierBuffer.processBarrier(bufferOrEvent);
				} else {
					if (handleEvent(event)) {
						if (inputGate.isFinished()) {
							if (!barrierBuffer.isEmpty()) {
								throw new RuntimeException("BarrierBuffer should be empty at this point");
							}
							isFinished = true;
							return false;
						} else if (hasReachedEndOfSuperstep()) {
							return false;
						} // else: More data is coming...
					}
				}
			}
		}
	}

	@Override
	public void setReporter(AccumulatorRegistry.Reporter reporter) {
		for (RecordDeserializer<?> deserializer : recordDeserializers) {
			deserializer.setReporter(reporter);
		}
	}

	/**
	 * Recycles any buffer still held by a deserializer. Called on shutdown or
	 * failure to avoid leaking network buffers.
	 */
	public void clearBuffers() {
		for (RecordDeserializer<?> deserializer : recordDeserializers) {
			Buffer buffer = deserializer.getCurrentBuffer();
			if (buffer != null && !buffer.isRecycled()) {
				buffer.recycle();
			}
		}
	}

	/**
	 * Cleans up resources held by the barrier buffer.
	 *
	 * @throws IOException if cleanup of the barrier buffer fails
	 */
	public void cleanup() throws IOException {
		barrierBuffer.cleanup();
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index c212346..abae9a4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -61,6 +61,14 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		}
 	}
 
	/**
	 * Emits the given record to all output channels and, if this writer is
	 * configured to always flush, immediately flushes afterwards.
	 *
	 * @param record the record to broadcast to all channels
	 * @throws IOException if emitting or flushing fails
	 * @throws InterruptedException if the operation is interrupted
	 */
	@Override
	public void broadcastEmit(T record) throws IOException, InterruptedException {
		super.broadcastEmit(record);
		if (flushAlways) {
			flush();
		}
	}
+
 	public void close() {
 		try {
 			if (outputFlusher != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
new file mode 100644
index 0000000..1fe98bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Input reader for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}.
+ *
+ * <p>
+ * This also keeps track of {@link org.apache.flink.streaming.api.watermark.Watermark} events and forwards them to event subscribers
+ * once the {@link org.apache.flink.streaming.api.watermark.Watermark} from all inputs advances.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements ReaderBase, StreamingReader {

	@SuppressWarnings("unused")
	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);

	// one deserializer per (unioned) input channel
	private final RecordDeserializer[] recordDeserializers;

	// deserializer of the channel whose buffer is currently being consumed;
	// null when no buffer is in progress
	private RecordDeserializer currentRecordDeserializer;

	// We need to keep track of the channel from which a buffer came, so that we can
	// appropriately map the watermarks to input channels
	int currentChannel = -1;

	private boolean isFinished;

	// buffers in-flight data while checkpoint barriers are being aligned
	private final BarrierBuffer barrierBuffer;

	// highest watermark received so far, per channel of the first input
	private long[] watermarks1;
	private long lastEmittedWatermark1;

	// highest watermark received so far, per channel of the second input
	private long[] watermarks2;
	private long lastEmittedWatermark2;

	// the unioned gate puts the channels of input 1 first: unioned channel indices
	// [0, numInputChannels1) belong to input 1, the remaining ones to input 2
	private int numInputChannels1;
	private int numInputChannels2;

	private DeserializationDelegate deserializationDelegate1;
	private DeserializationDelegate deserializationDelegate2;

	/**
	 * Creates a two-input processor reading from the union of all given gates.
	 *
	 * @param inputGates1 gates of the first input
	 * @param inputGates2 gates of the second input
	 * @param inputSerializer1 serializer for elements of the first input
	 * @param inputSerializer2 serializer for elements of the second input
	 * @param enableWatermarkMultiplexing if true, multiplexing serializers are used so
	 *        that watermarks and records can share the same channels
	 */
	@SuppressWarnings("unchecked")
	public StreamTwoInputProcessor(
			Collection<InputGate> inputGates1,
			Collection<InputGate> inputGates2,
			TypeSerializer<IN1> inputSerializer1,
			TypeSerializer<IN2> inputSerializer2,
			boolean enableWatermarkMultiplexing) {
		super(InputGateUtil.createInputGate(inputGates1, inputGates2));

		barrierBuffer = new BarrierBuffer(inputGate, this);

		StreamRecordSerializer<IN1> inputRecordSerializer1;
		if (enableWatermarkMultiplexing) {
			inputRecordSerializer1 = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
		} else {
			inputRecordSerializer1 = new StreamRecordSerializer<IN1>(inputSerializer1);
		}
		this.deserializationDelegate1 = new NonReusingDeserializationDelegate(inputRecordSerializer1);

		StreamRecordSerializer<IN2> inputRecordSerializer2;
		if (enableWatermarkMultiplexing) {
			inputRecordSerializer2 = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
		} else {
			inputRecordSerializer2 = new StreamRecordSerializer<IN2>(inputSerializer2);
		}
		this.deserializationDelegate2 = new NonReusingDeserializationDelegate(inputRecordSerializer2);

		// Initialize one deserializer per input channel
		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
				.getNumberOfInputChannels()];
		for (int i = 0; i < recordDeserializers.length; i++) {
			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer();
		}

		// determine which unioned channels belong to input 1 and which belong to input 2
		numInputChannels1 = 0;
		for (InputGate gate: inputGates1) {
			numInputChannels1 += gate.getNumberOfInputChannels();
		}
		numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;

		// no watermark has been seen yet on any channel of either input
		watermarks1 = new long[numInputChannels1];
		for (int i = 0; i < numInputChannels1; i++) {
			watermarks1[i] = Long.MIN_VALUE;
		}
		lastEmittedWatermark1 = Long.MIN_VALUE;

		watermarks2 = new long[numInputChannels2];
		for (int i = 0; i < numInputChannels2; i++) {
			watermarks2[i] = Long.MIN_VALUE;
		}
		lastEmittedWatermark2 = Long.MIN_VALUE;
	}

	/**
	 * Reads input until one element has been handed to the given operator, or the
	 * input is exhausted.
	 *
	 * <p>Watermarks are consumed internally per input: {@code processWatermark1} or
	 * {@code processWatermark2} is called whenever the minimum watermark over that
	 * input's channels advances.
	 *
	 * @param streamOperator the operator receiving elements and watermarks
	 * @return true if an element was processed; false if the input is finished or the
	 *         end of a superstep was reached
	 * @throws Exception if deserialization or operator processing fails
	 */
	@SuppressWarnings("unchecked")
	public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> streamOperator) throws Exception {
		if (isFinished) {
			return false;
		}

		while (true) {
			if (currentRecordDeserializer != null) {
				DeserializationResult result;
				// pick the delegate (and thus the serializer) of the input that the
				// current channel belongs to
				if (currentChannel < numInputChannels1) {
					result = currentRecordDeserializer.getNextRecord(deserializationDelegate1);
				} else {
					result = currentRecordDeserializer.getNextRecord(deserializationDelegate2);
				}

				if (result.isBufferConsumed()) {
					// the current buffer is fully read; recycle it and fetch a new one
					currentRecordDeserializer.getCurrentBuffer().recycle();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					if (currentChannel < numInputChannels1) {
						Object recordOrWatermark = deserializationDelegate1.getInstance();
						if (recordOrWatermark instanceof Watermark) {
							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
							continue;
						} else {
							streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance());
							return true;

						}
					} else {
						Object recordOrWatermark = deserializationDelegate2.getInstance();
						if (recordOrWatermark instanceof Watermark) {
							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
							continue;
						} else {
							streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance());
							return true;
						}
					}
				}
			}

			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();

			if (bufferOrEvent.isBuffer()) {
				// remember the channel so records/watermarks can be attributed to the right input
				currentChannel = bufferOrEvent.getChannelIndex();
				currentRecordDeserializer = recordDeserializers[currentChannel];
				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());

			} else {
				// Event received
				final AbstractEvent event = bufferOrEvent.getEvent();

				if (event instanceof CheckpointBarrier) {
					barrierBuffer.processBarrier(bufferOrEvent);
				} else {
					if (handleEvent(event)) {
						if (inputGate.isFinished()) {
							if (!barrierBuffer.isEmpty()) {
								throw new RuntimeException("BarrierBuffer should be empty at this point");
							}
							isFinished = true;
							return false;
						} else if (hasReachedEndOfSuperstep()) {
							return false;
						} // else: More data is coming...
					}
				}
			}
		}
	}

	/**
	 * Tracks the given watermark for the channel it arrived on and forwards a new
	 * watermark to the corresponding operator input when the minimum watermark over
	 * that input's channels advances.
	 *
	 * @param operator the operator to notify
	 * @param mark the received watermark
	 * @param channelIndex the unioned channel index the watermark arrived on
	 * @throws Exception if the operator fails while processing the watermark
	 */
	private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark mark, int channelIndex) throws Exception {
		if (channelIndex < numInputChannels1) {
			long watermarkMillis = mark.getTimestamp();
			if (watermarkMillis > watermarks1[channelIndex]) {
				watermarks1[channelIndex] = watermarkMillis;
				// recompute the minimum watermark over all channels of input 1
				long newMinWatermark = Long.MAX_VALUE;
				for (long aWatermarks1 : watermarks1) {
					if (aWatermarks1 < newMinWatermark) {
						newMinWatermark = aWatermarks1;
					}
				}
				if (newMinWatermark > lastEmittedWatermark1) {
					lastEmittedWatermark1 = newMinWatermark;
					operator.processWatermark1(new Watermark(lastEmittedWatermark1));
				}
			}
		} else {
			// translate the unioned channel index into an index local to input 2
			channelIndex = channelIndex - numInputChannels1;
			long watermarkMillis = mark.getTimestamp();
			if (watermarkMillis > watermarks2[channelIndex]) {
				watermarks2[channelIndex] = watermarkMillis;
				// recompute the minimum watermark over all channels of input 2
				long newMinWatermark = Long.MAX_VALUE;
				for (long aWatermarks2 : watermarks2) {
					if (aWatermarks2 < newMinWatermark) {
						newMinWatermark = aWatermarks2;
					}
				}
				if (newMinWatermark > lastEmittedWatermark2) {
					lastEmittedWatermark2 = newMinWatermark;
					operator.processWatermark2(new Watermark(lastEmittedWatermark2));
				}
			}
		}

	}

	@Override
	public void setReporter(AccumulatorRegistry.Reporter reporter) {
		for (RecordDeserializer<?> deserializer : recordDeserializers) {
			deserializer.setReporter(reporter);
		}
	}

	/**
	 * Recycles any buffer still held by a deserializer. Called on shutdown or
	 * failure to avoid leaking network buffers.
	 */
	public void clearBuffers() {
		for (RecordDeserializer<?> deserializer : recordDeserializers) {
			Buffer buffer = deserializer.getCurrentBuffer();
			if (buffer != null && !buffer.isRecycled()) {
				buffer.recycle();
			}
		}
	}

	/**
	 * Cleans up resources held by the barrier buffer.
	 *
	 * @throws IOException if cleanup of the barrier buffer fails
	 */
	public void cleanup() throws IOException {
		barrierBuffer.cleanup();
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
deleted file mode 100644
index 44f9a86..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingAbstractRecordReader.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A record-oriented reader.
- * <p>
- * This abstract base class is used by both the mutable and immutable record
- * readers.
- * 
- * @param <T>
- *            The type of the record that can be read with this record reader.
- */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
-		AbstractReader implements ReaderBase, StreamingReader {
-
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
-
-	private final RecordDeserializer<T>[] recordDeserializers;
-
-	private RecordDeserializer<T> currentRecordDeserializer;
-
-	private boolean isFinished;
-
-	private final BarrierBuffer barrierBuffer;
-
-
-	@SuppressWarnings("unchecked")
-	protected StreamingAbstractRecordReader(InputGate inputGate) {
-		super(inputGate);
-		barrierBuffer = new BarrierBuffer(inputGate, this);
-
-		// Initialize one deserializer per input channel
-		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate
-				.getNumberOfInputChannels()];
-		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<T>();
-		}
-	}
-
-	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-		if (isFinished) {
-			return false;
-		}
-
-		while (true) {
-			if (currentRecordDeserializer != null) {
-				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
-
-				if (result.isBufferConsumed()) {
-					Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
-					currentBuffer.recycle();
-					currentRecordDeserializer = null;
-				}
-
-				if (result.isFullRecord()) {
-					return true;
-				}
-			}
-
-			final BufferOrEvent bufferOrEvent = barrierBuffer.getNextNonBlocked();
-
-			if (bufferOrEvent.isBuffer()) {
-				currentRecordDeserializer = recordDeserializers[bufferOrEvent.getChannelIndex()];
-				currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
-			} else {
-				// Event received
-				final AbstractEvent event = bufferOrEvent.getEvent();
-
-				if (event instanceof StreamingSuperstep) {
-					barrierBuffer.processSuperstep(bufferOrEvent);
-				} else {
-					if (handleEvent(event)) {
-						if (inputGate.isFinished()) {
-							if (!barrierBuffer.isEmpty()) {
-								throw new RuntimeException(
-										"BarrierBuffer should be empty at this point");
-							}
-							isFinished = true;
-							return false;
-						} else if (hasReachedEndOfSuperstep()) {
-							return false;
-						} // else: More data is coming...
-					}
-				}
-			}
-		}
-	}
-
-	public void clearBuffers() {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			Buffer buffer = deserializer.getCurrentBuffer();
-			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
-			}
-		}
-	}
-
-	public void cleanup() throws IOException {
-		barrierBuffer.cleanup();
-	}
-
-	@Override
-	public void setReporter(AccumulatorRegistry.Reporter reporter) {
-		for (RecordDeserializer<?> deserializer : recordDeserializers) {
-			deserializer.setReporter(reporter);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
deleted file mode 100644
index 1356af5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamingMutableRecordReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-public class StreamingMutableRecordReader<T extends IOReadableWritable> extends
-		StreamingAbstractRecordReader<T> implements MutableReader<T> {
-
-	public StreamingMutableRecordReader(InputGate inputGate) {
-		super(inputGate);
-	}
-
-	@Override
-	public boolean next(final T target) throws IOException, InterruptedException {
-		return getNextRecord(target);
-	}
-
-	@Override
-	public void clearBuffers() {
-		super.clearBuffers();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index 75867cd..6c40c03 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -44,10 +44,14 @@ public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
 	}
 
 	@Override
-	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
-			int numberOfOutputChannels) {
-
-		K key = record.getInstance().getKey(keySelector);
+	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
+
+		K key = null;
+		try {
+			key = keySelector.getKey(record.getInstance().getValue());
+		} catch (Exception e) {
+			throw new RuntimeException("Could not extract key from " + record.getInstance(), e);
+		}
 
 		returnArray[0] = partitioner.partition(key,
 				numberOfOutputChannels);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
index 08c431b..7026d45 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitioner.java
@@ -42,8 +42,13 @@ public class FieldsPartitioner<T> extends StreamPartitioner<T> {
 	@Override
 	public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
 			int numberOfOutputChannels) {
-		returnArray[0] = Math.abs(record.getInstance().getKey(keySelector).hashCode()
-				% numberOfOutputChannels);
+		Object key;
+		try {
+			key = keySelector.getKey(record.getInstance().getValue());
+		} catch (Exception e) {
+			throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
+		}
+		returnArray[0] = Math.abs(key.hashCode() % numberOfOutputChannels);
 
 		return returnArray;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
new file mode 100644
index 0000000..715f0d2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.io.IOException;
+
+/**
+ * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
+ * {@link TypeSerializer}, instead, this is only used at the
+ * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
+ * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer
+ * can handle both of them, therefore it returns {@link Object} the result has
+ * to be cast to the correct type.
+ *
+ * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
+ */
+public final class MultiplexingStreamRecordSerializer<T> extends StreamRecordSerializer<T> {
+
+	private static final long IS_WATERMARK = Long.MIN_VALUE;
+
+	private static final long serialVersionUID = 1L;
+
+	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
+		super(serializer);
+		if (serializer instanceof MultiplexingStreamRecordSerializer) {
+			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public Object copy(Object from) {
+		// we can reuse the timestamp since Instant is immutable
+		if (from instanceof StreamRecord) {
+			StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+			return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+		} else if (from instanceof Watermark) {
+			// is immutable
+			return from;
+		} else {
+			throw new RuntimeException("Cannot copy " + from);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public Object copy(Object from, Object reuse) {
+		if (from instanceof StreamRecord && reuse instanceof StreamRecord) {
+			StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+			StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+
+			reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp());
+			return reuse;
+		} else if (from instanceof Watermark) {
+			// is immutable
+			return from;
+		} else {
+			throw new RuntimeException("Cannot copy " + from);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void serialize(Object value, DataOutputView target) throws IOException {
+		if (value instanceof StreamRecord) {
+			StreamRecord<T> record = (StreamRecord<T>) value;
+			target.writeLong(record.getTimestamp());
+			typeSerializer.serialize(record.getValue(), target);
+		} else if (value instanceof Watermark) {
+			target.writeLong(IS_WATERMARK);
+			target.writeLong(((Watermark) value).getTimestamp());
+		}
+	}
+	
+	@Override
+	public Object deserialize(DataInputView source) throws IOException {
+		long millis = source.readLong();
+
+		if (millis == IS_WATERMARK) {
+			return new Watermark(source.readLong());
+		} else {
+			T element = typeSerializer.deserialize(source);
+			return new StreamRecord<T>(element, millis);
+		}
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public Object deserialize(Object reuse, DataInputView source) throws IOException {
+		long millis = source.readLong();
+
+		if (millis == IS_WATERMARK) {
+			return new Watermark(source.readLong());
+
+		} else {
+			StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+			T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
+			reuseRecord.replace(element, millis);
+			return reuse;
+		}
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		long millis = source.readLong();
+		target.writeLong(millis);
+
+		if (millis == IS_WATERMARK) {
+			target.writeLong(source.readLong());
+		} else {
+			typeSerializer.copy(source, target);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 66a64b3..aff030e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -17,87 +17,106 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
-import java.io.Serializable;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-
 /**
- * Object for wrapping a tuple or other object with ID used for sending records
- * between streaming task in Apache Flink stream processing.
+ * One value in a data stream. This stores the value and the associated timestamp.
  */
-public class StreamRecord<T> implements Serializable {
-	private static final long serialVersionUID = 1L;
+public class StreamRecord<T> {
 
-	private T streamObject;
-	public boolean isTuple;
+	// We store it as Object so that we can reuse a StreamElement for emitting
+	// elements of a different type while still reusing the timestamp.
+	private Object value;
+	private long timestamp;
 
 	/**
-	 * Creates an empty StreamRecord
+	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+	 * result of {@code new Instant(0)}.
 	 */
-	public StreamRecord() {
+	public StreamRecord(T value) {
+		this(value, Long.MIN_VALUE + 1);
+		// be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the
+		// special tag to signify that a transmitted element is a Watermark in StreamRecordSerializer
 	}
 
 	/**
-	 * Gets the wrapped object from the StreamRecord
-	 * 
-	 * @return The object wrapped
+	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+	 * given timestamp.
+	 *
+	 * @param value The value to wrap in this {@link StreamRecord}
+	 * @param timestamp The timestamp in milliseconds
 	 */
-	public T getObject() {
-		return streamObject;
+	public StreamRecord(T value, long timestamp) {
+		this.value = value;
+		this.timestamp = timestamp;
 	}
 
 	/**
-	 * Gets the field of the contained object at the given position. If a tuple
-	 * is wrapped then the getField method is invoked. If the StreamRecord
-	 * contains and object of Basic types only position 0 could be returned.
-	 * 
-	 * @param pos
-	 *            Position of the field to get.
-	 * @return Returns the object contained in the position.
+	 * Returns the value wrapped in this stream value.
 	 */
-	public Object getField(int pos) {
-		if (isTuple) {
-			return ((Tuple) streamObject).getField(pos);
-		} else {
-			if (pos == 0) {
-				return streamObject;
-			} else {
-				throw new IndexOutOfBoundsException();
-			}
-		}
+	@SuppressWarnings("unchecked")
+	public T getValue() {
+		return (T) value;
 	}
 
 	/**
-	 * Extracts key for the stored object using the keySelector provided.
-	 * 
-	 * @param keySelector
-	 *            KeySelector for extracting the key
-	 * @return The extracted key
+	 * Returns the timestamp associated with this stream value in milliseconds.
 	 */
-	public <R> R getKey(KeySelector<T, R> keySelector) {
-		try {
-			return keySelector.getKey(streamObject);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to extract key: " + e.getMessage());
-		}
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	/**
+	 * Replace the currently stored value by the given new value. This returns a StreamElement
+	 * with the generic type parameter that matches the new value while keeping the old
+	 * timestamp.
+	 *
+	 * @param element Element to set in this stream value
+	 * @return Returns the StreamElement with replaced value
+	 */
+	@SuppressWarnings("unchecked")
+	public <X> StreamRecord<X> replace(X element) {
+		this.value = element;
+		return (StreamRecord<X>) this;
 	}
 
 	/**
-	 * Sets the object stored
-	 * 
-	 * @param object
-	 *            Object to set
-	 * @return Returns the StreamRecord object
+	 * Replace the currently stored value by the given new value and the currently stored
+	 * timestamp with the new timestamp. This returns a StreamElement with the generic type
+	 * parameter that matches the new value.
+	 *
+	 * @param value The new value to wrap in this {@link StreamRecord}
+	 * @param timestamp The new timestamp in milliseconds
+	 * @return Returns the StreamElement with replaced value
 	 */
-	public StreamRecord<T> setObject(T object) {
-		this.streamObject = object;
-		return this;
+	@SuppressWarnings("unchecked")
+	public <X> StreamRecord<X> replace(X value, long timestamp) {
+		this.timestamp = timestamp;
+		this.value = value;
+		return (StreamRecord<X>) this;
 	}
 
 	@Override
-	public String toString() {
-		return streamObject.toString();
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		StreamRecord that = (StreamRecord) o;
+
+		return (value == null ? that.value == null : value.equals(that.value)) && timestamp == that.timestamp;
 	}
 
+	@Override
+	public int hashCode() {
+		int result = value != null ? value.hashCode() : 0;
+		result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "Record{" + value + "; " + timestamp + '}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 4499499..b05eb36 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -20,26 +20,35 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import java.io.IOException;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 
-public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> {
+/**
+ * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with
+ * the element.
+ *
+ * <p>
+ * {@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also
+ * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same
+ * stream with {@link StreamRecord StreamRecords}.
+ *
+ * @see MultiplexingStreamRecordSerializer
+ *
+ * @param <T> The type of value in the {@link StreamRecord}
+ */
+public class StreamRecordSerializer<T> extends TypeSerializer<Object> {
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<T> typeSerializer;
-	private final boolean isTuple;
+	protected final TypeSerializer<T> typeSerializer;
 
-	public StreamRecordSerializer(TypeInformation<T> typeInfo, ExecutionConfig executionConfig) {
-		this.typeSerializer = typeInfo.createSerializer(executionConfig);
-		this.isTuple = typeInfo.isTupleType();
-	}
-
-	public TypeSerializer<T> getObjectSerializer() {
-		return typeSerializer;
+	public StreamRecordSerializer(TypeSerializer<T> serializer) {
+		if (serializer instanceof StreamRecordSerializer) {
+			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
+		}
+		this.typeSerializer = Preconditions.checkNotNull(serializer);
 	}
 
 	@Override
@@ -48,34 +57,34 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	}
 
 	@Override
-	public StreamRecordSerializer<T> duplicate() {
+	@SuppressWarnings("unchecked")
+	public TypeSerializer duplicate() {
 		return this;
 	}
 
 	@Override
-	public StreamRecord<T> createInstance() {
+	public Object createInstance() {
 		try {
-			StreamRecord<T> t = new StreamRecord<T>();
-			t.isTuple = isTuple;
-			t.setObject(typeSerializer.createInstance());
-			return t;
+			return new StreamRecord<T>(typeSerializer.createInstance());
 		} catch (Exception e) {
 			throw new RuntimeException("Cannot instantiate StreamRecord.", e);
 		}
 	}
 	
 	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from) {
-		StreamRecord<T> rec = new StreamRecord<T>();
-		rec.isTuple = from.isTuple;
-		rec.setObject(typeSerializer.copy(from.getObject()));
-		return rec;
+	@SuppressWarnings("unchecked")
+	public Object copy(Object from) {
+		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+		return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
 	}
 
 	@Override
-	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-		reuse.isTuple = from.isTuple;
-		reuse.setObject(typeSerializer.copy(from.getObject(), reuse.getObject()));
+	@SuppressWarnings("unchecked")
+	public Object copy(Object from, Object reuse) {
+		StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+
+		reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), 0);
 		return reuse;
 	}
 
@@ -85,26 +94,29 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	}
 
 	@Override
-	public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException {
-		typeSerializer.serialize(value.getObject(), target);
+	@SuppressWarnings("unchecked")
+	public void serialize(Object value, DataOutputView target) throws IOException {
+		StreamRecord<T> record = (StreamRecord<T>) value;
+		typeSerializer.serialize(record.getValue(), target);
 	}
 	
 	@Override
-	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-		StreamRecord<T> record = new StreamRecord<T>();
-		record.isTuple = this.isTuple;
-		record.setObject(typeSerializer.deserialize(source));
-		return record;
+	public Object deserialize(DataInputView source) throws IOException {
+		T element = typeSerializer.deserialize(source);
+		return new StreamRecord<T>(element, 0);
 	}
 
 	@Override
-	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
-		reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
+	@SuppressWarnings("unchecked")
+	public Object deserialize(Object reuse, DataInputView source) throws IOException {
+		StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+		T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
+		reuseRecord.replace(element, 0);
 		return reuse;
 	}
 
 	@Override
 	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		// Needs to be implemented
+		typeSerializer.copy(source, target);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
new file mode 100644
index 0000000..d94b5b4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/CheckpointBarrier.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import java.io.IOException;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+/**
+ * Checkpoint barriers are used to synchronize checkpoints throughout the streaming topology. The
+ * barriers are emitted by the sources when instructed to do so by the JobManager. When
+ * operators receive a {@link CheckpointBarrier} on one of its inputs it must block processing
+ * of further elements on this input until all inputs received the checkpoint barrier
+ * corresponding to that checkpoint. Once all inputs received the checkpoint barrier
+ * a checkpoint the operator is to perform the checkpoint and then broadcast the barrier to
+ * downstream operators.
+ *
+ * <p>
+ * The checkpoint barrier IDs are advancing. Once an operator receives a {@link CheckpointBarrier}
+ * for a checkpoint with a higher id it is to discard all barriers that it received from previous
+ * checkpoints and unblock all other inputs.
+ */
+public class CheckpointBarrier extends TaskEvent {
+
+	protected long id;
+	protected long timestamp;
+
+	public CheckpointBarrier() {}
+
+	public CheckpointBarrier(long id, long timestamp) {
+		this.id = id;
+		this.timestamp = timestamp;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeLong(id);
+		out.writeLong(timestamp);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		id = in.readLong();
+		timestamp = in.readLong();
+	}
+	
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		if (other == null || !(other instanceof CheckpointBarrier)) {
+			return false;
+		}
+		else {
+			CheckpointBarrier that = (CheckpointBarrier) other;
+			return that.id == this.id && that.timestamp == this.timestamp;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return String.format("CheckpointBarrier %d @ %d", id, timestamp);
+	}
+}