You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/07 12:27:08 UTC

[GitHub] zentol closed pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default

zentol closed pull request #6656: [FLINK-10242][metrics] Disable latency metrics by default
URL: https://github.com/apache/flink/pull/6656
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 98054e94224..02f4ceb162f 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -12,6 +12,11 @@
             <td style="word-wrap: break-word;">128</td>
             <td>Defines the number of measured latencies to maintain at each operator.</td>
         </tr>
+        <tr>
+            <td><h5>metrics.latency.interval</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Defines the interval at which latency tracking marks are emitted from the sources. Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly impact the performance of the cluster.</td>
+        </tr>
         <tr>
             <td><h5>metrics.reporter.&lt;name&gt;.&lt;parameter&gt;</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 7d88a36393c..85c60a67a22 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1638,8 +1638,9 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
 
 ## Latency tracking
 
-Flink allows to track the latency of records traveling through the system. To enable the latency tracking
-a `latencyTrackingInterval` (in milliseconds) has to be set to a positive value in the `ExecutionConfig`.
+Flink allows tracking the latency of records traveling through the system. This feature is disabled by default.
+To enable latency tracking, you must set the `latencyTrackingInterval` to a positive number of milliseconds in either the
+[Flink configuration]({{ site.baseurl }}/ops/config.html#metrics-latency-interval) or `ExecutionConfig`.
 
 At the `latencyTrackingInterval`, the sources will periodically emit a special record, called a `LatencyMarker`.
 The marker contains a timestamp from the time when the record has been emitted at the sources.
@@ -1659,6 +1660,9 @@ latency issues caused by individual machines.
 Currently, Flink assumes that the clocks of all machines in the cluster are in sync. We recommend setting
 up an automated clock synchronisation service (like NTP) to avoid false latency results.
 
+<span class="label label-danger">Warning</span> Enabling latency metrics can significantly impact the performance
+of the cluster. It is highly recommended to only use them for debugging purposes.
+
 ## REST API integration
 
 Metrics can be queried through the [Monitoring REST API]({{ site.baseurl }}/monitoring/rest_api.html).
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 59fa803791a..6b7caaac6ec 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.util.Preconditions;
 
@@ -131,7 +132,9 @@
 	/**
 	 * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
 	 */
-	private long latencyTrackingInterval = 2000L;
+	private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
+
+	private boolean isLatencyTrackingConfigured = false;
 
 	/**
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
@@ -234,8 +237,6 @@ public long getAutoWatermarkInterval()  {
 	 * Interval for sending latency tracking marks from the sources to the sinks.
 	 * Flink will send latency tracking marks from the sources at the specified interval.
 	 *
-	 * Recommended value: 2000 (2 seconds).
-	 *
 	 * Setting a tracking interval <= 0 disables the latency tracking.
 	 *
 	 * @param interval Interval in milliseconds.
@@ -243,6 +244,7 @@ public long getAutoWatermarkInterval()  {
 	@PublicEvolving
 	public ExecutionConfig setLatencyTrackingInterval(long interval) {
 		this.latencyTrackingInterval = interval;
+		this.isLatencyTrackingConfigured = true;
 		return this;
 	}
 
@@ -256,12 +258,17 @@ public long getLatencyTrackingInterval() {
 	}
 
 	/**
-	 * Returns if latency tracking is enabled
-	 * @return True, if the tracking is enabled, false otherwise.
+	 * @deprecated will be removed in a future version
 	 */
 	@PublicEvolving
+	@Deprecated
 	public boolean isLatencyTrackingEnabled() {
-		return latencyTrackingInterval > 0;
+		return isLatencyTrackingConfigured && latencyTrackingInterval > 0;
+	}
+
+	@Internal
+	public boolean isLatencyTrackingConfigured() {
+		return isLatencyTrackingConfigured;
 	}
 
 	/**
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index f9fd02423d8..336ead6e193 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.description.Description;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
 
@@ -104,6 +105,13 @@
 			.defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>")
 			.withDescription("Defines the scope format string that is applied to all metrics scoped to an operator.");
 
+	public static final ConfigOption<Long> LATENCY_INTERVAL =
+		key("metrics.latency.interval")
+			.defaultValue(0L)
+			.withDescription("Defines the interval at which latency tracking marks are emitted from the sources." +
+				" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
+				" impact the performance of the cluster.");
+
 	/** The number of measured latencies to maintain at each operator. */
 	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
 		key("metrics.latency.history-size")
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 68858bc0cea..0c9dd49c1d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,6 @@
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.apache.flink.util.Preconditions;
@@ -91,6 +90,8 @@
 
 	private final JobVertexID jobVertexID;
 
+	private final TaskManagerRuntimeInfo taskManagerRuntimeInfo;
+
 	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
 
 	private final AccumulatorRegistry accumulatorRegistry;
@@ -127,7 +128,8 @@ protected MockEnvironment(
 		int parallelism,
 		int subtaskIndex,
 		ClassLoader userCodeClassLoader,
-		TaskMetricGroup taskMetricGroup) {
+		TaskMetricGroup taskMetricGroup,
+		TaskManagerRuntimeInfo taskManagerRuntimeInfo) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -140,6 +142,7 @@ protected MockEnvironment(
 
 		this.memManager = new MemoryManager(memorySize, 1);
 		this.ioManager = new IOManagerAsync();
+		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
 
 		this.executionConfig = executionConfig;
 		this.inputSplitProvider = inputSplitProvider;
@@ -212,7 +215,7 @@ public Configuration getJobConfiguration() {
 
 	@Override
 	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TestingTaskManagerRuntimeInfo();
+		return this.taskManagerRuntimeInfo;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfcc5f312e0..34a6ec492dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -27,6 +27,8 @@
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 
 public class MockEnvironmentBuilder {
 	private String taskName = "mock-task";
@@ -43,6 +45,7 @@
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
 	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+	private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -79,6 +82,11 @@ public MockEnvironmentBuilder setExecutionConfig(ExecutionConfig executionConfig
 		return this;
 	}
 
+	public MockEnvironmentBuilder setTaskManagerRuntimeInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo){
+		this.taskManagerRuntimeInfo = taskManagerRuntimeInfo;
+		return this;
+	}
+
 	public MockEnvironmentBuilder setMaxParallelism(int maxParallelism) {
 		this.maxParallelism = maxParallelism;
 		return this;
@@ -129,6 +137,7 @@ public MockEnvironment build() {
 			parallelism,
 			subtaskIndex,
 			userCodeClassLoader,
-			taskMetricGroup);
+			taskMetricGroup,
+			taskManagerRuntimeInfo);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 5600d8f13cc..63dd3e4d427 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -62,12 +64,17 @@ public void run(final Object lockingObject,
 
 		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
 
-		LatencyMarksEmitter latencyEmitter = null;
-		if (getExecutionConfig().isLatencyTrackingEnabled()) {
+		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
+		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
+			? getExecutionConfig().getLatencyTrackingInterval()
+			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);
+
+		LatencyMarksEmitter<OUT> latencyEmitter = null;
+		if (latencyTrackingInterval > 0) {
 			latencyEmitter = new LatencyMarksEmitter<>(
 				getProcessingTimeService(),
 				collector,
-				getExecutionConfig().getLatencyTrackingInterval(),
+				latencyTrackingInterval,
 				this.getOperatorID(),
 				getRuntimeContext().getIndexOfThisSubtask());
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
new file mode 100644
index 00000000000..14d51474b57
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the emission of latency markers by {@link StreamSource} operators.
+ */
+public class StreamSourceOperatorLatencyMetricsTest extends TestLogger {
+
+	private static final long maxProcessingTime = 100L;
+	private static final long latencyMarkInterval = 10L;
+
+	/**
+	 * Verifies that by default no latency metrics are emitted.
+	 */
+	@Test
+	public void testLatencyMarkEmissionDisabled() throws Exception {
+		testLatencyMarkEmission(0, (operator, timeProvider) -> {
+			setupSourceOperator(operator, new ExecutionConfig(), MockEnvironment.builder().build(), timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the {@link ExecutionConfig}.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+			setupSourceOperator(operator, executionConfig, MockEnvironment.builder().build(), timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledViaFlinkConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			setupSourceOperator(operator, new ExecutionConfig(), env, timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be enabled via the {@link ExecutionConfig} even if they are disabled via
+	 * the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionEnabledOverrideViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission((int) (maxProcessingTime / latencyMarkInterval) + 1, (operator, timeProvider) -> {
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
+
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, 0L);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			setupSourceOperator(operator, executionConfig, env, timeProvider);
+		});
+	}
+
+	/**
+	 * Verifies that latency metrics can be disabled via the {@link ExecutionConfig} even if they are enabled via
+	 * the configuration.
+	 */
+	@Test
+	public void testLatencyMarkEmissionDisabledOverrideViaExecutionConfig() throws Exception {
+		testLatencyMarkEmission(0, (operator, timeProvider) -> {
+			Configuration tmConfig = new Configuration();
+			tmConfig.setLong(MetricOptions.LATENCY_INTERVAL, latencyMarkInterval);
+
+			Environment env = MockEnvironment.builder()
+				.setTaskManagerRuntimeInfo(new TestingTaskManagerRuntimeInfo(tmConfig))
+				.build();
+
+			ExecutionConfig executionConfig = new ExecutionConfig();
+			executionConfig.setLatencyTrackingInterval(0);
+
+			setupSourceOperator(operator, executionConfig, env, timeProvider);
+		});
+	}
+
+	private interface OperatorSetupOperation {
+		void setupSourceOperator(
+			StreamSource<Long, ?> operator,
+			TestProcessingTimeService testProcessingTimeService
+		);
+	}
+
+	private void testLatencyMarkEmission(int numberLatencyMarkers, OperatorSetupOperation operatorSetup) throws Exception {
+		final List<StreamElement> output = new ArrayList<>();
+
+		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
+		testProcessingTimeService.setCurrentTime(0L);
+		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
+
+		// regular stream source operator
+		final StreamSource<Long, ProcessingTimeServiceSource> operator =
+			new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
+
+		operatorSetup.setupSourceOperator(operator, testProcessingTimeService);
+
+		// run and wait to be stopped
+		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
+
+		assertEquals(
+			numberLatencyMarkers + 1, // + 1 is the final watermark element
+			output.size());
+
+		long timestamp = 0L;
+
+		int i = 0;
+		// verify that it's only latency markers + a final watermark
+		for (; i < numberLatencyMarkers; i++) {
+			StreamElement se = output.get(i);
+			Assert.assertTrue(se.isLatencyMarker());
+			Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
+			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
+			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
+
+			timestamp += latencyMarkInterval;
+		}
+
+		Assert.assertTrue(output.get(i).isWatermark());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static <T> void setupSourceOperator(
+			StreamSource<T, ?> operator,
+			ExecutionConfig executionConfig,
+			Environment env,
+			ProcessingTimeService timeProvider) {
+
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStateBackend(new MemoryStateBackend());
+
+		cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		cfg.setOperatorID(new OperatorID());
+
+		StreamStatusMaintainer streamStatusMaintainer = mock(StreamStatusMaintainer.class);
+		when(streamStatusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);
+
+		StreamTask<?, ?> mockTask = mock(StreamTask.class);
+		when(mockTask.getName()).thenReturn("Mock Task");
+		when(mockTask.getCheckpointLock()).thenReturn(new Object());
+		when(mockTask.getConfiguration()).thenReturn(cfg);
+		when(mockTask.getEnvironment()).thenReturn(env);
+		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+		when(mockTask.getStreamStatusMaintainer()).thenReturn(streamStatusMaintainer);
+
+		doAnswer(new Answer<ProcessingTimeService>() {
+			@Override
+			public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+				if (timeProvider == null) {
+					throw new RuntimeException("The time provider is null.");
+				}
+				return timeProvider;
+			}
+		}).when(mockTask).getProcessingTimeService();
+
+		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
+
+		private final TestProcessingTimeService processingTimeService;
+		private final List<Long> processingTimes;
+
+		private boolean cancelled = false;
+
+		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
+			this.processingTimeService = processingTimeService;
+			this.processingTimes = processingTimes;
+		}
+
+		@Override
+		public void run(SourceContext<Long> ctx) throws Exception {
+			for (Long processingTime : processingTimes) {
+				if (cancelled) {
+					break;
+				}
+
+				processingTimeService.setCurrentTime(processingTime);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			cancelled = true;
+		}
+	}
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
similarity index 75%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
index cf09a6ebdb8..4b5259e91c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
@@ -43,13 +43,11 @@
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.CollectorOutput;
 
-import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -63,7 +61,7 @@
  * Tests for {@link StreamSource} operators.
  */
 @SuppressWarnings("serial")
-public class StreamSourceOperatorTest {
+public class StreamSourceOperatorWatermarksTest {
 
 	@Test
 	public void testEmitMaxWatermarkForFiniteSource() throws Exception {
@@ -74,7 +72,7 @@ public void testEmitMaxWatermarkForFiniteSource() throws Exception {
 
 		final List<StreamElement> output = new ArrayList<>();
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<String>(output));
 
 		assertEquals(1, output.size());
@@ -90,7 +88,7 @@ public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.cancel();
 
 		// run and exit
@@ -109,7 +107,7 @@ public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
 		final StreamSource<String, InfiniteSource<String>> operator =
 				new StreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -141,7 +139,7 @@ public void testNoMaxWatermarkOnImmediateStop() throws Exception {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 		operator.stop();
 
 		// run and stop
@@ -159,7 +157,7 @@ public void testNoMaxWatermarkOnAsyncStop() throws Exception {
 		final StoppableStreamSource<String, InfiniteSource<String>> operator =
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -178,53 +176,6 @@ public void run() {
 		assertTrue(output.isEmpty());
 	}
 
-	/**
-	 * Test that latency marks are emitted.
-	 */
-	@Test
-	public void testLatencyMarkEmission() throws Exception {
-		final List<StreamElement> output = new ArrayList<>();
-
-		final long maxProcessingTime = 100L;
-		final long latencyMarkInterval = 10L;
-
-		final TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService();
-		testProcessingTimeService.setCurrentTime(0L);
-		final List<Long> processingTimes = Arrays.asList(1L, 10L, 11L, 21L, maxProcessingTime);
-
-		// regular stream source operator
-		final StreamSource<Long, ProcessingTimeServiceSource> operator =
-				new StreamSource<>(new ProcessingTimeServiceSource(testProcessingTimeService, processingTimes));
-
-		// emit latency marks every 10 milliseconds.
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, latencyMarkInterval, testProcessingTimeService);
-
-		// run and wait to be stopped
-		operator.run(new Object(), mock(StreamStatusMaintainer.class), new CollectorOutput<Long>(output));
-
-		int numberLatencyMarkers = (int) (maxProcessingTime / latencyMarkInterval) + 1;
-
-		assertEquals(
-			numberLatencyMarkers + 1, // + 1 is the final watermark element
-			output.size());
-
-		long timestamp = 0L;
-
-		int i = 0;
-		// and that its only latency markers + a final watermark
-		for (; i < output.size() - 1; i++) {
-			StreamElement se = output.get(i);
-			Assert.assertTrue(se.isLatencyMarker());
-			Assert.assertEquals(operator.getOperatorID(), se.asLatencyMarker().getOperatorId());
-			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
-			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() == timestamp);
-
-			timestamp += latencyMarkInterval;
-		}
-
-		Assert.assertTrue(output.get(i).isWatermark());
-	}
-
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
 
@@ -236,7 +187,7 @@ public void testAutomaticWatermarkContext() throws Exception {
 		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 		processingTimeService.setCurrentTime(0);
 
-		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, processingTimeService);
+		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, processingTimeService);
 
 		final List<StreamElement> output = new ArrayList<>();
 
@@ -271,21 +222,18 @@ public void testAutomaticWatermarkContext() throws Exception {
 	@SuppressWarnings("unchecked")
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 			TimeCharacteristic timeChar,
-			long watermarkInterval,
-			long latencyMarkInterval) {
-		setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService());
+			long watermarkInterval) {
+		setupSourceOperator(operator, timeChar, watermarkInterval, new TestProcessingTimeService());
 	}
 
 	@SuppressWarnings("unchecked")
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 												TimeCharacteristic timeChar,
 												long watermarkInterval,
-												long latencyMarkInterval,
 												final ProcessingTimeService timeProvider) {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setAutoWatermarkInterval(watermarkInterval);
-		executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStateBackend(new MemoryStateBackend());
@@ -355,33 +303,4 @@ public void stop() {
 			running = false;
 		}
 	}
-
-	private static final class ProcessingTimeServiceSource implements SourceFunction<Long> {
-
-		private final TestProcessingTimeService processingTimeService;
-		private final List<Long> processingTimes;
-
-		private boolean cancelled = false;
-
-		private ProcessingTimeServiceSource(TestProcessingTimeService processingTimeService, List<Long> processingTimes) {
-			this.processingTimeService = processingTimeService;
-			this.processingTimes = processingTimes;
-		}
-
-		@Override
-		public void run(SourceContext<Long> ctx) throws Exception {
-			for (Long processingTime : processingTimes) {
-				if (cancelled) {
-					break;
-				}
-
-				processingTimeService.setCurrentTime(processingTime);
-			}
-		}
-
-		@Override
-		public void cancel() {
-			cancelled = true;
-		}
-	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services