Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:18 UTC

[8/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming

[FLINK-1967] Introduce (Event)time in Streaming

This introduces an additional timestamp field in StreamRecord. When a
plain SourceFunction is used and automatic timestamps are enabled via the
ExecutionConfig, the timestamp is automatically set to
System.currentTimeMillis() upon element emission. The timestamp can be
set manually by using an EventTimeSourceFunction.

This also changes the signature of the StreamOperators: they now receive
a StreamRecord instead of the unwrapped value. This is necessary for
them to access the timestamp. Before, the StreamTask would unwrap the
value from the StreamRecord; now the unwrapping happens one layer up, in
the operators themselves.
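
As an illustration (a minimal sketch, not code from this commit), a
hypothetical pass-through operator under the new contract could look like
this, using the TimestampedCollector added by this commit to attach the
input timestamp to the emitted element:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class UpperCaseOperator extends AbstractStreamOperator<String>
		implements OneInputStreamOperator<String, String> {

	private static final long serialVersionUID = 1L;

	private transient TimestampedCollector<String> collector;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		// Operators emit StreamRecords; the TimestampedCollector wraps the raw
		// output and stamps every collected element with the timestamp set below.
		collector = new TimestampedCollector<String>(output);
	}

	@Override
	public void processElement(StreamRecord<String> element) throws Exception {
		// The operator now sees the StreamRecord and can read its timestamp.
		collector.setTimestamp(element.getTimestamp());
		collector.collect(element.getValue().toUpperCase());
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
		// No buffering happens here, so the watermark can be forwarded unchanged.
		output.emitWatermark(mark);
	}
}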

This also introduces watermarks to keep track of the progress of event
time. At a configurable interval the sources emit watermarks that signify
that no records with a lower timestamp will be emitted by this source in
the future. The watermarks are broadcast in-band with the records; stream
inputs wait for watermark events on all input channels and forward the
watermark to the StreamOperator once it has advanced on all inputs.
Operators are responsible for forwarding the watermark once they know
that no elements with an earlier timestamp will be emitted (this is
mostly relevant for buffering operations such as windows). Right now, all
operators simply forward the watermark they receive.
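
The per-channel handling described above can be summarized with a small
sketch (a hypothetical helper, not the actual implementation, which lives
in the new StreamInputProcessor/StreamTwoInputProcessor; it assumes that
Watermark exposes its timestamp via getTimestamp()):

import java.util.Arrays;

import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical helper illustrating how a stream input only advances its
 * watermark once the watermark has advanced on all input channels.
 */
public class WatermarkAlignmentSketch {

	private final long[] channelWatermarks;
	private long lastEmitted = Long.MIN_VALUE;

	public WatermarkAlignmentSketch(int numChannels) {
		channelWatermarks = new long[numChannels];
		Arrays.fill(channelWatermarks, Long.MIN_VALUE);
	}

	/**
	 * Returns the watermark to forward to the operator, or null if the
	 * minimum over all channels has not advanced yet.
	 */
	public Watermark onWatermark(int channel, Watermark mark) {
		channelWatermarks[channel] = Math.max(channelWatermarks[channel], mark.getTimestamp());

		long minOverChannels = Long.MAX_VALUE;
		for (long w : channelWatermarks) {
			minOverChannels = Math.min(minOverChannels, w);
		}

		if (minOverChannels > lastEmitted) {
			lastEmitted = minOverChannels;
			return new Watermark(minOverChannels);
		}
		return null;
	}
}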

When using an EventTimeSourceFunction, the system does not automatically
assign timestamps or emit watermarks; the user is required to attach
timestamps and emit watermarks manually through the SourceContext.
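
A minimal sketch of such an event-time source (a hypothetical source,
assuming the run(SourceContext)/cancel() contract of SourceFunction):

import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class SketchEventTimeSource implements EventTimeSourceFunction<Long> {

	private static final long serialVersionUID = 1L;

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<Long> ctx) throws Exception {
		long value = 0;
		while (running && value < 100) {
			long timestamp = value * 10;                  // event time derived from the data
			ctx.collectWithTimestamp(value, timestamp);   // attach the timestamp to the element
			ctx.emitWatermark(new Watermark(timestamp));  // promise: no earlier timestamps follow
			value++;
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}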

No watermarks will be emitted unless
ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
interval.

By default, timestamps and watermarks are completely disabled; there is
a switch in ExecutionConfig (enableTimestamps()) to enable them. This
means that, out of the box, the performance is not changed by this new
feature if it is not used.
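
For example (a sketch, assuming the ExecutionConfig is reachable through
StreamExecutionEnvironment#getConfig()), enabling both switches looks
like this:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimestampConfigExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Attach timestamps to records emitted by sources (off by default).
		env.getConfig().enableTimestamps();

		// Emit a watermark from the sources every 200 ms (no watermarks without this).
		env.getConfig().setAutoWatermarkInterval(200);

		// ... define the topology and call env.execute() as usual ...
	}
}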

This commit also contains fixes for other issues that were discovered
while implementing the feature. See the Jira issue numbers and
descriptions below.

[FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing

This changes the CoReader (now CoStreamingRecordReader) to reuse the
UnionGate for input multiplexing. This way it does not lock onto one
specific input side but reads events from both input sides.

This also enables an event listener for checkpoint barriers so that the
TwoInputStreamTask now reacts to those and correctly forwards them.

Then, this adds CoStreamCheckpointingITCase to verify that checkpointing
works in topologies with TwoInputStreamTasks.

This also adds tests for OneInputStreamTask and TwoInputStreamTask
that check whether:
 - open()/close() of RichFunctions are correctly called
 - Watermarks are correctly forwarded
 - Supersteps/checkpoint barriers are correctly forwarded and the
   blocking of inputs works correctly

Add proper tests for Stream Operators

These test whether:
 - open()/close() on RichFunctions are called
 - Timestamps of emitted elements match the timestamp of the input
   element
 - Watermarks are correctly forwarded
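
A rough sketch of how such an operator test can look with the new
OneInputStreamOperatorTestHarness (constructor and method names here are
assumptions based on the harness added in this commit):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class StreamMapTestSketch {

	public void testMap() throws Exception {
		StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new MapFunction<Integer, String>() {
			@Override
			public String map(Integer value) {
				return String.valueOf(value);
			}
		});

		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
				new OneInputStreamOperatorTestHarness<Integer, String>(operator);

		testHarness.open();
		testHarness.processElement(new StreamRecord<Integer>(42, 12345L));
		testHarness.processWatermark(new Watermark(12345L));

		// Expected output: a StreamRecord carrying "42" with timestamp 12345,
		// followed by the forwarded watermark.
		Object output = testHarness.getOutput();
	}
}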

[FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests

Before, a CheckpointBarrier from a more recent checkpoint would simply
trigger unblocking while the buffer was still waiting on an older
CheckpointBarrier. Now, a CheckpointBarrier from a newer checkpoint
unblocks all channels and starts a new wait on the newer checkpoint.
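
The corrected rule can be summarized with a small sketch (a hypothetical
class, not the actual BarrierBuffer code):

import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical illustration of the corrected alignment rule: a barrier from
 * a newer checkpoint aborts the current alignment and starts a new one.
 */
public class BarrierAlignmentSketch {

	private long currentCheckpointId = -1;
	private final Set<Integer> blockedChannels = new HashSet<Integer>();
	private final int numChannels;

	public BarrierAlignmentSketch(int numChannels) {
		this.numChannels = numChannels;
	}

	/** Returns true when the checkpoint identified by barrierId is aligned on all inputs. */
	public boolean onBarrier(long barrierId, int channel) {
		if (barrierId > currentCheckpointId) {
			// Newer checkpoint: unblock all channels and start waiting on the new
			// checkpoint instead of (incorrectly) completing the wait for the old one.
			blockedChannels.clear();
			currentCheckpointId = barrierId;
		}
		if (barrierId == currentCheckpointId) {
			blockedChannels.add(channel);
		}
		return blockedChannels.size() == numChannels;
	}
}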

The tests for OneInputStreamTask and TwoInputStreamTask check whether
the buffer behaves correctly when receiving CheckpointBarriers from more
recent checkpoints while still waiting on an older CheckpointBarrier.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2eb6cc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2eb6cc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2eb6cc8

Branch: refs/heads/master
Commit: a2eb6cc8774ab43475829b0b691e62739fbbe88b
Parents: 2d191ab
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 22 12:26:44 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 20 18:52:45 2015 +0200

----------------------------------------------------------------------
 .../wrappers/StormBoltCollector.java            |   7 +-
 .../wrappers/StormBoltWrapper.java              |  21 +-
 .../wrappers/StormBoltWrapperTest.java          |   7 +-
 .../wrappers/TestContext.java                   |  16 +
 flink-core/pom.xml                              |   3 +-
 .../flink/api/common/ExecutionConfig.java       |  62 +++
 .../io/network/api/writer/RecordWriter.java     |  26 +
 .../flink-streaming-core/pom.xml                |   9 +-
 .../api/collector/CollectorWrapper.java         |  48 --
 .../streaming/api/collector/StreamOutput.java   |  90 ----
 .../BroadcastOutputSelectorWrapper.java         |  12 +-
 .../selector/DirectedOutputSelectorWrapper.java |  26 +-
 .../selector/OutputSelectorWrapper.java         |   5 +-
 .../source/EventTimeSourceFunction.java         |  39 ++
 .../api/functions/source/SourceFunction.java    |  57 +-
 .../flink/streaming/api/graph/StreamConfig.java |  28 +-
 .../flink/streaming/api/graph/StreamGraph.java  |  32 +-
 .../flink/streaming/api/graph/StreamNode.java   |  20 +-
 .../api/operators/AbstractStreamOperator.java   |   5 +-
 .../operators/AbstractUdfStreamOperator.java    |   3 +-
 .../api/operators/OneInputStreamOperator.java   |  18 +-
 .../flink/streaming/api/operators/Output.java   |  15 +-
 .../streaming/api/operators/StreamCounter.java  |  12 +-
 .../streaming/api/operators/StreamFilter.java   |  11 +-
 .../streaming/api/operators/StreamFlatMap.java  |  21 +-
 .../streaming/api/operators/StreamFold.java     |  13 +-
 .../api/operators/StreamGroupedFold.java        |  14 +-
 .../api/operators/StreamGroupedReduce.java      |  13 +-
 .../streaming/api/operators/StreamMap.java      |  11 +-
 .../streaming/api/operators/StreamOperator.java |   3 +-
 .../streaming/api/operators/StreamProject.java  |  13 +-
 .../streaming/api/operators/StreamReduce.java   |  16 +-
 .../streaming/api/operators/StreamSink.java     |  11 +-
 .../streaming/api/operators/StreamSource.java   | 263 +++++++++-
 .../api/operators/TimestampedCollector.java     |  63 +++
 .../api/operators/TwoInputStreamOperator.java   |  32 +-
 .../api/operators/co/CoStreamFlatMap.java       |  48 +-
 .../api/operators/co/CoStreamGroupedReduce.java |  16 +-
 .../streaming/api/operators/co/CoStreamMap.java |  34 +-
 .../api/operators/co/CoStreamReduce.java        |  43 +-
 .../api/operators/co/CoStreamWindow.java        |  40 +-
 .../windowing/GroupedActiveDiscretizer.java     |   9 +-
 .../windowing/GroupedStreamDiscretizer.java     |   6 +-
 .../windowing/GroupedWindowBuffer.java          |   9 +-
 .../operators/windowing/StreamDiscretizer.java  |  29 +-
 .../operators/windowing/StreamWindowBuffer.java |  10 +-
 .../operators/windowing/WindowFlattener.java    |  13 +-
 .../api/operators/windowing/WindowMerger.java   |  13 +-
 .../operators/windowing/WindowPartitioner.java  |  19 +-
 .../state/PartitionedStreamOperatorState.java   |   2 +-
 .../streaming/api/watermark/Watermark.java      |  79 +++
 .../windowbuffer/BasicWindowBuffer.java         |   5 +-
 .../JumpingCountGroupedPreReducer.java          |   3 +-
 .../windowbuffer/JumpingCountPreReducer.java    |   3 +-
 .../JumpingTimeGroupedPreReducer.java           |   3 +-
 .../windowbuffer/JumpingTimePreReducer.java     |   3 +-
 .../windowbuffer/SlidingPreReducer.java         |   5 +-
 .../windowbuffer/TumblingGroupedPreReducer.java |   7 +-
 .../windowbuffer/TumblingPreReducer.java        |   7 +-
 .../windowing/windowbuffer/WindowBuffer.java    |   3 +-
 .../streaming/runtime/io/BarrierBuffer.java     | 124 ++---
 .../runtime/io/BlockingQueueBroker.java         |  70 +--
 .../streaming/runtime/io/CoReaderIterator.java  |  57 --
 .../streaming/runtime/io/CoRecordReader.java    | 300 -----------
 .../streaming/runtime/io/CollectorWrapper.java  |  64 +++
 .../runtime/io/IndexedMutableReader.java        |  37 --
 .../runtime/io/IndexedReaderIterator.java       |  33 --
 .../streaming/runtime/io/InputGateFactory.java  |  42 --
 .../streaming/runtime/io/InputGateUtil.java     |  52 ++
 .../runtime/io/RecordWriterOutput.java          | 115 ++++
 .../runtime/io/StreamInputProcessor.java        | 200 +++++++
 .../runtime/io/StreamRecordWriter.java          |   8 +
 .../runtime/io/StreamTwoInputProcessor.java     | 268 ++++++++++
 .../io/StreamingAbstractRecordReader.java       | 144 -----
 .../io/StreamingMutableRecordReader.java        |  44 --
 .../partitioner/CustomPartitionerWrapper.java   |  12 +-
 .../runtime/partitioner/FieldsPartitioner.java  |   9 +-
 .../MultiplexingStreamRecordSerializer.java     | 135 +++++
 .../runtime/streamrecord/StreamRecord.java      | 131 +++--
 .../streamrecord/StreamRecordSerializer.java    |  84 +--
 .../runtime/tasks/CheckpointBarrier.java        |  97 ++++
 .../runtime/tasks/OneInputStreamTask.java       |  67 +--
 .../streaming/runtime/tasks/OutputHandler.java  |  92 ++--
 .../runtime/tasks/SourceStreamTask.java         |  37 +-
 .../runtime/tasks/StreamIterationHead.java      |   9 +-
 .../runtime/tasks/StreamIterationTail.java      |  70 +--
 .../streaming/runtime/tasks/StreamTask.java     |  23 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |   5 +-
 .../runtime/tasks/StreamingSuperstep.java       |  83 ---
 .../runtime/tasks/TwoInputStreamTask.java       | 122 ++---
 .../consumer/StreamTestSingleInputGate.java     | 232 +++++++++
 .../api/collector/StreamCollectorTest.java      |  52 --
 .../api/functions/ListSourceContext.java        |  16 +
 .../streaming/api/operators/CounterTest.java    |  40 --
 .../streaming/api/operators/FilterTest.java     |  52 --
 .../streaming/api/operators/FlatMapTest.java    |  55 --
 .../api/operators/GroupedFoldTest.java          |  67 ---
 .../api/operators/GroupedReduceTest.java        |  62 ---
 .../flink/streaming/api/operators/MapTest.java  |  50 --
 .../streaming/api/operators/ProjectTest.java    | 118 -----
 .../api/operators/StreamCounterTest.java        |  62 +++
 .../api/operators/StreamFilterTest.java         | 135 +++++
 .../api/operators/StreamFlatMapTest.java        | 144 +++++
 .../api/operators/StreamGroupedFoldTest.java    | 159 ++++++
 .../api/operators/StreamGroupedReduceTest.java  | 153 ++++++
 .../streaming/api/operators/StreamMapTest.java  | 129 +++++
 .../api/operators/StreamProjectTest.java        | 135 +++++
 .../api/operators/co/CoFlatMapTest.java         |  83 ---
 .../api/operators/co/CoGroupedReduceTest.java   | 250 ++++-----
 .../streaming/api/operators/co/CoMapTest.java   |  57 --
 .../api/operators/co/CoStreamFlatMapTest.java   | 190 +++++++
 .../api/operators/co/CoStreamMapTest.java       | 172 ++++++
 .../api/operators/co/CoWindowTest.java          | 364 ++++++-------
 .../operators/windowing/ParallelMergeTest.java  |  79 +--
 .../windowing/WindowIntegrationTest.java        | 519 -------------------
 .../operators/windowing/WindowingITCase.java    | 519 +++++++++++++++++++
 .../api/state/StatefulOperatorTest.java         |  22 +-
 .../api/streamtask/MockRecordWriter.java        |   2 +-
 .../windowbuffer/BasicWindowBufferTest.java     |  18 +-
 .../JumpingCountGroupedPreReducerTest.java      |   7 +-
 .../JumpingCountPreReducerTest.java             |   4 +-
 .../windowbuffer/JumpingTimePreReducerTest.java |   4 +-
 .../SlidingCountGroupedPreReducerTest.java      |  61 ++-
 .../SlidingCountPreReducerTest.java             |  10 +-
 .../SlidingTimeGroupedPreReducerTest.java       |  14 +-
 .../windowbuffer/SlidingTimePreReducerTest.java |  10 +-
 .../TumblingGroupedPreReducerTest.java          |   6 +-
 .../windowbuffer/TumblingPreReducerTest.java    |   6 +-
 .../runtime/io/BarrierBufferIOTest.java         |   9 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  42 +-
 .../runtime/io/CoRecordReaderTest.java          |  92 ----
 .../partitioner/BroadcastPartitionerTest.java   |   3 +-
 .../partitioner/DistributePartitionerTest.java  |   2 +-
 .../partitioner/FieldsPartitionerTest.java      |  20 +-
 .../partitioner/ForwardPartitionerTest.java     |   2 +-
 .../partitioner/GlobalPartitionerTest.java      |   3 +-
 .../partitioner/ShufflePartitionerTest.java     |   3 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 339 ++++++++++++
 .../tasks/OneInputStreamTaskTestHarness.java    | 105 ++++
 .../runtime/tasks/SourceStreamTaskTest.java     | 191 ++++---
 .../runtime/tasks/StreamMockEnvironment.java    |  49 +-
 .../runtime/tasks/StreamTaskTestBase.java       | 109 ----
 .../runtime/tasks/StreamTaskTestHarness.java    | 308 +++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 374 +++++++++++++
 .../tasks/TwoInputStreamTaskTestHarness.java    | 170 ++++++
 .../streaming/timestamp/TimestampITCase.java    | 416 +++++++++++++++
 .../flink/streaming/util/MockCoContext.java     | 216 --------
 .../flink/streaming/util/MockContext.java       |  58 +--
 .../apache/flink/streaming/util/MockOutput.java |  14 +-
 .../streaming/util/MockRecordWriterFactory.java |  40 --
 .../util/OneInputStreamOperatorTestHarness.java | 146 ++++++
 .../streaming/util/SourceFunctionUtil.java      |  30 +-
 .../flink/streaming/util/TestHarnessUtil.java   |  68 +++
 .../util/TwoInputStreamOperatorTestHarness.java | 148 ++++++
 .../CoStreamCheckpointingITCase.java            | 463 +++++++++++++++++
 155 files changed, 7691 insertions(+), 3775 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
index 4154f49..8b088c3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
@@ -23,6 +23,7 @@ import backtype.storm.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.util.Collector;
 
 import java.util.Collection;
 import java.util.List;
@@ -34,8 +35,8 @@ import java.util.List;
  */
 class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
 
-	/** The Flink output object */
-	private final Output<OUT> flinkOutput;
+	/** The Flink output Collector */
+	private final Collector<OUT> flinkOutput;
 
 	/**
 	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink
@@ -50,7 +51,7 @@ class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOu
 	 * @throws UnsupportedOperationException
 	 *         if the specified number of attributes is not in the valid range of [0,25]
 	 */
-	public StormBoltCollector(final int numberOfAttributes, final Output<OUT> flinkOutput) throws UnsupportedOperationException {
+	public StormBoltCollector(final int numberOfAttributes, final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
 		super(numberOfAttributes);
 		assert (flinkOutput != null);
 		this.flinkOutput = flinkOutput;

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 140e629..c7b87ba 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -26,6 +26,9 @@ import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 
@@ -52,6 +55,12 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	private final int numberOfAttributes;
 
 	/**
+	 *  We have to use this because Operators must output
+	 *  {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
+	 */
+	private TimestampedCollector<OUT> flinkCollector;
+
+	/**
 	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt}
 	 * such that it can be used within a Flink streaming program. The output type will be one of
 	 * {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of attributes.
@@ -93,11 +102,12 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 
 		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
 				(StreamingRuntimeContext)super.runtimeContext, false);
+		flinkCollector = new TimestampedCollector<OUT>(output);
 		OutputCollector stormCollector = null;
 
 		if (this.numberOfAttributes != -1) {
 			stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
-					this.numberOfAttributes, super.output));
+					this.numberOfAttributes, flinkCollector));
 		}
 
 		this.bolt.prepare(null, topologyContext, stormCollector);
@@ -110,8 +120,13 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
 	}
 
 	@Override
-	public void processElement(final IN element) throws Exception {
-		this.bolt.execute(new StormTuple<IN>(element));
+	public void processElement(final StreamRecord<IN> element) throws Exception {
+		flinkCollector.setTimestamp(element.getTimestamp());
+		this.bolt.execute(new StormTuple<IN>(element.getValue()));
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 780c75e..dd56c4d 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -114,9 +114,9 @@ public class StormBoltWrapperTest {
 
 		final StreamRecord record = mock(StreamRecord.class);
 		if (numberOfAttributes == 0) {
-			when(record.getObject()).thenReturn(rawTuple);
+			when(record.getValue()).thenReturn(rawTuple);
 		} else {
-			when(record.getObject()).thenReturn(flinkTuple);
+			when(record.getValue()).thenReturn(flinkTuple);
 		}
 
 		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
@@ -129,8 +129,9 @@ public class StormBoltWrapperTest {
 
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
 		wrapper.setup(mock(Output.class), taskContext);
+		wrapper.open(new Configuration());
 
-		wrapper.processElement(record.getObject());
+		wrapper.processElement(record);
 		if (numberOfAttributes == 0) {
 			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
index 8885a1b..7c91e6f 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/TestContext.java
@@ -19,6 +19,8 @@ package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
 import java.util.LinkedList;
 
 class TestContext implements SourceContext<Tuple1<Integer>> {
@@ -33,8 +35,22 @@ class TestContext implements SourceContext<Tuple1<Integer>> {
 	}
 
 	@Override
+	public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
+		this.result.add(element.copy());
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		// ignore it
+	}
+
+	@Override
 	public Object getCheckpointLock() {
 		return null;
 	}
 
+	@Override
+	public void close() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index c0c4378..d59e755 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -63,7 +63,8 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-	</dependencies>
+
+    </dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 4974295..b8fa2a2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -89,6 +89,10 @@ public class ExecutionConfig implements Serializable {
 
 	private GlobalJobParameters globalJobParameters = null;
 
+	private long autoWatermarkInterval = 0;
+
+	private boolean timestampsEnabled = false;
+
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in lists to ensure they are registered in order in all kryo instances.
 
@@ -141,6 +145,62 @@ public class ExecutionConfig implements Serializable {
 	}
 
 	/**
+	 * Sets the interval of the automatic watermark emission. Watermaks are used throughout
+	 * the streaming system to keep track of the progress of time. They are used, for example,
+	 * for time based windowing.
+	 *
+	 * @param interval The interval between watermarks in milliseconds.
+	 */
+	public ExecutionConfig setAutoWatermarkInterval(long interval) {
+		this.autoWatermarkInterval = interval;
+		return this;
+	}
+
+	/**
+	 * Enables streaming timestamps. When this is enabled all records that are emitted
+	 * from a source have a timestamp attached. This is required if a topology contains
+	 * operations that rely on watermarks and timestamps to perform operations, such as
+	 * event-time windows.
+	 *
+	 * <p>
+	 * This is automatically enabled if you enable automatic watermarks.
+	 *
+	 * @see #setAutoWatermarkInterval(long)
+	 */
+	public ExecutionConfig enableTimestamps() {
+		this.timestampsEnabled = true;
+		return this;
+	}
+
+	/**
+	 * Disables streaming timestamps.
+	 *
+	 * @see #enableTimestamps()
+	 */
+	public ExecutionConfig disableTimestamps() {
+		this.timestampsEnabled = false;
+		return this;
+	}
+
+	/**
+	 * Returns true when timestamps are enabled.
+	 *
+	 * @see #enableTimestamps()
+	 */
+	public boolean areTimestampsEnabled() {
+		return timestampsEnabled;
+	}
+
+	/**
+	 * Returns the interval of the automatic watermark emission.
+	 *
+	 * @see #setAutoWatermarkInterval(long)
+	 */
+	public long getAutoWatermarkInterval()  {
+		return this.autoWatermarkInterval;
+	}
+
+	/**
 	 * Gets the parallelism with which operation are executed by default. Operations can
 	 * individually override this value to use a specific parallelism.
 	 *
@@ -637,6 +697,8 @@ public class ExecutionConfig implements Serializable {
 	 * getRuntimeContext().getExecutionConfig().getUserConfig()
 	 */
 	public static class GlobalJobParameters implements Serializable {
+		private static final long serialVersionUID = 1L;
+
 		/**
 		 * Convert UserConfig into a Map<String, String> representation.
 		 * This can be used by the runtime, for example for presenting the user config in the web frontend.

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 29efc4c..5bc705d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -114,6 +114,32 @@ public class RecordWriter<T extends IOReadableWritable> {
 		}
 	}
 
+	/**
+	 * This is used to broadcast Streaming Watermarks in-band with records. This ignores
+	 * the {@link ChannelSelector}.
+	 */
+	public void broadcastEmit(T record) throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
+			// serialize with corresponding serializer and send full buffer
+			RecordSerializer<T> serializer = serializers[targetChannel];
+
+			synchronized (serializer) {
+				SerializationResult result = serializer.addRecord(record);
+				while (result.isFullBuffer()) {
+					Buffer buffer = serializer.getCurrentBuffer();
+
+					if (buffer != null) {
+						writer.writeBuffer(buffer, targetChannel);
+						serializer.clearCurrentBuffer();
+					}
+
+					buffer = writer.getBufferProvider().requestBufferBlocking();
+					result = serializer.setNextBuffer(buffer);
+				}
+			}
+		}
+	}
+
 	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
 			RecordSerializer<T> serializer = serializers[targetChannel];

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
index 6b49770..a4eb6a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -78,7 +78,14 @@ under the License.
 			<artifactId>guava</artifactId>
 			<version>${guava.version}</version>
 		</dependency>
-	</dependencies>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>3.5</version>
+        </dependency>
+
+    </dependencies>
 
 	<build>
 		<plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
deleted file mode 100644
index 2fd4cd0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.util.Collector;
-
-public class CollectorWrapper<OUT> implements Output<OUT> {
-
-	private OutputSelectorWrapper<OUT> outputSelectorWrapper;
-
-	public CollectorWrapper(OutputSelectorWrapper<OUT> outputSelectorWrapper) {
-		this.outputSelectorWrapper = outputSelectorWrapper;
-	}
-
-	public void addCollector(Collector<?> output, StreamEdge edge) {
-		outputSelectorWrapper.addCollector(output, edge);
-	}
-
-	@Override
-	public void collect(OUT record) {
-		for (Collector<OUT> output : outputSelectorWrapper.getSelectedOutputs(record)) {
-			output.collect(record);
-		}
-	}
-
-	@Override
-	public void close() {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
deleted file mode 100644
index aa367ab..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamOutput<OUT> implements Collector<OUT> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StreamOutput.class);
-
-	private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
-	private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
-	private StreamRecord<OUT> streamRecord;
-
-	public StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output, SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
-
-		this.serializationDelegate = serializationDelegate;
-
-		if (serializationDelegate != null) {
-			this.streamRecord = serializationDelegate.getInstance();
-		} else {
-			throw new RuntimeException("Serializer cannot be null");
-		}
-		this.output = output;
-	}
-
-	public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> getRecordWriter() {
-		return output;
-	}
-
-	@Override
-	public void collect(OUT record) {
-		streamRecord.setObject(record);
-		serializationDelegate.setInstance(streamRecord);
-
-		try {
-			output.emit(serializationDelegate);
-		} catch (Exception e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Emit failed due to: {}", StringUtils.stringifyException(e));
-			}
-		}
-	}
-
-	@Override
-	public void close() {
-		if (output instanceof StreamRecordWriter) {
-			((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-		} else {
-			try {
-				output.flush();
-			} catch (IOException e) {
-				e.printStackTrace();
-			}
-		}
-	}
-
-	public void clearBuffers() {
-		output.clearBuffers();
-	}
-
-	public void broadcastEvent(TaskEvent barrier) throws IOException, InterruptedException {
-		output.broadcastEvent(barrier);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
index b90cce2..0fe84d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
@@ -21,25 +21,27 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 public class BroadcastOutputSelectorWrapper<OUT> implements OutputSelectorWrapper<OUT> {
 
 	private static final long serialVersionUID = 1L;
-	private List<Collector<OUT>> outputs;
+	private List<Collector<StreamRecord<OUT>>> outputs;
 
 	public BroadcastOutputSelectorWrapper() {
-		outputs = new ArrayList<Collector<OUT>>();
+		outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void addCollector(Collector<?> output, StreamEdge edge) {
-		outputs.add((Collector<OUT>) output);
+	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+		Collector output1 = output;
+		outputs.add((Collector<StreamRecord<OUT>>) output1);
 	}
 
 	@Override
-	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
 		return outputs;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
index 8ca0508..46b315d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,31 +38,32 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
 
 	private List<OutputSelector<OUT>> outputSelectors;
 
-	private Map<String, List<Collector<OUT>>> outputMap;
-	private Set<Collector<OUT>> selectAllOutputs;
+	private Map<String, List<Collector<StreamRecord<OUT>>>> outputMap;
+	private Set<Collector<StreamRecord<OUT>>> selectAllOutputs;
 
 	public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
 		this.outputSelectors = outputSelectors;
-		this.selectAllOutputs = new HashSet<Collector<OUT>>(); //new LinkedList<Collector<OUT>>();
-		this.outputMap = new HashMap<String, List<Collector<OUT>>>();
+		this.selectAllOutputs = new HashSet<Collector<StreamRecord<OUT>>>(); //new LinkedList<Collector<OUT>>();
+		this.outputMap = new HashMap<String, List<Collector<StreamRecord<OUT>>>>();
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
-	public void addCollector(Collector<?> output, StreamEdge edge) {
+	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge) {
+		Collector output1 = output;
 		List<String> selectedNames = edge.getSelectedNames();
 
 		if (selectedNames.isEmpty()) {
-			selectAllOutputs.add((Collector<OUT>) output);
+			selectAllOutputs.add((Collector<StreamRecord<OUT>>) output1);
 		} else {
 			for (String selectedName : selectedNames) {
 
 				if (!outputMap.containsKey(selectedName)) {
-					outputMap.put(selectedName, new LinkedList<Collector<OUT>>());
-					outputMap.get(selectedName).add((Collector<OUT>) output);
+					outputMap.put(selectedName, new LinkedList<Collector<StreamRecord<OUT>>>());
+					outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
 				} else {
 					if (!outputMap.get(selectedName).contains(output)) {
-						outputMap.get(selectedName).add((Collector<OUT>) output);
+						outputMap.get(selectedName).add((Collector<StreamRecord<OUT>>) output1);
 					}
 				}
 			}
@@ -69,14 +71,14 @@ public class DirectedOutputSelectorWrapper<OUT> implements OutputSelectorWrapper
 	}
 
 	@Override
-	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record) {
-		Set<Collector<OUT>> selectedOutputs = new HashSet<Collector<OUT>>(selectAllOutputs);
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record) {
+		Set<Collector<StreamRecord<OUT>>> selectedOutputs = new HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
 
 		for (OutputSelector<OUT> outputSelector : outputSelectors) {
 			Iterable<String> outputNames = outputSelector.select(record);
 
 			for (String outputName : outputNames) {
-				List<Collector<OUT>> outputList = outputMap.get(outputName);
+				List<Collector<StreamRecord<OUT>>> outputList = outputMap.get(outputName);
 
 				try {
 					selectedOutputs.addAll(outputList);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index 937b69f..9133ac0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -20,12 +20,13 @@ package org.apache.flink.streaming.api.collector.selector;
 import java.io.Serializable;
 
 import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 public interface OutputSelectorWrapper<OUT> extends Serializable {
 
-	public void addCollector(Collector<?> output, StreamEdge edge);
+	public void addCollector(Collector<StreamRecord<?>> output, StreamEdge edge);
 
-	public Iterable<Collector<OUT>> getSelectedOutputs(OUT record);
+	public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT record);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
new file mode 100644
index 0000000..ab380d7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/EventTimeSourceFunction.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+
+/**
+ * A marker interface that must be implemented by {@link SourceFunction}s that emit elements with
+ * timestamps. The {@link SourceFunction} can extract the timestamp from the data and attach it to
+ * the element upon emission.
+ *
+ * <p>
+ * Event-time sources must manually emit
+ * {@link org.apache.flink.streaming.api.watermark.Watermark watermarks} to keep track of progress.
+ * Automatic emission of watermarks will be suppressed if a source implements this interface.
+ *
+ * <p>
+ * Elements must be emitted using
+ * {@link SourceFunction.SourceContext#collectWithTimestamp(Object, long)}
+ * and watermarks can be emitted using
+ * {@link SourceFunction.SourceContext#emitWatermark(org.apache.flink.streaming.api.watermark.Watermark)}.
+ *
+ * @param <T> Type of the elements emitted by this source.
+ */
+public interface EventTimeSourceFunction<T> extends SourceFunction<T> { }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 58ee1da..886d6e7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.io.Serializable;
 
 /**
  * Base interface for all stream data sources in Flink. The contract of a stream source
@@ -28,9 +29,10 @@ import org.apache.flink.api.common.functions.Function;
  * is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements.
  * The run method can run for as long as necessary. The source must, however, react to an
  * invocation of {@link #cancel} by breaking out of its main loop.
- * 
- * <b>Note about checkpointed sources</b>
+ *
  * <p>
+ * <b>Note about checkpointed sources</b> <br>
+ *
  * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
  * interface must ensure that state checkpointing, updating of internal state and emission of
  * elements are not done concurrently. This is achieved by using the provided checkpointing lock
@@ -41,7 +43,6 @@ import org.apache.flink.api.common.functions.Function;
  * This is the basic pattern one should follow when implementing a (checkpointed) source:
  * </p>
  *
- * <pre>
  * {@code
  *  public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
  *      private long count = 0L;
@@ -70,6 +71,14 @@ import org.apache.flink.api.common.functions.Function;
  * }
  * </pre>
  *
+ *
+ * <p>
+ * <b>Note about element timestamps and watermarks:</b> <br>
+ * Sources must only manually emit watermarks when they implement
+ * {@link EventTimeSourceFunction }.
+ * Otherwise, elements automatically get the current timestamp assigned at ingress
+ * and the system automatically emits watermarks.
+ *
  * @param <T> The type of the elements produced by this source.
  */
 public interface SourceFunction<T> extends Function, Serializable {
@@ -106,18 +115,48 @@ public interface SourceFunction<T> extends Function, Serializable {
 	public static interface SourceContext<T> {
 
 		/**
-		 * Emits one element from the source.
-		 * 
-		 * @param element The element to emit.
+		 * Emits one element from the source. The result of {@link System#currentTimeMillis()} is set as
+		 * the timestamp of the emitted element.
+		 *
+		 * @param element The element to emit
 		 */
 		void collect(T element);
 
 		/**
+		 * Emits one element from the source with the given timestamp.
+		 *
+		 * @param element The element to emit
+		 * @param timestamp The timestamp in milliseconds
+		 */
+		public void collectWithTimestamp(T element, long timestamp);
+
+		/**
+		 * Emits the given {@link org.apache.flink.streaming.api.watermark.Watermark}.
+		 *
+		 * <p>
+		 * <b>Important:</b>
+		 * Sources must only manually emit watermarks when they implement
+		 * {@link EventTimeSourceFunction}.
+		 * Otherwise, elements automatically get the current timestamp assigned at ingress
+		 * and the system automatically emits watermarks.
+		 *
+		 * @param mark The {@link Watermark} to emit
+		 */
+		void emitWatermark(Watermark mark);
+
+
+		/**
 		 * Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
 		 * in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
 		 * 
-		 * @return The object to use the lock. 
+		 * @return The object to use as the lock. 
 		 */
 		Object getCheckpointLock();
+
+		/**
+		 * This must be called when closing the source operator to allow the {@link SourceContext}
+		 * to clean up internal state.
+		 */
+		void close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 6a44104..d0e8064 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -94,26 +94,26 @@ public class StreamConfig implements Serializable {
 		return config.getString(OPERATOR_NAME, "Missing");
 	}
 
-	public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+	public void setTypeSerializerIn1(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
 	}
 
-	public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+	public void setTypeSerializerIn2(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
 	}
 
-	public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+	public void setTypeSerializerOut1(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
 	}
 
-	public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+	public void setTypeSerializerOut2(TypeSerializer<?> serializer) {
 		setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+	public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
 		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_IN_1, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
@@ -121,9 +121,9 @@ public class StreamConfig implements Serializable {
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+	public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
 		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_IN_2, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
@@ -131,9 +131,9 @@ public class StreamConfig implements Serializable {
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+	public <T> TypeSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
 		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_OUT_1, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
@@ -141,16 +141,16 @@ public class StreamConfig implements Serializable {
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+	public <T> TypeSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
 		try {
-			return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+			return (TypeSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
 					TYPE_SERIALIZER_OUT_2, cl);
 		} catch (Exception e) {
 			throw new StreamTaskException("Could not instantiate serializer.", e);
 		}
 	}
 
-	private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+	private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
 		try {
 			InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
 		} catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 64c349e..f1428b4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.MissingTypeInfo;
@@ -49,7 +50,6 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
@@ -168,12 +168,9 @@ public class StreamGraph extends StreamingPlan {
 			addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
 		}
 
-		StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
-				inTypeInfo, executionConfig) : null;
+		TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
 
-		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
-				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
-				outTypeInfo, executionConfig) : null;
+		TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;
 
 		setSerializers(vertexID, inSerializer, null, outSerializer);
 
@@ -183,18 +180,15 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
-			TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject,
-			TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo,
-			TypeInformation<OUT> outTypeInfo, String operatorName) {
+			TwoInputStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
+			TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
 
 		addNode(vertexID, TwoInputStreamTask.class, taskoperatorObject, operatorName);
 
-		StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
-				&& !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
-				outTypeInfo, executionConfig) : null;
+		TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
+				outTypeInfo.createSerializer(executionConfig) : null;
 
-		setSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
-				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer);
+		setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexID);
@@ -316,8 +310,7 @@ public class StreamGraph extends StreamingPlan {
 						// We set the proper serializers for the sink/source
 						setSerializersFrom(tailOps.get(0).getId(), sink.getId());
 						if (loop.isCoIteration()) {
-							source.setSerializerOut(new StreamRecordSerializer(loop
-									.getFeedbackType(), executionConfig));
+							source.setSerializerOut(loop.getFeedbackType().createSerializer(executionConfig));
 						} else {
 							setSerializersFrom(headOpsInGroup.get(0).getId(), source.getId());
 						}
@@ -430,8 +423,7 @@ public class StreamGraph extends StreamingPlan {
 		getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
 	}
 
-	private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
-			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
+	private void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
 		StreamNode vertex = getStreamNode(vertexID);
 		vertex.setSerializerIn1(in1);
 		vertex.setSerializerIn2(in2);
@@ -447,9 +439,7 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
-		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
-				executionConfig);
-		getStreamNode(vertexID).setSerializerOut(serializer);
+		getStreamNode(vertexID).setSerializerOut(outType.createSerializer(executionConfig));
 	}
 
 	public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<OUT> operatorObject) {
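
For illustration, the graph now stores plain element serializers created directly from the
TypeInformation. A minimal, self-contained sketch of that call (types chosen arbitrarily for
the example, not part of this commit):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class SerializerCreationExample {
	public static void main(String[] args) {
		ExecutionConfig executionConfig = new ExecutionConfig();
		// The StreamGraph now asks the TypeInformation for an element serializer
		// instead of wrapping everything in a StreamRecordSerializer.
		TypeSerializer<String> serializer =
				BasicTypeInfo.STRING_TYPE_INFO.createSerializer(executionConfig);
		System.out.println(serializer.getClass().getName());
	}
}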

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0b909bd..62e2d83 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
@@ -29,7 +30,6 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 /**
  * Class representing the operators in the streaming programs, with all their
@@ -53,9 +53,9 @@ public class StreamNode implements Serializable {
 
 	private transient StreamOperator<?> operator;
 	private List<OutputSelector<?>> outputSelectors;
-	private StreamRecordSerializer<?> typeSerializerIn1;
-	private StreamRecordSerializer<?> typeSerializerIn2;
-	private StreamRecordSerializer<?> typeSerializerOut;
+	private TypeSerializer<?> typeSerializerIn1;
+	private TypeSerializer<?> typeSerializerIn2;
+	private TypeSerializer<?> typeSerializerOut;
 
 	private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
 	private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
@@ -168,27 +168,27 @@ public class StreamNode implements Serializable {
 		this.outputSelectors.add(outputSelector);
 	}
 
-	public StreamRecordSerializer<?> getTypeSerializerIn1() {
+	public TypeSerializer<?> getTypeSerializerIn1() {
 		return typeSerializerIn1;
 	}
 
-	public void setSerializerIn1(StreamRecordSerializer<?> typeSerializerIn1) {
+	public void setSerializerIn1(TypeSerializer<?> typeSerializerIn1) {
 		this.typeSerializerIn1 = typeSerializerIn1;
 	}
 
-	public StreamRecordSerializer<?> getTypeSerializerIn2() {
+	public TypeSerializer<?> getTypeSerializerIn2() {
 		return typeSerializerIn2;
 	}
 
-	public void setSerializerIn2(StreamRecordSerializer<?> typeSerializerIn2) {
+	public void setSerializerIn2(TypeSerializer<?> typeSerializerIn2) {
 		this.typeSerializerIn2 = typeSerializerIn2;
 	}
 
-	public StreamRecordSerializer<?> getTypeSerializerOut() {
+	public TypeSerializer<?> getTypeSerializerOut() {
 		return typeSerializerOut;
 	}
 
-	public void setSerializerOut(StreamRecordSerializer<?> typeSerializerOut) {
+	public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
 		this.typeSerializerOut = typeSerializerOut;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index cb4bcb0..3956d75 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
@@ -35,7 +36,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	protected transient ExecutionConfig executionConfig;
 
-	public transient Output<OUT> output;
+	public transient Output<StreamRecord<OUT>> output;
 
 	protected boolean inputCopyDisabled = false;
 
@@ -43,7 +44,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
 
 	@Override
-	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
+	public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
 		this.output = output;
 		this.executionConfig = runtimeContext.getExecutionConfig();
 		this.runtimeContext = runtimeContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index b2d9c91..23c4ab8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
@@ -57,7 +58,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	}
 
 	@Override
-	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
+	public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
 		super.setup(output, runtimeContext);
 		FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
index d24ffed..7ca540f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 /**
  * Interface for stream operators with one input. Use
  * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
@@ -27,5 +30,18 @@ package org.apache.flink.streaming.api.operators;
  * @param <OUT> The output type of the operator
  */
 public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
-	public void processElement(IN element) throws Exception;
+
+	/**
+	 * Processes one element that arrived at this operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 */
+	public void processElement(StreamRecord<IN> element) throws Exception;
+
+	/**
+	 * Processes a {@link Watermark}.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 *
+	 * @see org.apache.flink.streaming.api.watermark.Watermark
+	 */
+	public void processWatermark(Watermark mark) throws Exception;
 }
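
To make the new contract concrete, a hypothetical user-defined one-input operator (not part of
this commit) now receives the wrapped StreamRecord and is responsible for forwarding watermarks
itself, mirroring the built-in operators changed below:

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class UpperCaseOperator extends AbstractStreamOperator<String>
		implements OneInputStreamOperator<String, String> {

	private static final long serialVersionUID = 1L;

	@Override
	public void processElement(StreamRecord<String> element) throws Exception {
		// replace() keeps the timestamp of the input record on the emitted record.
		output.collect(element.replace(element.getValue().toUpperCase()));
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
		// No buffering happens here, so the watermark can be forwarded as-is.
		output.emitWatermark(mark);
	}
}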

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index d6f810a..89d5560 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -17,16 +17,25 @@
  */
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.util.Collector;
 
 /**
  * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
  * of this interface that can be used to emit elements and other messages, such as barriers
- * and low watermarks, from an operator.
+ * and watermarks, from an operator.
  *
  * @param <T> The type of the elements that can be emitted.
  */
 public interface Output<T> extends Collector<T> {
-	// NOTE: This does not yet have methods for barriers/low watermarks, this needs to be
-	// extended when this functionality arrives.
+
+	/**
+	 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
+	 * operators.
+	 *
+	 * <p>
+	 * A watermark specifies that no element with a timestamp older than or equal to the
+	 * watermark timestamp will be emitted in the future.
+	 */
+	void emitWatermark(Watermark mark);
 }
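
The guarantee documented above is what lets downstream operators treat elements arriving behind
the watermark as late. A hypothetical operator that drops such late elements (sketch only; it
assumes Watermark exposes its timestamp via getTimestamp(), which is not shown in this diff):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class DropLateElements<T> extends AbstractStreamOperator<T>
		implements OneInputStreamOperator<T, T> {

	private static final long serialVersionUID = 1L;

	private long currentWatermark = Long.MIN_VALUE;

	@Override
	public void processElement(StreamRecord<T> element) throws Exception {
		// Elements at or behind the current watermark are considered late and dropped.
		if (element.getTimestamp() > currentWatermark) {
			output.collect(element);
		}
	}

	@Override
	public void processWatermark(Watermark mark) throws Exception {
		currentWatermark = mark.getTimestamp();
		output.emitWatermark(mark);
	}
}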

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
index 240e2b1..efe5d52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
 
 	private static final long serialVersionUID = 1L;
@@ -28,7 +31,12 @@ public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements O
 	}
 
 	@Override
-	public void processElement(IN element) {
-		output.collect(++count);
+	public void processElement(StreamRecord<IN> element) {
+		output.collect(element.replace(++count));
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
index a54a4ea..2ff220e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFilter.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
 
@@ -29,9 +31,14 @@ public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFuncti
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		if (userFunction.filter(element)) {
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (userFunction.filter(element.getValue())) {
 			output.collect(element);
 		}
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
index e8da2c7..5547c6a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFlatMap.java
@@ -18,6 +18,9 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamFlatMap<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
@@ -25,13 +28,27 @@ public class StreamFlatMap<IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
+	private TimestampedCollector<OUT> collector;
+
 	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		userFunction.flatMap(element, output);
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		collector = new TimestampedCollector(output);
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element.getTimestamp());
+		userFunction.flatMap(element.getValue(), collector);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
 	}
 }
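
The TimestampedCollector used above wraps the operator's Output so that every element produced
by the user's FlatMapFunction carries the timestamp of the input element that triggered it. A
rough sketch of such a wrapper (not the actual class added by this commit; it assumes
StreamRecord offers a (value, timestamp) constructor):

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

public class SimpleTimestampedCollector<T> implements Collector<T> {

	private final Output<StreamRecord<T>> output;
	private long timestamp;

	public SimpleTimestampedCollector(Output<StreamRecord<T>> output) {
		this.output = output;
	}

	public void setTimestamp(long timestamp) {
		this.timestamp = timestamp;
	}

	@Override
	public void collect(T record) {
		// Every collected element is stamped with the timestamp of the input element.
		output.collect(new StreamRecord<T>(record, timestamp));
	}

	@Override
	public void close() {
		output.close();
	}
}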

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
index 580477a..a5e5264 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamFold.java
@@ -21,6 +21,8 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamFold<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, FoldFunction<IN, OUT>>
@@ -40,9 +42,9 @@ public class StreamFold<IN, OUT>
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element);
-		output.collect(accumulator);
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		accumulator = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
+		output.collect(element.replace(accumulator));
 	}
 
 	@Override
@@ -50,4 +52,9 @@ public class StreamFold<IN, OUT>
 		super.open(config);
 		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
index 08107a9..5272a48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedFold.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
 
@@ -41,19 +42,18 @@ public class StreamGroupedFold<IN, OUT> extends StreamFold<IN, OUT> {
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		Object key = keySelector.getKey(element);
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		Object key = keySelector.getKey(element.getValue());
 		OUT accumulator = values.get(key);
-		FoldFunction<IN, OUT> folder = ((FoldFunction<IN, OUT>) userFunction);
 
 		if (accumulator != null) {
-			OUT folded = folder.fold(outTypeSerializer.copy(accumulator), element);
+			OUT folded = userFunction.fold(outTypeSerializer.copy(accumulator), element.getValue());
 			values.put(key, folded);
-			output.collect(folded);
+			output.collect(element.replace(folded));
 		} else {
-			OUT first = folder.fold(outTypeSerializer.copy(initialValue), element);
+			OUT first = userFunction.fold(outTypeSerializer.copy(initialValue), element.getValue());
 			values.put(key, first);
-			output.collect(first);
+			output.collect(element.replace(first));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index 8269be7..6be011e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 
@@ -37,17 +38,17 @@ public class StreamGroupedReduce<IN> extends StreamReduce<IN> {
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		Object key = keySelector.getKey(element);
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		Object key = keySelector.getKey(element.getValue());
 		IN currentValue = values.get(key);
 		if (currentValue != null) {
 			// TODO: find a way to let operators copy elements (maybe)
-			IN reduced = userFunction.reduce(currentValue, element);
+			IN reduced = userFunction.reduce(currentValue, element.getValue());
 			values.put(key, reduced);
-			output.collect(reduced);
+			output.collect(element.replace(reduced));
 		} else {
-			values.put(key, element);
-			output.collect(element);
+			values.put(key, element.getValue());
+			output.collect(element.replace(element.getValue()));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
index 08dc981..7d5c7cc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamMap.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamMap<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
@@ -31,7 +33,12 @@ public class StreamMap<IN, OUT>
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		output.collect(userFunction.map(element));
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		output.collect(element.replace(userFunction.map(element.getValue())));
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 75cea5f..9dd18b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import java.io.Serializable;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
@@ -37,7 +38,7 @@ public interface StreamOperator<OUT> extends Serializable {
 	/**
 	 * Initializes the {@link StreamOperator} for input and output handling.
 	 */
-	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext);
+	public void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext);
 
 	/**
 	 * This method is called before any elements are processed.

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
index 83613d8..c0815b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamProject.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamProject<IN, OUT extends Tuple>
 		extends AbstractStreamOperator<OUT>
@@ -43,11 +45,11 @@ public class StreamProject<IN, OUT extends Tuple>
 
 
 	@Override
-	public void processElement(IN element) throws Exception {
+	public void processElement(StreamRecord<IN> element) throws Exception {
 		for (int i = 0; i < this.numFields; i++) {
-			outTuple.setField(((Tuple) element).getField(fields[i]), i);
+			outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
 		}
-		output.collect(outTuple);
+		output.collect(element.replace(outTuple));
 	}
 
 	@Override
@@ -55,4 +57,9 @@ public class StreamProject<IN, OUT extends Tuple>
 		super.open(config);
 		outTuple = outSerializer.createInstance();
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }