You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/12 12:19:41 UTC

[GitHub] isunjin closed pull request #6657: [FLINK-10205] [JobManager] Batch Job: InputSplit Fault tolerant for DataSource…

isunjin closed pull request #6657:     [FLINK-10205] [JobManager] Batch Job: InputSplit Fault tolerant for DataSource…
URL: https://github.com/apache/flink/pull/6657
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 98054e94224..0c0b0dd2ffb 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -7,11 +7,21 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>metrics.latency.granularity</h5></td>
+            <td style="word-wrap: break-word;">"operator"</td>
+            <td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>metrics.latency.history-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Defines the number of measured latencies to maintain at each operator.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.latency.interval</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.</td>
+        </tr>
         <tr>
             <td><h5>metrics.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/dev/batch/index.md b/docs/dev/batch/index.md
index c624fce8954..d0043647227 100644
--- a/docs/dev/batch/index.md
+++ b/docs/dev/batch/index.md
@@ -592,7 +592,7 @@ val output: DataSet[(Int, String, Double)] = input.sum(0).min(2)
       </td>
     </tr>
 
-    </tr>
+    <tr>
       <td><strong>Join</strong></td>
       <td>
         Joins two data sets by creating all pairs of elements that are equal on their keys.
@@ -608,7 +608,7 @@ val result = input1.join(input2).where(0).equalTo(1)
         describe whether the join happens through partitioning or broadcasting, and whether it uses
         a sort-based or a hash-based algorithm. Please refer to the
         <a href="dataset_transformations.html#join-algorithm-hints">Transformations Guide</a> for
-        a list of possible hints and an example.</br>
+        a list of possible hints and an example.<br />
         If no hint is specified, the system will try to make an estimate of the input sizes and
         pick the best strategy according to those estimates.
 {% highlight scala %}
@@ -700,7 +700,6 @@ val result = in.partitionByRange(0).mapPartition { ... }
 {% endhighlight %}
       </td>
     </tr>
-    </tr>
     <tr>
       <td><strong>Custom Partitioning</strong></td>
       <td>
@@ -1615,7 +1614,7 @@ In object-reuse enabled mode, Flink's runtime minimizes the number of object ins
    <tr>
       <td><strong>Emitting Input Objects</strong></td>
       <td>
-        You <strong>must not</strong> emit input objects, except for input objects of MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and InputFormat.next(reuse).</td>
+        You <strong>must not</strong> emit input objects, except for input objects of MapFunction, FlatMapFunction, MapPartitionFunction, GroupReduceFunction, GroupCombineFunction, CoGroupFunction, and InputFormat.next(reuse).
       </td>
    </tr>
    <tr>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 7d88a36393c..85c60a67a22 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
 
 ## Latency tracking
 
