You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/14 12:34:12 UTC

[2/2] flink git commit: [FLINK-3660] Measure latency and exposes them via a metric

[FLINK-3660] Measure latency and exposes them via a metric

This commit adds the initial runtime support for measuring latency of records going through the system.

I therefore introduced a new StreamElement, called a LatencyMarker.
Similar to Watermarks, LatencyMarkers are emitted from the sources at an configured interval. The default value for the interval is 2000 ms. The emission of markers can be disabled by setting the interval to 0. LatencyMarkers can not "overtake" regular elements. This ensures that the measured latency approximates the end-to-end latency of regular stream elements.

Regular operators (excluding those participating in iterations) forward latency markers if they are not a sink.
Operators with many outputs randomly select one to forward the maker to. This ensures that every marker exists only once in the system, and that repartition steps are not causing an explosion in the number of transferred markers.
If an operator is a sink, it will maintain the last 512 latencies from each known source instance.
The min/max/mean/p50/p95/p99 of each known source is reported using a special LatencyGauge from the sink (every operator can be a sink, if it doesn't have any outputs).

This commit does not visualize the latency in the web interface.
Also, there is currently no mechanism to ensure that the system clocks are in-sync, so the latency measurements will be inaccurate if the hardware clocks are not correct.

This closes #2386


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a612b996
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a612b996
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a612b996

Branch: refs/heads/master
Commit: a612b9966f3ee020a5721ac2f039a3633c40146c
Parents: a648f88
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Apr 4 12:09:56 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Oct 14 14:33:32 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 113 +++++++-----
 docs/setup/config.md                            |   3 +
 .../flink/storm/wrappers/BoltWrapperTest.java   |   2 +
 .../flink/api/common/ExecutionConfig.java       |  39 ++++
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../flink/runtime/execution/Environment.java    |   2 +-
 .../io/network/api/writer/RecordWriter.java     |  53 +++---
 .../operators/testutils/DummyEnvironment.java   |   2 +-
 .../api/collector/selector/DirectedOutput.java  |  11 ++
 .../api/operators/AbstractStreamOperator.java   | 179 ++++++++++++++++++-
 .../api/operators/OneInputStreamOperator.java   |   3 +
 .../flink/streaming/api/operators/Output.java   |   3 +
 .../streaming/api/operators/StreamCounter.java  |  44 -----
 .../api/operators/StreamGroupedReduce.java      |   1 -
 .../streaming/api/operators/StreamSink.java     |  10 +-
 .../streaming/api/operators/StreamSource.java   |  42 +++++
 .../api/operators/TwoInputStreamOperator.java   |  25 ++-
 .../runtime/io/RecordWriterOutput.java          |  17 +-
 .../runtime/io/StreamInputProcessor.java        |  26 +--
 .../runtime/io/StreamRecordWriter.java          |   9 +
 .../runtime/io/StreamTwoInputProcessor.java     |  37 ++--
 .../runtime/streamrecord/LatencyMarker.java     | 106 +++++++++++
 .../MultiplexingStreamRecordSerializer.java     |  23 ++-
 .../runtime/streamrecord/StreamElement.java     |  17 ++
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  51 ++++--
 .../runtime/tasks/StreamIterationTail.java      |  10 ++
 .../streaming/runtime/tasks/StreamTask.java     |   8 +
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../api/graph/StreamGraphGeneratorTest.java     |  16 ++
 .../api/operators/StreamCounterTest.java        |  61 -------
 .../operators/StreamSourceOperatorTest.java     |  67 ++++++-
 ...AlignedProcessingTimeWindowOperatorTest.java |   1 +
 .../operators/windowing/CollectingOutput.java   |   8 +-
 .../apache/flink/streaming/util/MockOutput.java |   6 +
 .../util/OneInputStreamOperatorTestHarness.java |   6 +
 .../util/TwoInputStreamOperatorTestHarness.java |   6 +
 .../test/streaming/runtime/TimestampITCase.java |   1 +
 38 files changed, 781 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 0e51407..6de5b5e 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -376,7 +376,6 @@ Flink exposes the following system metrics:
       <th class="text-left">Description</th>
     </tr>
   </thead>
-
   <tbody>
     <tr>
       <th rowspan="1"><strong>JobManager</strong></th>
@@ -475,52 +474,76 @@ Flink exposes the following system metrics:
       <td></td>
     </tr>
     <tr>
-      <tr>
-        <th rowspan="7"><strong>Task</strong></t>
-        <td>currentLowWatermark</td>
-        <td>The lowest watermark a task has received.</td>
-      </tr>
-      <tr>
-        <td>lastCheckpointDuration</td>
-        <td>The time it took to complete the last checkpoint.</td>
-      </tr>
-      <tr>
-        <td>lastCheckpointSize</td>
-        <td>The total size of the last checkpoint.</td>
-      </tr>
-      <tr>
-        <td>restartingTime</td>
-        <td>The time it took to restart the job.</td>
-      </tr>
-      <tr>
-        <td>numBytesInLocal</td>
-        <td>The total number of bytes this task has read from a local source.</td>
-      </tr>
-      <tr>
-        <td>numBytesInRemote</td>
-        <td>The total number of bytes this task has read from a remote source.</td>
-      </tr>
-      <tr>
-        <td>numBytesOut</td>
-        <td>The total number of bytes this task has emitted.</td>
-      </tr>
-    </tr>
-    <tr>
-      <tr>
-        <th rowspan="3"><strong>Operator</strong></th>
-        <td>numRecordsIn</td>
-        <td>The total number of records this operator has received.</td>
-      </tr>
-      <tr>
-        <td>numRecordsOut</td>
-        <td>The total number of records this operator has emitted.</td>
-      </tr>
-      <tr>
-        <td>numSplitsProcessed</td>
-        <td>The total number of InputSplits this data source has processed.</td>
-      </tr>
+      <th rowspan="7"><strong>Task</strong></th>
+      <td>currentLowWatermark</td>
+      <td>The lowest watermark a task has received.</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointDuration</td>
+      <td>The time it took to complete the last checkpoint.</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointSize</td>
+      <td>The total size of the last checkpoint.</td>
+    </tr>
+    <tr>
+      <td>restartingTime</td>
+      <td>The time it took to restart the job.</td>
+    </tr>
+    <tr>
+      <td>numBytesInLocal</td>
+      <td>The total number of bytes this task has read from a local source.</td>
+    </tr>
+    <tr>
+      <td>numBytesInRemote</td>
+      <td>The total number of bytes this task has read from a remote source.</td>
+    </tr>
+    <tr>
+      <td>numBytesOut</td>
+      <td>The total number of bytes this task has emitted.</td>
+    </tr>
+    <tr>
+      <th rowspan="4"><strong>Operator</strong></th>
+      <td>numRecordsIn</td>
+      <td>The total number of records this operator has received.</td>
+    </tr>
+    <tr>
+      <td>numRecordsOut</td>
+      <td>The total number of records this operator has emitted.</td>
+    </tr>
+    <tr>
+      <td>numSplitsProcessed</td>
+      <td>The total number of InputSplits this data source has processed (if the operator is a data source).</td>
+    </tr>
+    <tr>
+      <td>latency</td>
+      <td>A latency gauge reporting the latency distribution from the different sources.</td>
     </tr>
   </tbody>
 </table>
 
+
+### Latency tracking
+
+Flink allows to track the latency of records traveling through the system. To enable the latency tracking
+a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+
+At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`.
+The marker contains a timestamp from the time when the record has been emitted at the sources.
+Latency markers can not overtake regular user records, thus if records are queuing up in front of an operator, 
+it will add to the latency tracked by the marker.
+
+Note that the latency markers are not accounting for the time user records spend in operators as they are
+bypassing them. In particular the markers are not accounting for the time records spend for example in window buffers.
+Only if operators are not able to accept new records, thus they are queuing up, the latency measured using
+the markers will reflect that.
+
+All intermediate operators keep a list of the last `n` latencies from each source to compute 
+a latency distribution.
+The sink operators keep a list from each source, and each parallel source instance to allow detecting 
+latency issues caused by individual machines.
+
+Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
+up an automated clock synchronisation service (like NTP) to avoid false latency results.
+
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 54ef394..3f6b705 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -378,6 +378,9 @@ Previously this key was named `recovery.mode` and the default value was `standal
 
 - `metrics.scope.tm.operator`: (Default: &lt;host&gt;.taskmanager.&lt;tm_id&gt;.&lt;job_name&gt;.&lt;operator_name&gt;.&lt;subtask_index&gt;) Defines the scope format string that is applied to all metrics scoped to an operator.
 
+- `metrics.latency.history-size`: (Default: 128) Defines the number of measured latencies to maintain at each operator
+
+
 ## Background
 
 ### Configuring the Network Buffers

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index c15b5f6..e0659da 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
 import org.apache.flink.storm.util.StormConfig;
@@ -369,6 +370,7 @@ public class BoltWrapperTest extends AbstractTest {
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
 		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
 
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index a0a63b1..3daf9cf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -121,6 +121,11 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	private long autoWatermarkInterval = 0;
 
 	/**
+	 * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
+	 */
+	private long latencyTrackingInterval = 2000L;
+
+	/**
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
 	 */
 	@Deprecated
@@ -205,6 +210,40 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
 	}
 
 	/**
+	 * Interval for sending latency tracking marks from the sources to the sinks.
+	 * Flink will send latency tracking marks from the sources at the specified interval.
+	 *
+	 * Recommended value: 2000 (2 seconds).
+	 *
+	 * Setting a tracking interval <= 0 disables the latency tracking.
+	 *
+	 * @param interval Interval in milliseconds.
+	 */
+	@PublicEvolving
+	public ExecutionConfig setLatencyTrackingInterval(long interval) {
+		this.latencyTrackingInterval = interval;
+		return this;
+	}
+
+	/**
+	 * Returns the latency tracking interval.
+	 * @return The latency tracking interval in milliseconds
+	 */
+	@PublicEvolving
+	public long getLatencyTrackingInterval() {
+		return latencyTrackingInterval;
+	}
+
+	/**
+	 * Returns if latency tracking is enabled
+	 * @return True, if the tracking is enabled, false otherwise.
+	 */
+	@PublicEvolving
+	public boolean isLatencyTrackingEnabled() {
+		return latencyTrackingInterval > 0;
+	}
+
+	/**
 	 * Gets the parallelism with which operation are executed by default. Operations can
 	 * individually override this value to use a specific parallelism.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f508d4a..3fe0306 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -868,6 +868,10 @@ public final class ConfigConstants {
 	/** The scope format string that is applied to all metrics scoped to an operator. */
 	public static final String METRICS_SCOPE_NAMING_OPERATOR = "metrics.scope.operator";
 
+	/** The number of measured latencies to maintain at each operator */
+	public static final String METRICS_LATENCY_HISTORY_SIZE = "metrics.latency.history-size";
+
+
 	// ---------------------------- Checkpoints -------------------------------
 
 	/** The default directory for savepoints. */
@@ -886,6 +890,7 @@ public final class ConfigConstants {
 	@Deprecated
 	public static final String SAVEPOINT_FS_DIRECTORY_KEY = "savepoints.state.backend.fs.dir";
 
+
 	// ------------------------------------------------------------------------
 	//                            Default Values
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index cbbeec7..f0ff918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -79,7 +79,7 @@ public interface Environment {
 	ExecutionAttemptID getExecutionId();
 
 	/**
-	 * Returns the task-wide configuration object, originally attache to the job vertex.
+	 * Returns the task-wide configuration object, originally attached to the job vertex.
 	 *
 	 * @return The task-wide configuration
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4963698..422aa65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -25,8 +25,10 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.XORShiftRandom;
 
 import java.io.IOException;
+import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 
@@ -54,6 +56,8 @@ public class RecordWriter<T extends IOReadableWritable> {
 	/** {@link RecordSerializer} per outgoing channel */
 	private final RecordSerializer<T>[] serializers;
 
+	private final Random RNG = new XORShiftRandom();
+
 	public RecordWriter(ResultPartitionWriter writer) {
 		this(writer, new RoundRobinChannelSelector<T>());
 	}
@@ -78,22 +82,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 
 	public void emit(T record) throws IOException, InterruptedException {
 		for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
-			// serialize with corresponding serializer and send full buffer
-			RecordSerializer<T> serializer = serializers[targetChannel];
-
-			synchronized (serializer) {
-				SerializationResult result = serializer.addRecord(record);
-				while (result.isFullBuffer()) {
-					Buffer buffer = serializer.getCurrentBuffer();
-
-					if (buffer != null) {
-						writeBuffer(buffer, targetChannel, serializer);
-					}
-
-					buffer = writer.getBufferProvider().requestBufferBlocking();
-					result = serializer.setNextBuffer(buffer);
-				}
-			}
+			sendToTarget(record, targetChannel);
 		}
 	}
 
@@ -103,21 +92,31 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 */
 	public void broadcastEmit(T record) throws IOException, InterruptedException {
 		for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
-			// serialize with corresponding serializer and send full buffer
-			RecordSerializer<T> serializer = serializers[targetChannel];
+			sendToTarget(record, targetChannel);
+		}
+	}
 
-			synchronized (serializer) {
-				SerializationResult result = serializer.addRecord(record);
-				while (result.isFullBuffer()) {
-					Buffer buffer = serializer.getCurrentBuffer();
+	/**
+	 * This is used to send LatencyMarks to a random target channel
+	 */
+	public void randomEmit(T record) throws IOException, InterruptedException {
+		sendToTarget(record, RNG.nextInt(numChannels));
+	}
 
-					if (buffer != null) {
-						writeBuffer(buffer, targetChannel, serializer);
-					}
+	private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
+		RecordSerializer<T> serializer = serializers[targetChannel];
 
-					buffer = writer.getBufferProvider().requestBufferBlocking();
-					result = serializer.setNextBuffer(buffer);
+		synchronized (serializer) {
+			SerializationResult result = serializer.addRecord(record);
+			while (result.isFullBuffer()) {
+				Buffer buffer = serializer.getCurrentBuffer();
+
+				if (buffer != null) {
+					writeBuffer(buffer, targetChannel, serializer);
 				}
+
+				buffer = writer.getBufferProvider().requestBufferBlocking();
+				result = serializer.setNextBuffer(buffer);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index bb07122..04ba4e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -92,7 +92,7 @@ public class DummyEnvironment implements Environment {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return null;
+		return new TaskManagerRuntimeInfo("foo", new Configuration(), "foo");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
index 8346013..24f1c63 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -23,13 +23,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.XORShiftRandom;
 
 
 public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
@@ -42,6 +45,8 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
 	
 	protected final Output<StreamRecord<OUT>>[] allOutputs;
 
+	private final Random RNG = new XORShiftRandom();
+
 	
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public DirectedOutput(
@@ -100,6 +105,12 @@ public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
 		}
 	}
 
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		// randomly select an output
+		allOutputs[RNG.nextInt(allOutputs.length)].emitLatencyMarker(latencyMarker);
+	}
+
 	protected Set<Output<StreamRecord<OUT>>> selectOutputs(StreamRecord<OUT> record)  {
 		Set<Output<StreamRecord<OUT>>> selectedOutputs = new HashSet<>(selectAllOutputs.length);
 		Collections.addAll(selectedOutputs, selectAllOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 0ca89ef..77e4d9a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,13 +19,17 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -38,12 +42,16 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ConcurrentModificationException;
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
 
@@ -103,7 +111,13 @@ public abstract class AbstractStreamOperator<OUT>
 
 	private transient Collection<OperatorStateHandle> lazyRestoreStateHandles;
 
-	protected transient MetricGroup metrics;
+
+	// --------------- Metrics ---------------------------
+
+	/** Metric group for the operator */
+	protected MetricGroup metrics;
+
+	protected LatencyGauge latencyGauge;
 
 	// ------------------------------------------------------------------------
 	//  Life Cycle
@@ -117,12 +131,21 @@ public abstract class AbstractStreamOperator<OUT>
 		
 		this.metrics = container.getEnvironment().getMetricGroup().addOperator(operatorName);
 		this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));
+		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
+		int historySize = taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+		if (historySize <= 0) {
+			LOG.warn("{} has been set to a value below 0: {}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
+			historySize = ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+		}
+
+		latencyGauge = this.metrics.gauge("latency", new LatencyGauge(historySize));
 		this.runtimeContext = new StreamingRuntimeContext(this, container.getEnvironment(), container.getAccumulatorMap());
 
 		stateKeySelector1 = config.getStatePartitioner(0, getUserCodeClassloader());
 		stateKeySelector2 = config.getStatePartitioner(1, getUserCodeClassloader());
 	}
 	
+	@Override
 	public MetricGroup getMetricGroup() {
 		return metrics;
 	}
@@ -365,6 +388,155 @@ public abstract class AbstractStreamOperator<OUT>
 		return chainingStrategy;
 	}
 
+
+	// ------------------------------------------------------------------------
+	//  Metrics
+	// ------------------------------------------------------------------------
+
+	// ------- One input stream
+	public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+		reportOrForwardLatencyMarker(latencyMarker);
+	}
+
+	// ------- Two input stream
+	public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
+		reportOrForwardLatencyMarker(latencyMarker);
+	}
+
+	public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
+		reportOrForwardLatencyMarker(latencyMarker);
+	}
+
+
+	protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+		// all operators are tracking latencies
+		this.latencyGauge.reportLatency(maker, false);
+
+		// everything except sinks forwards latency markers
+		this.output.emitLatencyMarker(maker);
+	}
+
+	// ----------------------- Helper classes -----------------------
+
+
+	/**
+	 * The gauge uses a HashMap internally to avoid classloading issues when accessing
+	 * the values using JMX.
+	 */
+	protected static class LatencyGauge implements Gauge<Map<String, HashMap<String, Double>>> {
+		private final Map<LatencySourceDescriptor, DescriptiveStatistics> latencyStats = new HashMap<>();
+		private final int historySize;
+
+		LatencyGauge(int historySize) {
+			this.historySize = historySize;
+		}
+
+		public void reportLatency(LatencyMarker marker, boolean isSink) {
+			LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink);
+			DescriptiveStatistics sourceStats = latencyStats.get(sourceDescriptor);
+			if (sourceStats == null) {
+				// 512 element window (4 kb)
+				sourceStats = new DescriptiveStatistics(this.historySize);
+				latencyStats.put(sourceDescriptor, sourceStats);
+			}
+			long now = System.currentTimeMillis();
+			sourceStats.addValue(now - marker.getMarkedTime());
+		}
+
+		@Override
+		public Map<String, HashMap<String, Double>> getValue() {
+			while (true) {
+				try {
+					Map<String, HashMap<String, Double>> ret = new HashMap<>();
+					for (Map.Entry<LatencySourceDescriptor, DescriptiveStatistics> source : latencyStats.entrySet()) {
+						HashMap<String, Double> sourceStatistics = new HashMap<>(6);
+						sourceStatistics.put("max", source.getValue().getMax());
+						sourceStatistics.put("mean", source.getValue().getMean());
+						sourceStatistics.put("min", source.getValue().getMin());
+						sourceStatistics.put("p50", source.getValue().getPercentile(50));
+						sourceStatistics.put("p95", source.getValue().getPercentile(95));
+						sourceStatistics.put("p99", source.getValue().getPercentile(99));
+						ret.put(source.getKey().toString(), sourceStatistics);
+					}
+					return ret;
+					// Concurrent access onto the "latencyStats" map could cause
+					// ConcurrentModificationExceptions. To avoid unnecessary blocking
+					// of the reportLatency() method, we retry this operation until
+					// it succeeds.
+				} catch(ConcurrentModificationException ignore) {
+					LOG.debug("Unable to report latency statistics", ignore);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Identifier for a latency source
+	 */
+	private static class LatencySourceDescriptor {
+		/**
+		 * A unique ID identifying a logical source in Flink
+		 */
+		private final int vertexID;
+
+		/**
+		 * Identifier for parallel subtasks of a logical source
+		 */
+		private final int subtaskIndex;
+
+		/**
+		 *
+		 * @param marker The latency marker to extract the LatencySourceDescriptor from.
+		 * @param ignoreSubtaskIndex Set to true to ignore the subtask index, to treat the latencies from all the parallel instances of a source as the same.
+		 * @return A LatencySourceDescriptor for the given marker.
+		 */
+		public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) {
+			if (ignoreSubtaskIndex) {
+				return new LatencySourceDescriptor(marker.getVertexID(), -1);
+			} else {
+				return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
+			}
+
+		}
+
+		private LatencySourceDescriptor(int vertexID, int subtaskIndex) {
+			this.vertexID = vertexID;
+			this.subtaskIndex = subtaskIndex;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+
+			LatencySourceDescriptor that = (LatencySourceDescriptor) o;
+
+			if (vertexID != that.vertexID) {
+				return false;
+			}
+			return subtaskIndex == that.subtaskIndex;
+		}
+
+		@Override
+		public int hashCode() {
+			int result = vertexID;
+			result = 31 * result + subtaskIndex;
+			return result;
+		}
+
+		@Override
+		public String toString() {
+			return "LatencySourceDescriptor{" +
+					"vertexID=" + vertexID +
+					", subtaskIndex=" + subtaskIndex +
+					'}';
+		}
+	}
+
 	public class CountingOutput implements Output<StreamRecord<OUT>> {
 		private final Output<StreamRecord<OUT>> output;
 		private final Counter numRecordsOut;
@@ -380,6 +552,11 @@ public abstract class AbstractStreamOperator<OUT>
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			output.emitLatencyMarker(latencyMarker);
+		}
+
+		@Override
 		public void collect(StreamRecord<OUT> record) {
 			numRecordsOut.inc();
 			output.collect(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
index 323feb5..d9de230 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/OneInputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -46,4 +47,6 @@ public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
 	 * @see org.apache.flink.streaming.api.watermark.Watermark
 	 */
 	void processWatermark(Watermark mark) throws Exception;
+
+	void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
index 4a7002f..ec2409e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/Output.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.util.Collector;
 
 /**
@@ -39,4 +40,6 @@ public interface Output<T> extends Collector<T> {
 	 * timestamp will be emitted in the future.
 	 */
 	void emitWatermark(Watermark mark);
+
+	void emitLatencyMarker(LatencyMarker latencyMarker);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
deleted file mode 100644
index 8835032..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamCounter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-@Internal
-public class StreamCounter<IN> extends AbstractStreamOperator<Long> implements OneInputStreamOperator<IN, Long> {
-
-	private static final long serialVersionUID = 1L;
-
-	private Long count = 0L;
-
-	public StreamCounter() {
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) {
-		output.collect(element.replace(++count));
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
index b11e22c..229c254 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
@@ -70,5 +70,4 @@ public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, Reduc
 		output.emitWatermark(mark);
 	}
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index 9fa2039..bd0f574 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @Internal
@@ -30,7 +31,6 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 
 	public StreamSink(SinkFunction<IN> sinkFunction) {
 		super(sinkFunction);
-
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
 
@@ -43,4 +43,12 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 	public void processWatermark(Watermark mark) throws Exception {
 		// ignore it for now, we are a sink, after all
 	}
+
+	@Override
+	protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
+		// all operators are tracking latencies
+		this.latencyGauge.reportLatency(maker, true);
+
+		// sinks don't forward latency markers
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 1409ae4..a07e6b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,8 +21,14 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
 /**
  * {@link StreamOperator} for streaming sources.
  *
@@ -53,6 +59,13 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	
 	public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
+
+		LatencyMarksEmitter latencyEmitter = null;
+		if(getExecutionConfig().isLatencyTrackingEnabled()) {
+			latencyEmitter = new LatencyMarksEmitter<>(lockingObject, collector, getExecutionConfig().getLatencyTrackingInterval(),
+					getOperatorConfig().getVertexID(), getRuntimeContext().getIndexOfThisSubtask());
+		}
+		
 		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
 		this.ctx = StreamSourceContexts.getSourceContext(
@@ -70,6 +83,9 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		} finally {
 			// make sure that the context is closed in any case
 			ctx.close();
+			if(latencyEmitter != null) {
+				latencyEmitter.close();
+			}
 		}
 	}
 
@@ -103,4 +119,30 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 	protected boolean isCanceledOrStopped() {
 		return canceledOrStopped;
 	}
+
+	private static class LatencyMarksEmitter<OUT> {
+		private final ScheduledExecutorService scheduleExecutor;
+		private final ScheduledFuture<?> latencyMarkTimer;
+
+		public LatencyMarksEmitter(final Object lockingObject, final Output<StreamRecord<OUT>> output, long latencyTrackingInterval, final int vertexID, final int subtaskIndex) {
+			this.scheduleExecutor = Executors.newScheduledThreadPool(1);
+			this.latencyMarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						synchronized (lockingObject) {
+							output.emitLatencyMarker(new LatencyMarker(System.currentTimeMillis(), vertexID, subtaskIndex));
+						}
+					} catch (Throwable t) {
+						LOG.warn("Error while emitting latency marker", t);
+					}
+				}
+			}, 0, latencyTrackingInterval, TimeUnit.MILLISECONDS);
+		}
+
+		public void close() {
+			latencyMarkTimer.cancel(true);
+			scheduleExecutor.shutdownNow();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
index d22583d..e45fedf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
@@ -38,13 +39,13 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OU
 	 * Processes one element that arrived on the first input of this two-input operator.
 	 * This method is guaranteed to not be called concurrently with other methods of the operator.
 	 */
-	public void processElement1(StreamRecord<IN1> element) throws Exception;
+	void processElement1(StreamRecord<IN1> element) throws Exception;
 
 	/**
 	 * Processes one element that arrived on the second input of this two-input operator.
 	 * This method is guaranteed to not be called concurrently with other methods of the operator.
 	 */
-	public void processElement2(StreamRecord<IN2> element) throws Exception;
+	void processElement2(StreamRecord<IN2> element) throws Exception;
 
 	/**
 	 * Processes a {@link Watermark} that arrived on the first input of this two-input operator.
@@ -52,7 +53,7 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OU
 	 *
 	 * @see org.apache.flink.streaming.api.watermark.Watermark
 	 */
-	public void processWatermark1(Watermark mark) throws Exception;
+	void processWatermark1(Watermark mark) throws Exception;
 
 	/**
 	 * Processes a {@link Watermark} that arrived on the second input of this two-input operator.
@@ -60,6 +61,22 @@ public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OU
 	 *
 	 * @see org.apache.flink.streaming.api.watermark.Watermark
 	 */
-	public void processWatermark2(Watermark mark) throws Exception;
+	void processWatermark2(Watermark mark) throws Exception;
+
+	/**
+	 * Processes a {@link LatencyMarker} that arrived on the first input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 *
+	 * @see org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
+	 */
+	void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;
+
+	/**
+	 * Processes a {@link LatencyMarker} that arrived on the second input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 *
+	 * @see org.apache.flink.streaming.runtime.streamrecord.LatencyMarker
+	 */
+	void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index c9d579f..9f046f6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -48,7 +49,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 	public RecordWriterOutput(
 			StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
 			TypeSerializer<OUT> outSerializer,
-			boolean enableWatermarkMultiplexing) {
+			boolean enableMultiplexing) {
 
 		checkNotNull(recordWriter);
 		
@@ -58,7 +59,7 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 				(StreamRecordWriter<?>) recordWriter;
 
 		TypeSerializer<StreamElement> outRecordSerializer;
-		if (enableWatermarkMultiplexing) {
+		if (enableMultiplexing) {
 			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
 		} else {
 			outRecordSerializer = (TypeSerializer<StreamElement>)
@@ -94,6 +95,18 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 		}
 	}
 
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		serializationDelegate.setInstance(latencyMarker);
+
+		try {
+			recordWriter.randomEmit(serializationDelegate);
+		}
+		catch (Exception e) {
+			throw new RuntimeException(e.getMessage(), e);
+		}
+	}
+
 	public void broadcastEvent(AbstractEvent barrier) throws IOException, InterruptedException {
 		recordWriter.broadcastEvent(barrier);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 2dbc6d4..47e55dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -89,7 +89,7 @@ public class StreamInputProcessor<IN> {
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
-			boolean enableWatermarkMultiplexing) throws IOException {
+			boolean enableMultiplexing) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -107,13 +107,13 @@ public class StreamInputProcessor<IN> {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
-			this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
+		if (enableMultiplexing) {
+			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer);
+			this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser);
 		} else {
 			StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
 			this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
-					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
+					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
 		}
 		
 		// Initialize one deserializer per input channel
@@ -150,14 +150,14 @@ public class StreamInputProcessor<IN> {
 				}
 
 				if (result.isFullRecord()) {
-					StreamElement recordOrWatermark = deserializationDelegate.getInstance();
+					StreamElement recordOrMark = deserializationDelegate.getInstance();
 
-					if (recordOrWatermark.isWatermark()) {
-						long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
+					if (recordOrMark.isWatermark()) {
+						long watermarkMillis = recordOrMark.asWatermark().getTimestamp();
 						if (watermarkMillis > watermarks[currentChannel]) {
 							watermarks[currentChannel] = watermarkMillis;
 							long newMinWatermark = Long.MAX_VALUE;
-							for (long watermark : watermarks) {
+							for (long watermark: watermarks) {
 								newMinWatermark = Math.min(watermark, newMinWatermark);
 							}
 							if (newMinWatermark > lastEmittedWatermark) {
@@ -168,9 +168,15 @@ public class StreamInputProcessor<IN> {
 							}
 						}
 						continue;
+					} else if(recordOrMark.isLatencyMarker()) {
+						// handle latency marker
+						synchronized (lock) {
+							streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
+						}
+						continue;
 					} else {
 						// now we can do the actual processing
-						StreamRecord<IN> record = recordOrWatermark.asRecord();
+						StreamRecord<IN> record = recordOrMark.asRecord();
 						synchronized (lock) {
 							numRecordsIn.inc();
 							streamOperator.setKeyContextElement1(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index f46b366..6d5e89b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -98,6 +98,15 @@ public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWrit
 		}
 	}
 
+	@Override
+	public void randomEmit(T record) throws IOException, InterruptedException {
+		checkErroneous();
+		super.randomEmit(record);
+		if (flushAlways) {
+			flush();
+		}
+	}
+
 	/**
 	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 70ce783..a25c1a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 
 import java.io.IOException;
@@ -97,7 +96,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
-			boolean enableWatermarkMultiplexing) throws IOException {
+			boolean enableMultiplexing) throws IOException {
 		
 		final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
 
@@ -115,24 +114,24 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
+		if (enableMultiplexing) {
+			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer1);
+			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser);
 		}
 		else {
-			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
+			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<>(inputSerializer1);
 			this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
-					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
 		}
 		
-		if (enableWatermarkMultiplexing) {
-			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
+		if (enableMultiplexing) {
+			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<>(inputSerializer2);
+			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser);
 		}
 		else {
-			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
+			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<>(inputSerializer2);
 			this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
-					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
+					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<>(ser);
 		}
 
 		// Initialize one deserializer per input channel
@@ -185,7 +184,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 					if (currentChannel < numInputChannels1) {
 						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
 						if (recordOrWatermark.isWatermark()) {
-							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel, lock);
+							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
+							continue;
+						}
+						else if (recordOrWatermark.isLatencyMarker()) {
+							synchronized (lock) {
+								streamOperator.processLatencyMarker1(recordOrWatermark.asLatencyMarker());
+							}
 							continue;
 						}
 						else {
@@ -203,6 +208,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel, lock);
 							continue;
 						}
+						else if (recordOrWatermark.isLatencyMarker()) {
+							synchronized (lock) {
+								streamOperator.processLatencyMarker2(recordOrWatermark.asLatencyMarker());
+							}
+							continue;
+						}
 						else {
 							synchronized (lock) {
 								streamOperator.setKeyContextElement2(recordOrWatermark.<IN2>asRecord());

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
new file mode 100644
index 0000000..714bdae
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Special record type carrying a timestamp of its creation time at a source operator
+ * and the vertexId and subtask index of the operator.
+ *
+ * At sinks, the marker can be used to approximate the time a record needs to travel
+ * through the dataflow.
+ */
+@PublicEvolving
+public final class LatencyMarker extends StreamElement {
+
+	// ------------------------------------------------------------------------
+
+	/** The time the latency mark is denoting */
+	private final long markedTime;
+
+	private final int vertexID;
+
+	private final int subtaskIndex;
+
+	/**
+	 * Creates a latency mark with the given timestamp
+	 */
+	public LatencyMarker(long markedTime, int vertexID, int subtaskIndex) {
+		this.markedTime = markedTime;
+		this.vertexID = vertexID;
+		this.subtaskIndex = subtaskIndex;
+	}
+
+	/**
+	 * Returns the timestamp marked by the LatencyMarker
+	 */
+	public long getMarkedTime() {
+		return markedTime;
+	}
+
+	public int getVertexID() {
+		return vertexID;
+	}
+
+	public int getSubtaskIndex() {
+		return subtaskIndex;
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()){
+			return false;
+		}
+
+		LatencyMarker that = (LatencyMarker) o;
+
+		if (markedTime != that.markedTime) {
+			return false;
+		}
+		if (vertexID != that.vertexID) {
+			return false;
+		}
+		return subtaskIndex == that.subtaskIndex;
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = (int) (markedTime ^ (markedTime >>> 32));
+		result = 31 * result + vertexID;
+		result = 31 * result + subtaskIndex;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "LatencyMarker{" +
+				"markedTime=" + markedTime +
+				", vertexID=" + vertexID +
+				", subtaskIndex=" + subtaskIndex +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 832c4b6..95e3ebd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -43,6 +43,7 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 	private static final int TAG_REC_WITH_TIMESTAMP = 0;
 	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
 	private static final int TAG_WATERMARK = 2;
+	private static final int TAG_LATENCY_MARKER = 3;
 	
 	
 	private final TypeSerializer<T> typeSerializer;
@@ -95,7 +96,7 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 			StreamRecord<T> fromRecord = from.asRecord();
 			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
 		}
-		else if (from.isWatermark()) {
+		else if (from.isWatermark() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -114,7 +115,7 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 			fromRecord.copyTo(valueCopy, reuseRecord);
 			return reuse;
 		}
-		else if (from.isWatermark()) {
+		else if (from.isWatermark() || from.isLatencyMarker()) {
 			// is immutable
 			return from;
 		}
@@ -139,7 +140,11 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 		else if (tag == TAG_WATERMARK) {
 			target.writeLong(source.readLong());
 		}
-		else {
+		else if (tag == TAG_LATENCY_MARKER) {
+			target.writeLong(source.readLong());
+			target.writeInt(source.readInt());
+			target.writeInt(source.readInt());
+		} else {
 			throw new IOException("Corrupt stream, found tag: " + tag);
 		}
 	}
@@ -161,6 +166,12 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 			target.write(TAG_WATERMARK);
 			target.writeLong(value.asWatermark().getTimestamp());
 		}
+		else if (value.isLatencyMarker()) {
+			target.write(TAG_LATENCY_MARKER);
+			target.writeLong(value.asLatencyMarker().getMarkedTime());
+			target.writeInt(value.asLatencyMarker().getVertexID());
+			target.writeInt(value.asLatencyMarker().getSubtaskIndex());
+		}
 		else {
 			throw new RuntimeException();
 		}
@@ -179,6 +190,9 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 		else if (tag == TAG_WATERMARK) {
 			return new Watermark(source.readLong());
 		}
+		else if (tag == TAG_LATENCY_MARKER) {
+			return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
+		}
 		else {
 			throw new IOException("Corrupt stream, found tag: " + tag);
 		}
@@ -203,6 +217,9 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 		else if (tag == TAG_WATERMARK) {
 			return new Watermark(source.readLong());
 		}
+		else if (tag == TAG_LATENCY_MARKER) {
+			return new LatencyMarker(source.readLong(), source.readInt(), source.readInt());
+		}
 		else {
 			throw new IOException("Corrupt stream, found tag: " + tag);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
index f6cccf7..62418bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -44,6 +44,14 @@ public abstract class StreamElement {
 	}
 
 	/**
+	 * Checks whether this element is a record.
+	 * @return True, if this element is a record, false otherwise.
+	 */
+	public final boolean isLatencyMarker() {
+		return getClass() == LatencyMarker.class;
+	}
+
+	/**
 	 * Casts this element into a StreamRecord.
 	 * @return This element as a stream record.
 	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a stream record.
@@ -61,4 +69,13 @@ public abstract class StreamElement {
 	public final Watermark asWatermark() {
 		return (Watermark) this;
 	}
+
+	/**
+	 * Casts this element into a LatencyMarker.
+	 * @return This element as a LatencyMarker.
+	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a LatencyMarker.
+	 */
+	public final LatencyMarker asLatencyMarker() {
+		return (LatencyMarker) this;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 0a6534b..97546b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					this, 
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
-					isSerializingTimestamps());
+					isSerializingMixedStream());
 
 			// make sure that stream tasks report their I/O statistics
 			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 9e96f5d..7342b6d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -17,6 +17,13 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,24 +36,22 @@ import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.apache.flink.util.XORShiftRandom;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 /**
  * The {@code OperatorChain} contains all operators that are executed as one chain within a single
@@ -72,7 +77,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
 		final StreamConfig configuration = containingTask.getConfiguration();
-		final boolean enableTimestamps = containingTask.isSerializingTimestamps();
+		final boolean enableMultiplexing = containingTask.isSerializingMixedStream();
 
 		headOperator = configuration.getStreamOperator(userCodeClassloader);
 
@@ -94,7 +99,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 				
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
 						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
-						containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
+						containingTask.getEnvironment(), enableMultiplexing, reporter, containingTask.getName());
 	
 				this.streamOutputs[i] = streamOutput;
 				streamOutputMap.put(outEdge, streamOutput);
@@ -300,7 +305,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	
 	private static <T> RecordWriterOutput<T> createStreamOutput(
 			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
-			Environment taskEnvironment, boolean withTimestamps,
+			Environment taskEnvironment, boolean enableMultiplexing,
 			AccumulatorRegistry.Reporter reporter, String taskName)
 	{
 		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
@@ -317,7 +322,7 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		output.setReporter(reporter);
 		output.setMetricGroup(taskEnvironment.getMetricGroup().getIOMetricGroup());
 		
-		return new RecordWriterOutput<T>(output, outSerializer, withTimestamps);
+		return new RecordWriterOutput<>(output, outSerializer, enableMultiplexing);
 	}
 	
 	// ------------------------------------------------------------------------
@@ -357,6 +362,16 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			try {
+				operator.processLatencyMarker(latencyMarker);
+			}
+			catch (Exception e) {
+				throw new ExceptionInChainedOperatorException(e);
+			}
+		}
+
+		@Override
 		public void close() {
 			try {
 				operator.close();
@@ -393,6 +408,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 	private static class BroadcastingOutputCollector<T> implements Output<StreamRecord<T>> {
 		
 		protected final Output<StreamRecord<T>>[] outputs;
+
+		private final Random RNG = new XORShiftRandom();
 		
 		public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
 			this.outputs = outputs;
@@ -406,6 +423,18 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			if(outputs.length <= 0) {
+				// ignore
+			} else if(outputs.length == 1) {
+				outputs[0].emitLatencyMarker(latencyMarker);
+			} else {
+				// randomly select an output
+				outputs[RNG.nextInt(outputs.length)].emitLatencyMarker(latencyMarker);
+			}
+		}
+
+		@Override
 		public void collect(StreamRecord<T> record) {
 			for (Output<StreamRecord<T>> output : outputs) {
 				output.collect(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 58e3cb8..a5f94ad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,11 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		public void processWatermark(Watermark mark) {
 			// ignore
 		}
+
+		@Override
+		public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+			// ignore
+		}
 	}
 
 	private static class IterationTailOutput<IN> implements Output<StreamRecord<IN>> {
@@ -96,6 +102,10 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		}
+
+		@Override
 		public void collect(StreamRecord<IN> record) {
 			try {
 				if (shouldWait) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4893fed..2e6ebf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -461,6 +461,14 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime;
 	}
 
+	/**
+	 * Check if the tasks is sending a mixed stream (of watermarks, latency marks and records)
+	 * @return true if stream contains more than just records
+	 */
+	protected boolean isSerializingMixedStream() {
+		return isSerializingTimestamps() || getExecutionConfig().isLatencyTrackingEnabled();
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Access to properties and utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index fb08959..bc80607 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
-				isSerializingTimestamps());
+				isSerializingMixedStream());
 
 		// make sure that stream tasks report their I/O statistics
 		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index c93a439..aa86304 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
@@ -453,6 +454,16 @@ public class StreamGraphGeneratorTest {
 		public void processWatermark2(Watermark mark) throws Exception {}
 
 		@Override
+		public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
+			// ignore
+		}
+
+		@Override
+		public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
+			// ignore
+		}
+
+		@Override
 		public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {}
 	}
 
@@ -476,6 +487,11 @@ public class StreamGraphGeneratorTest {
 		public void processWatermark(Watermark mark) {}
 
 		@Override
+		public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
+
+		}
+
+		@Override
 		public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
 			tpeInformation = outTypeInfo;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
deleted file mode 100644
index dc8024c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Test;
-
-/**
- * Tests for {@link StreamCounter}. These test that:
- *
- * <ul>
- *     <li>Timestamps of processed elements match the input timestamp</li>
- *     <li>Watermarks are correctly forwarded</li>
- * </ul>
- */
-public class StreamCounterTest {
-
-	@Test
-	public void testCount() throws Exception {
-		StreamCounter<String> operator = new StreamCounter<String>();
-
-		OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<String>("eins", initialTime + 1));
-		testHarness.processElement(new StreamRecord<String>("zwei", initialTime + 2));
-		testHarness.processWatermark(new Watermark(initialTime + 2));
-		testHarness.processElement(new StreamRecord<String>("drei", initialTime + 3));
-
-		expectedOutput.add(new StreamRecord<Long>(1L, initialTime + 1));
-		expectedOutput.add(new StreamRecord<Long>(2L, initialTime + 2));
-		expectedOutput.add(new Watermark(initialTime + 2));
-		expectedOutput.add(new StreamRecord<Long>(3L, initialTime + 3));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-}