-Flink allows to track the latency of records traveling through the system. To enable the latency tracking
-a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+Flink allows to track the latency of records traveling through the system. This feature is disabled by default.
+To enable the latency tracking you must set the `latencyTrackingInterval` to a positive number in either the
+[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`.
 
 At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`.
 The marker contains a timestamp from the time when the record has been emitted at the sources.
@@ -1659,6 +1660,9 @@ latency issues caused by individual machines.
 Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
 up an automated clock synchronisation service (like NTP) to avoid false latency results.
 
+<span class="label label-danger">Warning</span> Enabling latency metrics can significantly impact the performance
+of the cluster. It is highly recommended to only use them for debugging purposes.
+
 ## REST API integration
 
 Metrics can be queried through the [Monitoring REST API]({{ site.baseurl }}/monitoring/rest_api.html).
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index d563bcf108e..4f85e3cf8d5 100644
--- a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -1049,7 +1049,7 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
 	}
 
 	/**
-	 * Sets the prefix of part files.  The default is no suffix.
+	 * Sets the suffix of part files.  The default is no suffix.
 	 */
 	public BucketingSink<T> setPartSuffix(String partSuffix) {
 		this.partSuffix = partSuffix;
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..6b7caaac6ec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.Preconditions;
 
@@ -131,7 +132,9 @@
 	/**
 	 * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
 	 */
-	private long latencyTrackingInterval = 2000L;
+	private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
+
+	private boolean isLatencyTrackingConfigured = false;
 
 	/**
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -234,8 +237,6 @@ public long getAutoWatermarkInterval()  {
 	 * Interval for sending latency tracking marks from the sources to the sinks.
 	 * Flink will send latency tracking marks from the sources at the specified interval.
 	 *
-	 * Recommended value: 2000 (2 seconds).
-	 *
 	 * Setting a tracking interval <= 0 disables the latency tracking.
 	 *
 	 * @param interval Interval in milliseconds.
@@ -243,6 +244,7 @@ public long getAutoWatermarkInterval()  {
 	@PublicEvolving
 	public ExecutionConfig setLatencyTrackingInterval(long interval) {
 		this.latencyTrackingInterval = interval;
+		this.isLatencyTrackingConfigured = true;
 		return this;
 	}
 
@@ -256,12 +258,17 @@ public long getLatencyTrackingInterval() {
 	}
 
 	/**
-	 * Returns if latency tracking is enabled
-	 * @return True, if the tracking is enabled, false otherwise.
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	public boolean isLatencyTrackingEnabled() {
-		return latencyTrackingInterval > 0;
+		return isLatencyTrackingConfigured && latencyTrackingInterval > 0;
+	}
+
+	@Internal
+	public boolean isLatencyTrackingConfigured() {
+		return isLatencyTrackingConfigured;
 	}
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index f9fd02423d8..67444a5397c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -19,8 +19,10 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * Configuration options for metrics and metric reporters.
@@ -104,6 +106,24 @@
 			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>")
 			.withDescription("Defines the scope format string that is applied to all metrics scoped to an operator.");
 
+	public static final ConfigOption<Long> LATENCY_INTERVAL =
+		key("metrics.latency.interval")
+			.defaultValue(0L)
+			.withDescription("Defines the interval at which latency tracking marks are emitted from the sources." +
+				" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
+				" impact the performance of the cluster.");
+
+	public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY =
+		key("metrics.latency.granularity")
+			.defaultValue("operator")
+			.withDescription(Description.builder()
+				.text("Defines the granularity of latency metrics. Accepted values are:")
+				.list(
+					text("single - Track latency without differentiating between sources and subtasks."),
+					text("operator - Track latency while differentiating between sources, but not subtasks."),
+					text("subtask - Track latency while differentiating between sources and subtasks."))
+				.build());
+
 	/** The number of measured latencies to maintain at each operator. */
 	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
 		key("metrics.latency.history-size")
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 37f6d024a07..195812d1d1c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -711,10 +711,10 @@ abstract class TableEnvironment(val config: TableConfig) {
       case insert: SqlInsert =>
         // validate the SQL query
         val query = insert.getSource
-        planner.validate(query)
+        val validatedQuery = planner.validate(query)
 
         // get query result as Table
-        val queryResult = new Table(this, LogicalRelNode(planner.rel(query).rel))
+        val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
 
         // get name of sink table
         val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
index 1aadf3140da..fa6f020ea6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.functions
 
 import java.lang.{StringBuilder, Long => JLong}
 import java.math.{BigDecimal => JBigDecimal}
+import java.nio.charset.StandardCharsets
 
 import org.apache.commons.codec.binary.{Base64, Hex}
 import org.apache.commons.lang3.StringUtils
@@ -207,12 +208,14 @@ object ScalarFunctions {
   /**
     * Returns the base string decoded with base64.
     */
-  def fromBase64(str: String): String = new String(Base64.decodeBase64(str))
+  def fromBase64(str: String): String =
+    new String(Base64.decodeBase64(str), StandardCharsets.UTF_8)
 
   /**
     * Returns the base64-encoded result of the input string.
     */
-  def toBase64(base: String): String = Base64.encodeBase64String(base.getBytes())
+  def toBase64(base: String): String =
+    Base64.encodeBase64String(base.getBytes(StandardCharsets.UTF_8))
 
   /**
     * Returns the hex string of a long argument.
@@ -222,7 +225,8 @@ object ScalarFunctions {
   /**
     * Returns the hex string of a string argument.
     */
-  def hex(x: String): String = Hex.encodeHexString(x.getBytes).toUpperCase()
+  def hex(x: String): String =
+    Hex.encodeHexString(x.getBytes(StandardCharsets.UTF_8)).toUpperCase()
 
   /**
     * Returns an UUID string using Java utilities.
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
index 083ed9468bf..6c477fd9ca9 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/validation/SortValidationTest.scala
@@ -38,7 +38,6 @@ class SortValidationTest extends TableTestBase {
     streamUtil.verifySql(sqlQuery, "")
   }
 
-
   // test should fail because time is not the primary order field
   @Test(expected = classOf[TableException])
   def testSortProcessingTimeSecondaryField(): Unit = {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 503825475e7..fbd9b028547 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -483,6 +483,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f24.hex()",
       "HEX(f24)",
       "2A5F546869732069732061207465737420537472696E672E")
+
+    testAllApis(
+      "你好".hex(),
+      "'你好'.hex()",
+      "HEX('你好')",
+      "E4BDA0E5A5BD"
+    )
   }
 
   @Test
@@ -563,6 +570,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f33.fromBase64()",
       "FROM_BASE64(f33)",
       "null")
+
+    testAllApis(
+      "5L2g5aW9".fromBase64(),
+      "'5L2g5aW9'.fromBase64()",
+      "FROM_BASE64('5L2g5aW9')",
+      "你好"
+    )
   }
 
   @Test
@@ -591,6 +605,13 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
       "f33.toBase64()",
       "TO_BASE64(f33)",
       "null")
+
+    testAllApis(
+      "你好".toBase64(),
+      "'你好'.toBase64()",
+      "TO_BASE64('你好')",
+      "5L2g5aW9"
+    )
   }
 
   @Test
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
index 19db2a031b4..e7b79a5a196 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala
@@ -18,15 +18,17 @@
 
 package org.apache.flink.table.runtime.stream.sql
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.stream.sql.SortITCase.StringRowSelectorSink
-import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
 import org.apache.flink.types.Row
 import org.junit.Assert._
 import org.junit._
@@ -105,6 +107,36 @@ class SortITCase extends StreamingWithStateTestBase {
       "20")
     assertEquals(expected, SortITCase.testResults)
   }
+
+  @Test
+  def testInsertIntoMemoryTableOrderBy(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    MemoryTableSourceSinkUtil.clear()
+
+    val t = StreamTestData.getSmall3TupleDataStream(env)
+      .assignAscendingTimestamps(x => x._2)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+    tEnv.registerTable("sourceTable", t)
+
+    val fieldNames = Array("d", "e", "f", "t")
+    val fieldTypes = Array(Types.INT, Types.LONG, Types.STRING, Types.SQL_TIMESTAMP)
+      .asInstanceOf[Array[TypeInformation[_]]]
+    val sink = new MemoryTableSourceSinkUtil.UnsafeMemoryAppendTableSink
+    tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
+
+    val sql = "INSERT INTO targetTable SELECT a, b, c, rowtime " +
+      "FROM sourceTable ORDER BY rowtime, a desc"
+    tEnv.sqlUpdate(sql)
+    env.execute()
+
+    val expected = List(
+      "1,1,Hi,1970-01-01 00:00:00.001",
+      "3,2,Hello world,1970-01-01 00:00:00.002",
+      "2,2,Hello,1970-01-01 00:00:00.002")
+    assertEquals(expected, MemoryTableSourceSinkUtil.tableDataStrings)
+  }
 }
 
 object SortITCase {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index cb0ad436a18..1edd79fca56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -119,8 +119,10 @@ object MemoryTableSourceSinkUtil {
     }
 
     override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+      val inputParallelism = dataStream.getParallelism
       dataStream
         .addSink(new MemoryAppendSink)
+        .setParallelism(inputParallelism)
         .name(TableConnectorUtil.generateRuntimeName(this.getClass, getFieldNames))
     }
   }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 801f35a41dc..ca206c15874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -176,6 +177,8 @@
 
 	private volatile IOMetrics ioMetrics;
 
+	private int currentSplitIndex = 0;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -307,6 +310,12 @@ boolean tryAssignResource(final LogicalSlot logicalSlot) {
 		}
 	}
 
+	public InputSplit getNextInputSplit() {
+		final LogicalSlot slot = this.getAssignedResource();
+		final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
+		return this.vertex.getNextInputSplit(this.currentSplitIndex++, host);
+	}
+
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e4228011830..4929d3931b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
@@ -104,6 +105,9 @@
 	/** The current or latest execution attempt of this vertex's task. */
 	private volatile Execution currentExecution;	// this field must never be null
 
+	/** input split*/
+	private ArrayList<InputSplit> inputSplits;
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -185,6 +189,7 @@ public ExecutionVertex(
 		getExecutionGraph().registerExecution(currentExecution);
 
 		this.timeout = timeout;
+		this.inputSplits = new ArrayList<>();
 	}
 
 
@@ -249,6 +254,19 @@ public CoLocationConstraint getLocationConstraint() {
 		return locationConstraint;
 	}
 
+	public InputSplit getNextInputSplit(int index, String host) {
+		final int taskId = this.getParallelSubtaskIndex();
+		synchronized (this.inputSplits) {
+			if (index < this.inputSplits.size()) {
+				return this.inputSplits.get(index);
+			} else {
+				final InputSplit nextInputSplit = this.jobVertex.getSplitAssigner().getNextInputSplit(host, taskId);
+				this.inputSplits.add(nextInputSplit);
+				return nextInputSplit;
+			}
+		}
+	}
+
 	@Override
 	public Execution getCurrentExecutionAttempt() {
 		return currentExecution;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index 0019acf6e86..041fea00fa9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -226,7 +226,6 @@ public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
 			Set<ExecutionAttemptID> jobRefCounter = jobRefHolders.get(jobId);
 
 			if (jobRefCounter == null || jobRefCounter.isEmpty()) {
-				LOG.warn("improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId " + jobId);
 				return;
 			}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 4bad92f06b0..f368ff0365e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -90,11 +90,12 @@ public Buffer build() {
 	}
 
 	/**
-	 * @return a retained copy of self with separate indexes - it allows two read from the same {@link MemorySegment}
-	 * twice.
+	 * Returns a retained copy with separate indexes. This allows to read from the same {@link MemorySegment} twice.
 	 *
 	 * <p>WARNING: newly returned {@link BufferConsumer} will have reader index copied from the original buffer. In
 	 * other words, data already consumed before copying will not be visible to the returned copies.
+	 *
+	 * @return a retained copy of self with separate indexes
 	 */
 	public BufferConsumer copy() {
 		return new BufferConsumer(buffer.retainBuffer(), writerPosition.positionMarker, currentReaderPosition);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index a4928ed307e..2927fae1069 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -34,7 +34,7 @@
 	/**
 	 * Destroys this buffer pool.
 	 *
-	 * <p> If not all buffers are available, they are recycled lazily as soon as they are recycled.
+	 * <p>If not all buffers are available, they are recycled lazily as soon as they are recycled.
 	 */
 	void lazyDestroy();
 
@@ -50,7 +50,7 @@
 	int getNumberOfRequiredMemorySegments();
 
 	/**
-	 * Returns the maximum number of memory segments this buffer pool should use
+	 * Returns the maximum number of memory segments this buffer pool should use.
 	 *
 	 * @return maximum number of memory segments to use or <tt>-1</tt> if unlimited
 	 */
@@ -59,14 +59,14 @@
 	/**
 	 * Returns the current size of this buffer pool.
 	 *
-	 * <p> The size of the buffer pool can change dynamically at runtime.
+	 * <p>The size of the buffer pool can change dynamically at runtime.
 	 */
 	int getNumBuffers();
 
 	/**
 	 * Sets the current size of this buffer pool.
 	 *
-	 * <p> The size needs to be greater or equal to the guaranteed number of memory segments.
+	 * <p>The size needs to be greater or equal to the guaranteed number of memory segments.
 	 */
 	void setNumBuffers(int numBuffers) throws IOException;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index ffed43251d8..c90e3025237 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -29,7 +29,7 @@
 	 * Tries to create a buffer pool, which is guaranteed to provide at least the number of required
 	 * buffers.
 	 *
-	 * <p> The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
+	 * <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
 	 *
 	 * @param numRequiredBuffers
 	 * 		minimum number of network buffers in this pool
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
index 66a69956008..acfe2405e20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolOwner.java
@@ -20,6 +20,9 @@
 
 import java.io.IOException;
 
+/**
+ * Interface for releasing memory buffers.
+ */
 public interface BufferPoolOwner {
 
 	void releaseMemory(int numBuffersToRecycle) throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
index a6495d02c9a..66b1fa2dc58 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferRecycler.java
@@ -20,6 +20,9 @@
 
 import org.apache.flink.core.memory.MemorySegment;
 
+/**
+ * Interface for recycling {@link MemorySegment}s.
+ */
 public interface BufferRecycler {
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
index fdce8837046..548c0cc7948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/FreeingBufferRecycler.java
@@ -24,11 +24,11 @@
  * A simple buffer recycler that frees the memory segments.
  */
 public class FreeingBufferRecycler implements BufferRecycler {
-	
+
 	public static final BufferRecycler INSTANCE = new FreeingBufferRecycler();
-	
+
 	// ------------------------------------------------------------------------
-	
+
 	// Not instantiable
 	private FreeingBufferRecycler() {}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index cc793635047..c6f3e158519 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -80,7 +80,7 @@ public void flush() {
 	@Override
 	public void finish() throws IOException {
 		add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
-		LOG.debug("Finished {}.", this);
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	private boolean add(BufferConsumer bufferConsumer, boolean finish) {
@@ -132,7 +132,7 @@ public void release() {
 			isReleased = true;
 		}
 
-		LOG.debug("Released {}.", this);
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
 
 		if (view != null) {
 			view.releaseAllResources();
@@ -224,7 +224,8 @@ public PipelinedSubpartitionView createReadView(BufferAvailabilityListener avail
 					"Subpartition %s of is being (or already has been) consumed, " +
 					"but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());
 
-			LOG.debug("Creating read view for subpartition {} of partition {}.", index, parent.getPartitionId());
+			LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
+				parent.getOwningTaskName(), index, parent.getPartitionId());
 
 			readView = new PipelinedSubpartitionView(this, availabilityListener);
 			if (!buffers.isEmpty()) {
@@ -268,8 +269,8 @@ public String toString() {
 		}
 
 		return String.format(
-			"PipelinedSubpartition [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
-			numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
+			"PipelinedSubpartition#%d [number of buffers: %d (%d bytes), number of buffers in backlog: %d, finished? %s, read view? %s]",
+			index, numBuffers, numBytes, getBuffersInBacklog(), finished, hasReadView);
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index fbbfa4b45bb..93e5ba15097 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -199,6 +199,10 @@ public JobID getJobId() {
 		return jobId;
 	}
 
+	public String getOwningTaskName() {
+		return owningTaskName;
+	}
+
 	public ResultPartitionID getPartitionId() {
 		return partitionId;
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 69b461b1a4d..9f696adc362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -143,6 +143,7 @@ public synchronized void finish() throws IOException {
 		if (spillWriter != null) {
 			spillWriter.close();
 		}
+		LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
 	}
 
 	@Override
@@ -180,6 +181,8 @@ public synchronized void release() throws IOException {
 			isReleased = true;
 		}
 
+		LOG.debug("{}: Released {}.", parent.getOwningTaskName(), this);
+
 		if (view != null) {
 			view.releaseAllResources();
 		}
@@ -236,8 +239,8 @@ public int releaseMemory() throws IOException {
 				long spilledBytes = spillFinishedBufferConsumers(isFinished);
 				int spilledBuffers = numberOfBuffers - buffers.size();
 
-				LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
-					spilledBytes, spilledBuffers, index, parent.getPartitionId());
+				LOG.debug("{}: Spilling {} bytes ({} buffers} for sub partition {} of {}.",
+					parent.getOwningTaskName(), spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
 				return spilledBuffers;
 			}
@@ -300,9 +303,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
 
 	@Override
 	public String toString() {
-		return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
+		return String.format("SpillableSubpartition#%d [%d number of buffers (%d bytes)," +
 				"%d number of buffers in backlog, finished? %s, read view? %s, spilled? %s]",
-			getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
+			index, getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
 			getBuffersInBacklog(), isFinished, readView != null, spillWriter != null);
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 01cb2b6b099..14f2c950e67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,7 +24,6 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.StoppingException;
@@ -95,7 +94,6 @@
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskexecutor.AccumulatorReport;
@@ -569,16 +567,12 @@ public void start() {
 			return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
 		}
 
-		final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
-		if (splitAssigner == null) {
+		if (vertex.getSplitAssigner() == null) {
 			log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
 			return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
 		}
 
-		final LogicalSlot slot = execution.getAssignedResource();
-		final int taskId = execution.getVertex().getParallelSubtaskIndex();
-		final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
-		final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
+		final InputSplit nextInputSplit = execution.getNextInputSplit();
 
 		if (log.isDebugEnabled()) {
 			log.debug("Send next input split {}.", nextInputSplit);
@@ -909,7 +903,7 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
 	}
 
 	@Override
-	public CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout) {
+	public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
 		final ExecutionGraph currentExecutionGraph = executionGraph;
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(currentExecutionGraph), scheduledExecutorService);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 981222d17a6..bc073c192bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -268,7 +268,7 @@ void heartbeatFromTaskManager(
 	CompletableFuture<String> triggerSavepoint(
 		@Nullable final String targetDirectory,
 		final boolean cancelJob,
-		final Time timeout);
+		@RpcTimeout final Time timeout);
 
 	/**
 	 * Requests the statistics on operator back pressure.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
index 77b9847d331..c328dd52ce3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -25,8 +25,8 @@
 import java.util.concurrent.CompletableFuture;
 
 /**
- * A leader listener that exposes a future for the first leader notification.  
- * 
+ * A leader listener that exposes a future for the first leader notification.
+ *
  * <p>The future can be obtained via the {@link #future()} method.
  */
 public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
@@ -38,7 +38,7 @@ public OneTimeLeaderListenerFuture() {
 	}
 
 	/**
-	 * Gets the future that is completed with the leader address and ID. 
+	 * Gets the future that is completed with the leader address and ID.
 	 * @return The future.
 	 */
 	public CompletableFuture<LeaderAddressAndId> future() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 808de222d0a..1ef1f3b5832 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -43,8 +43,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import scala.Option;
 import scala.concurrent.Await;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1c2d2a3ecaf..19de2102a35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -100,9 +100,12 @@ public AbstractKeyedStateBackend(
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider) {
 
+		Preconditions.checkArgument(numberOfKeyGroups >= 1, "NumberOfKeyGroups must be a positive number");
+		Preconditions.checkArgument(numberOfKeyGroups >= keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be at least the number in the key group range assigned to this backend");
+
 		this.kvStateRegistry = kvStateRegistry;
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.cancelStreamRegistry = new CloseableRegistry();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
index f92504ef849..9f33866bf10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
@@ -193,7 +193,7 @@ public void testConjunctFutureFailureOnSuccessive() throws Exception {
 	}
 
 	/**
-	 * Tests that the conjunct future returns upon completion the collection of all future values
+	 * Tests that the conjunct future returns upon completion the collection of all future values.
 	 */
 	@Test
 	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 1639c919569..c386952c056 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -535,7 +535,7 @@ public void testCompleteAllExceptional() throws Exception {
 			final FlinkException suppressedException;
 
 			if (actual.equals(testException1)) {
-				 suppressedException = testException2;
+				suppressedException = testException2;
 			} else {
 				suppressedException = testException1;
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
index fcbf9d553f4..5e8e42eee77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java
@@ -26,6 +26,9 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the destruction of a {@link LocalBufferPool}.
+ */
 public class LocalBufferPoolDestroyTest {
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 7a6fe6a3c65..537d167908f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -50,17 +50,20 @@
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
 
+/**
+ * Tests for the {@link LocalBufferPool}.
+ */
 public class LocalBufferPoolTest extends TestLogger {
 
-	private final static int numBuffers = 1024;
+	private static final int numBuffers = 1024;
 
-	private final static int memorySegmentSize = 128;
+	private static final int memorySegmentSize = 128;
 
 	private NetworkBufferPool networkBufferPool;
 
 	private BufferPool localBufferPool;
 
-	private final static ExecutorService executor = Executors.newCachedThreadPool();
+	private static final ExecutorService executor = Executors.newCachedThreadPool();
 
 	@Before
 	public void setupLocalBufferPool() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
index 8efd2bb7021..dfea9376f02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.serialization.types;
 
-import java.io.IOException;
-import java.util.Random;
-
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * A large {@link SerializationTestType}.
+ */
 public class LargeObjectType implements SerializationTestType {
 
 	private static final int MIN_LEN = 12 * 1000 * 1000;
-	
+
 	private static final int MAX_LEN = 40 * 1000 * 1000;
 
 	private int len;
@@ -68,13 +70,13 @@ public void write(DataOutputView out) throws IOException {
 	public void read(DataInputView in) throws IOException {
 		final int len = in.readInt();
 		this.len = len;
-		
+
 		for (int i = 0; i < len / 8; i++) {
 			if (in.readLong() != i) {
 				throw new IOException("corrupt serialization");
 			}
 		}
-		
+
 		for (int i = 0; i < len % 8; i++) {
 			if (in.readByte() != i) {
 				throw new IOException("corrupt serialization");
@@ -91,7 +93,7 @@ public int hashCode() {
 	public boolean equals(Object obj) {
 		return (obj instanceof LargeObjectType) && ((LargeObjectType) obj).len == this.len;
 	}
-	
+
 	@Override
 	public String toString() {
 		return "Large Object " + len;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 66ca769165a..adc401b96af 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -21,13 +21,17 @@
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -50,10 +54,12 @@
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -64,9 +70,11 @@
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -76,6 +84,8 @@
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.DataSourceTask;
+import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -92,6 +102,7 @@
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
@@ -101,6 +112,7 @@
 import org.hamcrest.Matchers;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -108,6 +120,7 @@
 import org.junit.rules.TemporaryFolder;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -122,24 +135,29 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for {@link JobMaster}.
  */
 public class JobMasterTest extends TestLogger {
 
-	static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
+	private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0];
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -418,7 +436,7 @@ public void testAutomaticRestartingWhenCheckpointing() throws Exception {
 	}
 
 	/**
-	 * Tests that an existing checkpoint will have precedence over an savepoint
+	 * Tests that an existing checkpoint will have precedence over an savepoint.
 	 */
 	@Test
 	public void testCheckpointPrecedesSavepointRecovery() throws Exception {
@@ -470,7 +488,7 @@ public void testCheckpointPrecedesSavepointRecovery() throws Exception {
 
 	/**
 	 * Tests that the JobMaster retries the scheduling of a job
-	 * in case of a missing slot offering from a registered TaskExecutor
+	 * in case of a missing slot offering from a registered TaskExecutor.
 	 */
 	@Test
 	public void testSlotRequestTimeoutWhenNoSlotOffering() throws Exception {
@@ -698,6 +716,66 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep
 		}
 	}
 
+	private JobGraph createDataSourceJobGraph() {
+		final TextInputFormat inputFormat = new TextInputFormat(new Path("."));
+		final InputFormatVertex producer = new InputFormatVertex("Producer");
+		new TaskConfig(producer.getConfiguration()).setStubWrapper(new UserCodeObjectWrapper<InputFormat<?, ?>>(inputFormat));
+		producer.setInvokableClass(DataSourceTask.class);
+
+		final JobVertex consumer = new JobVertex("Consumer");
+		consumer.setInvokableClass(NoOpInvokable.class);
+		consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(producer, consumer);
+		jobGraph.setAllowQueuedScheduling(true);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Tests the {@link JobMaster#requestNextInputSplit(JobVertexID, ExecutionAttemptID)}
+	 * validate that it will get same result for a different retry
+	 */
+	@Test
+	public void testRequestNextInputSplitWithDataSourceFailover() throws Exception {
+
+		final JobGraph dataSourceJobGraph = createDataSourceJobGraph();
+		testJobMasterAPIWithMockExecution(dataSourceJobGraph, (tdd, jobMaster) ->{
+			try{
+				final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+
+				final TaskInformation taskInformation = tdd.getSerializedTaskInformation()
+					.deserializeValue(getClass().getClassLoader());
+				JobVertexID vertexID = taskInformation.getJobVertexId();
+
+				//get the previous split
+				SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
+
+				//start a new version of this execution
+				ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
+				Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
+				ExecutionVertex executionVertex = execution.getVertex();
+
+				long version = execution.getGlobalModVersion();
+				gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FINISHED)).get();
+				Execution newExecution = executionVertex.resetForNewExecution(System.currentTimeMillis(), version);
+
+				//get the new split
+				SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+				Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData());
+
+				//get the new split3
+				SerializedInputSplit split3 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+
+				Assert.assertNotEquals(split1.getInputSplitData().length, split3.getInputSplitData().length);
+			}
+			catch (Exception e){
+				Assert.fail(e.toString());
+			}
+		});
+	}
+
 	@Test
 	public void testRequestNextInputSplit() throws Exception {
 		final List<TestingInputSplit> expectedInputSplits = Arrays.asList(
@@ -844,16 +922,10 @@ public int hashCode() {
 		}
 	}
 
-	/**
-	 * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
-	 * call for a finished result partition.
-	 */
-	@Test
-	public void testRequestPartitionState() throws Exception {
-		final JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
+	private void testJobMasterAPIWithMockExecution(JobGraph graph, BiConsumer<TaskDeploymentDescriptor, JobMaster> consumer) throws  Exception{
 		final JobMaster jobMaster = createJobMaster(
 			configuration,
-			producerConsumerJobGraph,
+			graph,
 			haServices,
 			new TestingJobManagerSharedServicesBuilder().build(),
 			heartbeatServices);
@@ -874,9 +946,9 @@ public void testRequestPartitionState() throws Exception {
 			final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
 			final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
 				.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-					  tddFuture.complete(taskDeploymentDescriptor);
-					  return CompletableFuture.completedFuture(Acknowledge.get());
-				  })
+					tddFuture.complete(taskDeploymentDescriptor);
+					return CompletableFuture.completedFuture(Acknowledge.get());
+				})
 				.createTestingTaskExecutorGateway();
 			rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway);
 
@@ -898,20 +970,90 @@ public void testRequestPartitionState() throws Exception {
 
 			// obtain tdd for the result partition ids
 			final TaskDeploymentDescriptor tdd = tddFuture.get();
+			consumer.accept(tdd, jobMaster);
+
+		} finally {
+			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
+		}
+	}
 
-			assertThat(tdd.getProducedPartitions(), hasSize(1));
-			final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();
+	/**
+	 * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
+	 * call for a finished result partition.
+	 */
+	@Test
+	public void testRequestPartitionState() throws Exception {
+		JobGraph producerConsumerJobGraph = producerConsumerJobGraph();
+		testJobMasterAPIWithMockExecution(producerConsumerJobGraph, (tdd, jobMaster) ->{
+			try{
+				final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+				assertThat(tdd.getProducedPartitions(), hasSize(1));
+				final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next();
 
-			final ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
-			final ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());
+				final ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId();
+				final ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart());
 
-			// finish the producer task
-			jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
+				// finish the producer task
+				jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get();
 
-			// request the state of the result partition of the producer
-			final CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+				// request the state of the result partition of the producer
+				final CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+
+				assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+			}
+			catch (Exception e) {
+				Assert.fail(e.toString());
+			}
+		});
+	}
+
+	/**
+	 * Tests that the timeout in {@link JobMasterGateway#triggerSavepoint(String, boolean, Time)}
+	 * is respected.
+	 */
+	@Test
+	public void testTriggerSavepointTimeout() throws Exception {
+		final JobMaster jobMaster = new JobMaster(
+			rpcService,
+			JobMasterConfiguration.fromConfiguration(configuration),
+			jmResourceId,
+			jobGraph,
+			haServices,
+			DefaultSlotPoolFactory.fromConfiguration(configuration, rpcService),
+			new TestingJobManagerSharedServicesBuilder().build(),
+			heartbeatServices,
+			blobServer,
+			UnregisteredJobManagerJobMetricGroupFactory.INSTANCE,
+			new NoOpOnCompletionActions(),
+			testingFatalErrorHandler,
+			JobMasterTest.class.getClassLoader()) {
+
+			@Override
+			public CompletableFuture<String> triggerSavepoint(
+					@Nullable final String targetDirectory,
+					final boolean cancelJob,
+					final Time timeout) {
+				return new CompletableFuture<>();
+			}
+		};
+
+		try {
+			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout);
+			startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+			final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
+			final CompletableFuture<String> savepointFutureLowTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, Time.milliseconds(1));
+			final CompletableFuture<String> savepointFutureHighTimeout = jobMasterGateway.triggerSavepoint("/tmp", false, RpcUtils.INF_TIMEOUT);
+
+			try {
+				savepointFutureLowTimeout.get(testingTimeout.getSize(), testingTimeout.getUnit());
+				fail();
+			} catch (final ExecutionException e) {
+				final Throwable cause = ExceptionUtils.stripExecutionException(e);
+				assertThat(cause, instanceOf(TimeoutException.class));
+			}
 
-			assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED));
+			assertThat(savepointFutureHighTimeout.isDone(), is(equalTo(false)));
 		} finally {
 			RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 68858bc0cea..0c9dd49c1d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.Preconditions;
@@ -91,6 +90,8 @@
 
 	private final JobVertexID jobVertexID;
 
+	private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
+
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
 
 	private final AccumulatorRegistry accumulatorRegistry;
@@ -127,7 +128,8 @@ protected MockEnvironment(
 		int parallelism,
 		int subtaskIndex,
 		ClassLoader userCodeClassLoader,
-		TaskMetricGroup taskMetricGroup) {
+		TaskMetricGroup taskMetricGroup,
+		TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -140,6 +142,7 @@ protected MockEnvironment(
 
 		this.memManager = new MemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
+		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 
 		this.executionConfig = executionConfig;
 		this.inputSplitProvider = inputSplitProvider;
@@ -212,7 +215,7 @@ public Configuration getJobConfiguration() {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TestingTaskManagerRuntimeInfo();
+		return this.taskManagerRuntimeInfo;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfcc5f312e0..34a6ec492dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -27,6 +27,8 @@
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 public class MockEnvironmentBuilder {
 	private String taskName = "mock-task";
@@ -43,6 +45,7 @@
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
 	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+	private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -79,6 +82,11 @@ public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig
 		return this;
 	}
 
+	public MockEnvironmentBuilder setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo){
+		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
+		return this;
+	}
+
 	public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
 		this.maxParallelism = maxParallelism;
 		return this;
@@ -129,6 +137,7 @@ public MockEnvironment build() {
 			parallelism,
 			subtaskIndex,
 			userCodeClassLoader,
-			taskMetricGroup);
+			taskMetricGroup,
+			taskManagerRuntimeInfo);
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 649c6d03a6a..2634268c947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -117,6 +117,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
@@ -2916,6 +2917,52 @@ public void testMapState() throws Exception {
 		backend.dispose();
 	}
 
+	/**
+	 * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
+	 */
+	@Test
+	public void testMapStateIteratorArbitraryAccess() throws Exception {
+		MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+		try {
+			MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			int stateSize = 4096;
+			for (int i = 0; i < stateSize; i++) {
+				state.put(i, i * 2L);
+			}
+			Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
+			int iteratorCount = 0;
+			while (iterator.hasNext()) {
+				Map.Entry<Integer, Long> entry = iterator.next();
+				assertEquals(iteratorCount, (int) entry.getKey());
+				switch (ThreadLocalRandom.current().nextInt() % 3) {
+					case 0: // remove twice
+						iterator.remove();
+						try {
+							iterator.remove();
+							fail();
+						} catch (IllegalStateException e) {
+							// ignore expected exception
+						}
+						break;
+					case 1: // hasNext -> remove
+						iterator.hasNext();
+						iterator.remove();
+						break;
+					case 2: // nothing to do
+						break;
+				}
+				iteratorCount++;
+			}
+			assertEquals(stateSize, iteratorCount);
+		} finally {
+			backend.dispose();
+		}
+	}
+
 	/**
 	 * Verify that {@link ValueStateDescriptor} allows {@code null} as default.
 	 */
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 5c9f7f9f30c..cb656b53b1b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -498,6 +498,7 @@ public UV setValue(UV value) {
 		 * have the same prefix, hence we can stop iterating once coming across an
 		 * entry with a different prefix.
 		 */
+		@Nonnull
 		private final byte[] keyPrefixBytes;
 
 		/**
@@ -508,6 +509,9 @@ public UV setValue(UV value) {
 
 		/** A in-memory cache for the entries in the rocksdb. */
 		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();
+
+		/** The entry pointing to the current position which is last returned by calling {@link #nextEntry()}. */
+		private RocksDBMapEntry currentEntry;
 		private int cacheIndex = 0;
 
 		private final TypeSerializer<UK> keySerializer;
@@ -537,12 +541,11 @@ public boolean hasNext() {
 
 		@Override
 		public void remove() {
-			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
+			if (currentEntry == null || currentEntry.deleted) {
 				throw new IllegalStateException("The remove operation must be called after a valid next operation.");
 			}
 
-			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
-			lastEntry.remove();
+			currentEntry.remove();
 		}
 
 		final RocksDBMapEntry nextEntry() {
@@ -556,10 +559,10 @@ final RocksDBMapEntry nextEntry() {
 				return null;
 			}
 
-			RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
+			this.currentEntry = cacheEntries.get(cacheIndex);
 			cacheIndex++;
 
-			return entry;
+			return currentEntry;
 		}
 
 		private void loadCache() {
@@ -577,12 +580,11 @@ private void loadCache() {
 			try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {
 
 				/*
-				 * The iteration starts from the prefix bytes at the first loading. The cache then is
-				 * reloaded when the next entry to return is the last one in the cache. At that time,
-				 * we will start the iterating from the last returned entry.
- 				 */
-				RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
-				byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
+				 * The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
+				 * the currentEntry points to the last returned entry, and at that time, we will start
+				 * the iterating from currentEntry if reloading cache is needed.
+				 */
+				byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);
 
 				cacheEntries.clear();
 				cacheIndex = 0;
@@ -590,10 +592,10 @@ private void loadCache() {
 				iterator.seek(startBytes);
 
 				/*
-				 * If the last returned entry is not deleted, it will be the first entry in the
-				 * iterating. Skip it to avoid redundant access in such cases.
+				 * If the entry pointing to the current position is not removed, it will be the first entry in the
+				 * new iterating. Skip it to avoid redundant access in such cases.
 				 */
-				if (lastEntry != null && !lastEntry.deleted) {
+				if (currentEntry != null && !currentEntry.deleted) {
 					iterator.next();
 				}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index 0407cc7e32a..a415a83b5d8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -30,6 +30,8 @@
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
 import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
@@ -43,6 +45,8 @@
  */
 public class RocksDBResource extends ExternalResource {
 
+	private static final Logger LOG = LoggerFactory.getLogger(RocksDBResource.class);
+
 	/** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
 	private final OptionsFactory optionsFactory;
 
@@ -74,11 +78,25 @@ public RocksDBResource() {
 		this(new OptionsFactory() {
 			@Override
 			public DBOptions createDBOptions(DBOptions currentOptions) {
+				//close it before reuse the reference.
+				try {
+					currentOptions.close();
+				} catch (Exception e) {
+					LOG.error("Close previous DBOptions's instance failed.", e);
+				}
+
 				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createDBOptions();
 			}
 
 			@Override
 			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+				//close it before reuse the reference.
+				try {
+					currentOptions.close();
+				} catch (Exception e) {
+					LOG.error("Close previous DBOptions's instance failed.", e);
+				}
+
 				return PredefinedOptions.FLASH_SSD_OPTIMIZED.createColumnOptions();
 			}
 		});
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 480f981bc75..9c36dab75fd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -29,6 +29,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import org.slf4j.Logger;
@@ -201,6 +202,8 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>
 		configuration.setString(JobManagerOptions.ADDRESS, host);
 		configuration.setInteger(JobManagerOptions.PORT, port);
 
+		configuration.setInteger(RestOptions.PORT, port);
+
 		final ClusterClient<?> client;
 		try {
 			if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 010628f8161..c0216e56e2e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -48,7 +48,7 @@ protected StreamContextEnvironment(ContextEnvironment ctx) {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		Preconditions.checkNotNull("Streaming Job name should not be null.");
+		Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
 
 		StreamGraph streamGraph = this.getStreamGraph();
 		streamGraph.setJobName(jobName);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
index d3ad0f9220f..591ebccea9f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -21,7 +21,7 @@
 
 /**
  * A stream data source that is executed in parallel. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
+ * execute as many parallel instances of this function as configured parallelism
  * of the source.
  *
  * <p>This interface acts only as a marker to tell the system that this source may
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
index 94b85b69aba..46c1443664d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -22,7 +22,7 @@
 
 /**
  * Base class for implementing a parallel data source. Upon execution, the runtime will
- * execute as many parallel instances of this function function as configured parallelism
+ * execute as many parallel instances of this function as configured parallelism
  * of the source.
  *
  * <p>The data source has access to context information (such as the number of parallel
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f52168bd9b9..f3c22080ab7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,6 +72,7 @@
 
 import java.io.Closeable;
 import java.io.Serializable;
+import java.util.Locale;
 
 /**
  * Base class for all stream operators. Operators that contain a user function should extend the class
@@ -193,11 +194,33 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
 				LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
 				historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
 			}
+
+			final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+			LatencyStats.Granularity granularity;
+			try {
+				granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+			} catch (IllegalArgumentException iae) {
+				granularity = LatencyStats.Granularity.OPERATOR;
+				LOG.warn(
+					"Configured value {} option for {} is invalid. Defaulting to {}.",
+					configuredGranularity,
+					MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+					granularity);
+			}
 			TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
-			this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
+			this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
+				historySize,
+				container.getIndexInSubtaskGroup(),
+				getOperatorID(),
+				granularity);
 		} catch (Exception e) {
 			LOG.warn("An error occurred while instantiating latency metrics.", e);
-			this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
+			this.latencyStats = new LatencyStats(
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+				1,
+				0,
+				new OperatorID(),
+				LatencyStats.Granularity.SINGLE);
 		}
 
 		this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 5600d8f13cc..63dd3e4d427 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -62,12 +64,17 @@ public void run(final Object lockingObject,
 
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
 
-		LatencyMarksEmitter latencyEmitter = null;
-		if (getExecutionConfig().isLatencyTrackingEnabled()) {
+		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
+			? getExecutionConfig().getLatencyTrackingInterval()
+			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
+
+		LatencyMarksEmitter<OUT> latencyEmitter = null;
+		if (latencyTrackingInterval > 0) {
 			latencyEmitter = new LatencyMarksEmitter<>(
 				getProcessingTimeService(),
 				collector,
-				getExecutionConfig().getLatencyTrackingInterval(),
+				latencyTrackingInterval,
 				this.getOperatorID(),
 				getRuntimeContext().getIndexOfThisSubtask());
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
index 4f3d33ec6f9..926753dc78f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -34,23 +34,29 @@
 	private final int historySize;
 	private final int subtaskIndex;
 	private final OperatorID operatorId;
+	private final Granularity granularity;
 
-	public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) {
+	public LatencyStats(
+			MetricGroup metricGroup,
+			int historySize,
+			int subtaskIndex,
+			OperatorID operatorID,
+			Granularity granularity) {
 		this.metricGroup = metricGroup;
 		this.historySize = historySize;
 		this.subtaskIndex = subtaskIndex;
 		this.operatorId = operatorID;
+		this.granularity = granularity;
 	}
 
 	public void reportLatency(LatencyMarker marker) {
-		String uniqueName =  "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex;
+		final String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
+
 		DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
 		if (latencyHistogram == null) {
 			latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
 			this.latencyStats.put(uniqueName, latencyHistogram);
-			this.metricGroup
-				.addGroup("source_id", String.valueOf(marker.getOperatorId()))
-				.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()))
+			granularity.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
 				.addGroup("operator_id", String.valueOf(operatorId))
 				.addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
 				.histogram("latency", latencyHistogram);
@@ -59,4 +65,62 @@ public void reportLatency(LatencyMarker marker) {
 		long now = System.currentTimeMillis();
 		latencyHistogram.update(now - marker.getMarkedTime());
 	}
+
+	/**
+	 * Granularity for latency metrics.
+	 */
+	public enum Granularity {
+		SINGLE {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(operatorId) + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base;
+			}
+		},
+		OPERATOR {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base
+					.addGroup("source_id", String.valueOf(marker.getOperatorId()));
+			}
+		},
+		SUBTASK {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base
+					.addGroup("source_id", String.valueOf(marker.getOperatorId()))
+					.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
+			}
+		};
+
+		abstract String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+
+		abstract MetricGroup createSourceMetricGroups(MetricGroup base, LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
new file mode 100644
index 00000000000..60ee66f4246
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+/**
+ * Tests for the {@link RemoteStreamEnvironment}.
+ */
+public class RemoteStreamExecutionEnvironmentTest extends TestLogger {
+
+	private static MiniCluster flink;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		final Configuration config = new Configuration();
+		config.setInteger(RestOptions.PORT, 0);
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		flink = new MiniCluster(miniClusterConfiguration);
+
+		flink.start();
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (flink != null) {
+			flink.close();
+		}
+	}
+
+	/**
+	 * Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster.
+	 */
+	@Test
+	public void testPortForwarding() throws Exception {
+		final Configuration clientConfiguration = new Configuration();
+		clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
+			flink.getRestAddress().getHost(),
+			flink.getRestAddress().getPort(),
+			clientConfiguration);
+
+		final DataStream<Integer> resultStream = env.fromElements(1)
+			.map(x -> x * 2);
+
+		final Iterator<Integer> result = DataStreamUtils.collect(resultStream);
+		Assert.assertTrue(result.hasNext());
+		Assert.assertEquals(2, result.next().intValue());
+		Assert.assertFalse(result.hasNext());
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
new file mode 100644
index 00000000000..14d51474b57
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the emission of latency markers by {@link StreamSource} operators.
+ */
+public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
+
+	private static final long maxProcessingTime = 100L;
+	private static final long latencyMarkInterval = 10L;
+
+	/**
+	 * Verifies that by default no latency metrics are emitted.
+	 */
+	@Test
+	public void testLatencyMarkEmissionDisabled() throws Exception {
+		testLatencyMarkEmission(0, (operator, timeProvider) -> {
+			setupSourceOperator(operator, new ExecutionConfig(), MockEnvironment.builder().build(), timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the {@link ExecutionConfig}.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+			setupSourceOperator(operator, executionConfig, MockEnvironment.builder().build(), timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			setupSourceOperator(operator, new ExecutionConfig(), env, timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the {@link ExecutionConfig} even if they are disabled via
+	 * the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			setupSourceOperator(operator, executionConfig, env, timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be disabled via the {@link ExecutionConfig} even if they are enabled via
+	 * the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission(0, (operator, timeProvider) -> {
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(0);
+
+			setupSourceOperator(operator, executionConfig, env, timeProvider);
+		});
+	}
+
+	private interface OperatorSetupOperation {
+		void setupSourceOperator(
+			StreamSource<Long, ?> operator,
+			TestProcessingTimeService testProcessingTimeService
+		);
+	}
+
+	private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOperation operatorSetup) throws Exception {
+		final List<StreamElement> output = new ArrayList<>();
+
+		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
+		testProcessingTimeService.setCurrentTime(0L);
+		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
+
+		// regular stream source operator
+		final StreamSource<Long, ProcessingTimeServiceSource> operator =
+			new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
+
+		operatorSetup.setupSourceOperator(operator, testProcessingTimeService);
+
+		// run and wait to be stopped
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
+
+		assertEquals(
+			numberLatencyMarkers + 1, // + 1 is the final watermark element
+			output.size());
+
+		long timestamp = 0L;
+
+		int i = 0;
+		// verify that its only latency markers + a final watermark
+		for (; i < numberLatencyMarkers; i++) {
+			StreamElement se = output.get(i);
+			Assert.assertTrue(se.isLatencyMarker());
+			Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
+			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
+			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
+
+			timestamp += latencyMarkInterval;
+		}
+
+		Assert.assertTrue(output.get(i).isWatermark());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static <T> void setupSourceOperator(
+			StreamSource<T, ?> operator,
+			ExecutionConfig executionConfig,
+			Environment env,
+			ProcessingTimeService timeProvider) {
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStateBackend(new MemoryStateBackend());
+
+		cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		cfg.setOperatorID(new OperatorID());
+
+		StreamStatusMaintainer streamStatusMaintainer = mock(StreamStatusMaintainer.class);
+		when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
+
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+		when(mockTask.getName()).thenReturn("Mock Task");
+		when(mockTask.getCheckpointLock()).thenReturn(new Object());
+		when(mockTask.getConfiguration()).thenReturn(cfg);
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+		when(mockTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);
+
+		doAnswer(new Answer<ProcessingTimeService>() {
+			@Override
+			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+				if (timeProvider == null) {
+					throw new RuntimeException("The time provider is null.");
+				}
+				return timeProvider;
+			}
+		}).when(mockTask).getProcessingTimeService();
+
+		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
+
+		private final TestProcessingTimeService processingTimeService;
+		private final List<Long> processingTimes;
+
+		private boolean cancelled = false;
+
+		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
+			this.processingTimeService = processingTimeService;
+			this.processingTimes = processingTimes;
+		}
+
+		@Override
+		public void run(SourceContext<Long> ctx) throws Exception {
+			for (Long processingTime : processingTimes) {
+				if (cancelled) {
+					break;
+				}
+
+				processingTimeService.setCurrentTime(processingTime);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
similarity index 75%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
index cf09a6ebdb8..4b5259e91c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
@@ -43,13 +43,11 @@
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
 
-import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -63,7 +61,7 @@
  * Tests for {@link StreamSource} operators.
  */
 @SuppressWarnings("serial")
-public class StreamSourceOperatorTest {
+public class StreamSourceOperatorWatermarksTest {
 
 	@Test
 	public void testEmitMaxWatermarkForFiniteSource() throws Exception {
@@ -74,7 +72,7 @@ public void testEmitMaxWatermarkForFiniteSource() throws Exception {
 
 		final List<StreamElement> output = new ArrayList<>();
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 
 		assertEquals(1, output.size());
@@ -90,7 +88,7 @@ public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.cancel();
 
 		// run and exit
@@ -109,7 +107,7 @@ public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -141,7 +139,7 @@ public void testNoMaxWatermarkOnImmediateStop() throws Exception {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.stop();
 
 		// run and stop
@@ -159,7 +157,7 @@ public void testNoMaxWatermarkOnAsyncStop() throws Exception {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -178,53 +176,6 @@ public void run() {
 		assertTrue(output.isEmpty());
 	}
 
-	/**
-	 * Test that latency marks are emitted.
-	 */
-	@Test
-	public void testLatencyMarkEmission() throws Exception {
-		final List<StreamElement> output = new ArrayList<>();
-
-		final long maxProcessingTime = 100L;
-		final long latencyMarkInterval = 10L;
-
-		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
-		testProcessingTimeService.setCurrentTime(0L);
-		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
-
-		// regular stream source operator
-		final StreamSource<Long, ProcessingTimeServiceSource> operator =
-				new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
-
-		// emit latency marks every 10 milliseconds.
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
-
-		// run and wait to be stopped
-		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
-
-		int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
-
-		assertEquals(
-			numberLatencyMarkers + 1, // + 1 is the final watermark element
-			output.size());
-
-		long timestamp = 0L;
-
-		int i = 0;
-		// and that its only latency markers + a final watermark
-		for (; i < output.size() - 1; i++) {
-			StreamElement se = output.get(i);
-			Assert.assertTrue(se.isLatencyMarker());
-			Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
-			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
-			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
-
-			timestamp += latencyMarkInterval;
-		}
-
-		Assert.assertTrue(output.get(i).isWatermark());
-	}
-
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
 
@@ -236,7 +187,7 @@ public void testAutomaticWatermarkContext() throws Exception {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		processingTimeService.setCurrentTime(0);
 
-		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, processingTimeService);
+		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, processingTimeService);
 
 		final List<StreamElement> output = new ArrayList<>();
 
@@ -271,21 +222,18 @@ public void testAutomaticWatermarkContext() throws Exception {
 	@SuppressWarnings("unchecked")
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 			TimeCharacteristic timeChar,
-			long watermarkInterval,
-			long latencyMarkInterval) {
-		setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService());
+			long watermarkInterval) {
+		setupSourceOperator(operator, timeChar, watermarkInterval, new TestProcessingTimeService());
 	}
 
 	@SuppressWarnings("unchecked")
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 												TimeCharacteristic timeChar,
 												long watermarkInterval,
-												long latencyMarkInterval,
 												final ProcessingTimeService timeProvider) {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setAutoWatermarkInterval(watermarkInterval);
-		executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStateBackend(new MemoryStateBackend());
@@ -355,33 +303,4 @@ public void stop() {
 			running = false;
 		}
 	}
-
-	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
-
-		private final TestProcessingTimeService processingTimeService;
-		private final List<Long> processingTimes;
-
-		private boolean cancelled = false;
-
-		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
-			this.processingTimeService = processingTimeService;
-			this.processingTimes = processingTimes;
-		}
-
-		@Override
-		public void run(SourceContext<Long> ctx) throws Exception {
-			for (Long processingTime : processingTimes) {
-				if (cancelled) {
-					break;
-				}
-
-				processingTimeService.setCurrentTime(processingTime);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			cancelled = true;
-		}
-	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
new file mode 100644
index 00000000000..ef14dcbb96b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Tests for the {@link LatencyStats}.
+ */
+public class LatencyStatsTest extends TestLogger {
+
+	private static final OperatorID OPERATOR_ID = new OperatorID();
+	private static final OperatorID SOURCE_ID_1 = new OperatorID();
+	private static final OperatorID SOURCE_ID_2 = new OperatorID();
+
+	private static final int OPERATOR_SUBTASK_INDEX = 64;
+
+	private static final String PARENT_GROUP_NAME = "parent";
+
+	@Test
+	public void testLatencyStatsSingle() {
+		testLatencyStats(LatencyStats.Granularity.SINGLE, registrations -> {
+			Assert.assertEquals(1, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0);
+				Assert.assertEquals(5, registration.f1.getCount());
+			}
+		});
+	}
+
+	@Test
+	public void testLatencyStatsOperator() {
+		testLatencyStats(LatencyStats.Granularity.OPERATOR, registrations -> {
+			Assert.assertEquals(2, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0, SOURCE_ID_1);
+				Assert.assertEquals(3, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(1);
+				assertName(registration.f0, SOURCE_ID_2);
+				Assert.assertEquals(2, registration.f1.getCount());
+			}
+		});
+	}
+
+	@Test
+	public void testLatencyStatsSubtask() {
+		testLatencyStats(LatencyStats.Granularity.SUBTASK, registrations -> {
+			Assert.assertEquals(4, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0, SOURCE_ID_1, 0);
+				Assert.assertEquals(2, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(1);
+				assertName(registration.f0, SOURCE_ID_1, 1);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(2);
+				assertName(registration.f0, SOURCE_ID_2, 2);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(3);
+				assertName(registration.f0, SOURCE_ID_2, 3);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+		});
+	}
+
+	private static void testLatencyStats(
+		final LatencyStats.Granularity granularity,
+		final Consumer<List<Tuple2<String, Histogram>>> verifier) {
+
+		final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+		final TestMetricRegistry registry = new TestMetricRegistry();
+		final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);
+
+		final LatencyStats latencyStats = new LatencyStats(
+			parentGroup,
+			MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
+			OPERATOR_SUBTASK_INDEX,
+			OPERATOR_ID,
+			granularity);
+
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));
+
+		verifier.accept(registry.latencyHistograms);
+	}
+
+	/**
+	 * Removes all parts from the metric identifier preceding the latency-related parts.
+	 */
+	private static String sanitizeName(final String registrationName) {
+		return registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + PARENT_GROUP_NAME.length() + 1);
+	}
+
+	private static void assertName(final String registrationName) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static void assertName(final String registrationName, final OperatorID sourceId) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("source_id." + sourceId +
+			".operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static void assertName(final String registrationName, final OperatorID sourceId, final int sourceIndex) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("source_id." + sourceId +
+			".source_subtask_index." + sourceIndex +
+			".operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static class TestMetricRegistry implements MetricRegistry {
+
+		private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4);
+
+		@Override
+		public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+			if (metric instanceof Histogram) {
+				latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), (Histogram) metric));
+			}
+		}
+
+		@Override
+		public char getDelimiter() {
+			return '.';
+		}
+
+		@Override
+		public char getDelimiter(int index) {
+			return 0;
+		}
+
+		@Override
+		public int getNumberReporters() {
+			return 0;
+		}
+
+		@Override
+		public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+
+		}
+
+		@Override
+		public ScopeFormats getScopeFormats() {
+			return null;
+		}
+
+		@Nullable
+		@Override
+		public String getMetricQueryServicePath() {
+			return null;
+		}
+	}
+}
diff --git a/tools/maven/suppressions-core.xml b/tools/maven/suppressions-core.xml
index db5d16a3671..5c1a914f97c 100644
--- a/tools/maven/suppressions-core.xml
+++ b/tools/maven/suppressions-core.xml
@@ -23,10 +23,6 @@ under the License.
 	"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
 
 <suppressions>
-	<suppress
-		files="(.*)api[/\\]java[/\\]functions[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
 	<suppress
 		files="(.*)api[/\\]java[/\\]typeutils[/\\]runtime[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -76,7 +72,7 @@ under the License.
 		checks="AvoidStarImport|ArrayTypeStyle|Regexp"/>
 
 	<suppress
-		files="(.*)api[/\\]common[/\\](distributions|restartstrategy)[/\\](.*)"
+		files="(.*)api[/\\]common[/\\]distributions[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 
 	<suppress
@@ -91,10 +87,6 @@ under the License.
 		files="(.*)test[/\\](.*)core[/\\]io[/\\](.*)"
 		checks="AvoidStarImport"/>
 
-	<suppress
-		files="(.*)migration[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-
 	<suppress
 		files="(.*)types[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -103,11 +95,6 @@ under the License.
 		files="(.*)test[/\\](.*)types[/\\](.*)"
 		checks="AvoidStarImport|NeedBraces"/>
 
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)util[/\\](.*)"
-		checks="UnusedImports|AvoidStarImport"/>
-
 	<suppress
 		files="(.*)test[/\\](.*)testutils[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 33a92e3075f..5efc9744037 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -44,13 +44,6 @@ under the License.
 	<suppress
 		files="(.*)test[/\\](.*)runtime[/\\]clusterframework[/\\](.*)"
 		checks="AvoidStarImport"/>
-	<suppress
-		files="(.*)runtime[/\\]concurrent[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]concurrent[/\\](.*)"
-		checks="AvoidStarImport"/>
 	<suppress
 		files="(.*)runtime[/\\]execution[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -80,18 +73,18 @@ under the License.
 		files="(.*)test[/\\](.*)runtime[/\\]instance[/\\](.*)"
 		checks="AvoidStarImport"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]disk[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\](async|disk)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]disk[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<suppress
-		files="(.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
 	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
 	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](buffer|netty|partition|serialization|util)[/\\](.*)"
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](netty|partition|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
 	<!--Test class copied from the netty project-->
 	<suppress
@@ -132,13 +125,6 @@ under the License.
 	<suppress
 		files="(.*)test[/\\](.*)runtime[/\\]messages[/\\](.*)"
 		checks="AvoidStarImport"/>
-	<suppress
-		files="(.*)runtime[/\\]minicluster[/\\](.*)"
-		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]minicluster[/\\](.*)"
-		checks="AvoidStarImport"/>
 	<suppress
 		files="(.*)runtime[/\\]operators[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -184,10 +170,6 @@ under the License.
 	<suppress
 		files="(.*)runtime[/\\]testutils[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]testutils[/\\](.*)"
-		checks="AvoidStarImport"/>
 	<suppress
 		files="(.*)runtime[/\\]util[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
@@ -198,10 +180,6 @@ under the License.
 	<suppress
 		files="(.*)runtime[/\\]zookeeper[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>
-	<!--Only additional checks for test sources. Those checks were present in the "pre-strict" checkstyle but were not applied to test sources. We do not want to suppress them for sources directory-->
-	<suppress
-		files="(.*)test[/\\](.*)runtime[/\\]zookeeper[/\\](.*)"
-		checks="AvoidStarImport"/>
 	<suppress
 		files="(.*)StateBackendTestBase.java"
 		checks="FileLength"/>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services