You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 12:22:35 UTC

[flink] branch release-1.11 updated (fe95187 -> b122f8f)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from fe95187  [FLINK-18581] Do not try to run GC phantom cleaners for jdk < 8u72
     new bff52d5  [FLINK-18656][task] Provide checkpointStartDelayNanos for SourceStreamTask
     new b122f8f  [FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../streaming/runtime/tasks/SourceStreamTask.java  |   2 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |   9 ++
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  22 ++---
 .../MultipleInputStreamTaskTestHarnessBuilder.java | 108 ---------------------
 .../tasks/SourceOperatorStreamTaskTest.java        |   4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        |  40 ++++++++
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  61 +++++++++++-
 ...treamTaskMultipleInputSelectiveReadingTest.java |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  12 +--
 9 files changed, 131 insertions(+), 131 deletions(-)
 delete mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java


[flink] 02/02: [FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b122f8ff92e78ec17d5692e02da5aa922e0d3985
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 28 16:35:51 2020 +0200

    [FLINK-18656][tests] Rename MultipleInputStreamTaskTestHarnessBuilder to StreamTaskMailboxTestHarnessBuilder
    
    The original concept of MultipleInputStreamTaskTestHarnessBuilder proved much more
    versatile than initially expected and it can easily handle all of the use cases:
    - MultipleInputStreamTask
    - OneInputStreamTask
    - SourceStreamTask
    
    Hence there is no need for the abstraction and no need to provide specialized versions of
    MultipleInputStreamTaskTestHarnessBuilder for the other types of tasks.
---
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  22 ++---
 .../MultipleInputStreamTaskTestHarnessBuilder.java | 108 ---------------------
 .../tasks/SourceOperatorStreamTaskTest.java        |   4 +-
 .../runtime/tasks/SourceStreamTaskTest.java        |   4 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  61 +++++++++++-
 ...treamTaskMultipleInputSelectiveReadingTest.java |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  12 +--
 7 files changed, 82 insertions(+), 133 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index ed3bd77..51bb156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -81,7 +81,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testOpenCloseAndTimestamps() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -111,7 +111,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testCheckpointBarriers() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -158,7 +158,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testOvertakingCheckpointBarriers() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 				.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -219,7 +219,7 @@ public class MultipleInputStreamTaskTest {
 		};
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -283,7 +283,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testClosingAllOperatorsOnChainProperly() throws Exception {
 		StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -330,7 +330,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testInputFairness() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
@@ -369,7 +369,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testWatermark() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -441,7 +441,7 @@ public class MultipleInputStreamTaskTest {
 	@Test
 	public void testWatermarkAndStreamStatusForwarding() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -502,7 +502,7 @@ public class MultipleInputStreamTaskTest {
 		};
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
@@ -588,7 +588,7 @@ public class MultipleInputStreamTaskTest {
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
@@ -610,7 +610,7 @@ public class MultipleInputStreamTaskTest {
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
 
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
deleted file mode 100644
index cd0627e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTestHarnessBuilder.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.util.function.FunctionWithException;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Builder to create a {@link StreamTaskMailboxTestHarness} to test {@link MultipleInputStreamTask}.
- */
-public class MultipleInputStreamTaskTestHarnessBuilder<OUT> extends StreamTaskMailboxTestHarnessBuilder<OUT> {
-
-	private final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
-	private final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
-
-	public MultipleInputStreamTaskTestHarnessBuilder(
-			FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory,
-			TypeInformation<OUT> outputType) {
-		super(taskFactory, outputType);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
-		return addInput(inputType, 1);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
-		return addInput(inputType, inputChannels, null);
-	}
-
-	public MultipleInputStreamTaskTestHarnessBuilder<OUT> addInput(
-			TypeInformation<?> inputType,
-			int inputChannels,
-			@Nullable KeySelector<?, ?> keySelector) {
-		streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
-		inputSerializers.add(inputType.createSerializer(executionConfig));
-		inputChannelsPerGate.add(inputChannels);
-		return this;
-	}
-
-	@Override
-	protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
-		inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
-		List<StreamEdge> inPhysicalEdges = new LinkedList<>();
-
-		StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
-			private static final long serialVersionUID = 1L;
-		};
-
-		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
-
-		for (int i = 0; i < inputSerializers.size(); i++) {
-			TypeSerializer<?> inputSerializer = inputSerializers.get(i);
-			inputGates[i] = new StreamTestSingleInputGate<>(
-				inputChannelsPerGate.get(i),
-				i,
-				inputSerializer,
-				bufferSize);
-
-			StreamEdge streamEdge = new StreamEdge(
-				sourceVertexDummy,
-				targetVertexDummy,
-				i + 1,
-				new LinkedList<>(),
-				new BroadcastPartitioner<>(),
-				null);
-
-			inPhysicalEdges.add(streamEdge);
-			streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
-		}
-
-		streamConfig.setInPhysicalEdges(inPhysicalEdges);
-		streamConfig.setNumberOfInputs(inputGates.length);
-		streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
-	}
-}
-
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index df60344..56475ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -136,8 +136,8 @@ public class SourceOperatorStreamTaskTest {
 				WatermarkStrategy.noWatermarks());
 
 		// build a test harness.
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+				new StreamTaskMailboxTestHarnessBuilder<>(SourceOperatorStreamTask::new, BasicTypeInfo.INT_TYPE_INFO);
 		if (snapshot != null) {
 			// Set initial snapshot if needed.
 			builder.setTaskStateSnapshot(checkpointId, snapshot);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index e63ad6c..27c2576 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -113,8 +113,8 @@ public class SourceStreamTaskTest {
 	@Test(timeout = 60_000)
 	public void testStartDelayMetric() throws Exception {
 		long sleepTime = 42;
-		MultipleInputStreamTaskTestHarnessBuilder<String> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+		StreamTaskMailboxTestHarnessBuilder<String> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
 
 		final Map<String, Metric> metrics = new ConcurrentHashMap<>();
 		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index e9fb768..6946156 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
@@ -48,6 +49,7 @@ import org.apache.flink.util.function.FunctionWithException;
 import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,7 +62,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * Builder class for {@link StreamTaskMailboxTestHarness}.
  */
-public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
+public class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	protected final FunctionWithException<Environment, ? extends StreamTask<OUT, ?>, Exception> taskFactory;
 	protected final TypeSerializer<OUT> outputSerializer;
 	protected final ExecutionConfig executionConfig = new ExecutionConfig();
@@ -77,6 +79,9 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 	protected TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 	protected Map<Long, TaskStateSnapshot> taskStateSnapshots;
 
+	protected final ArrayList<TypeSerializer<?>> inputSerializers = new ArrayList<>();
+	protected final ArrayList<Integer> inputChannelsPerGate = new ArrayList<>();
+
 	private boolean setupCalled = false;
 
 	public StreamTaskMailboxTestHarnessBuilder(
@@ -86,6 +91,24 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 		outputSerializer = outputType.createSerializer(executionConfig);
 	}
 
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType) {
+		return addInput(inputType, 1);
+	}
+
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(TypeInformation<?> inputType, int inputChannels) {
+		return addInput(inputType, inputChannels, null);
+	}
+
+	public StreamTaskMailboxTestHarnessBuilder<OUT> addInput(
+			TypeInformation<?> inputType,
+			int inputChannels,
+			@Nullable KeySelector<?, ?> keySelector) {
+		streamConfig.setStatePartitioner(inputSerializers.size(), keySelector);
+		inputSerializers.add(inputType.createSerializer(executionConfig));
+		inputChannelsPerGate.add(inputChannels);
+		return this;
+	}
+
 	public StreamTaskMailboxTestHarness<OUT> build() throws Exception {
 		streamConfig.setBufferTimeout(bufferTimeout);
 
@@ -125,7 +148,41 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 			streamMockEnvironment);
 	}
 
-	protected abstract void initializeInputs(StreamMockEnvironment streamMockEnvironment);
+	protected void initializeInputs(StreamMockEnvironment streamMockEnvironment) {
+		inputGates = new StreamTestSingleInputGate[inputSerializers.size()];
+		List<StreamEdge> inPhysicalEdges = new LinkedList<>();
+
+		StreamOperator<?> dummyOperator = new AbstractStreamOperator<Object>() {
+			private static final long serialVersionUID = 1L;
+		};
+
+		StreamNode sourceVertexDummy = new StreamNode(0, "default group", null, dummyOperator, "source dummy", new LinkedList<>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(1, "default group", null, dummyOperator, "target dummy", new LinkedList<>(), SourceStreamTask.class);
+
+		for (int i = 0; i < inputSerializers.size(); i++) {
+			TypeSerializer<?> inputSerializer = inputSerializers.get(i);
+			inputGates[i] = new StreamTestSingleInputGate<>(
+				inputChannelsPerGate.get(i),
+				i,
+				inputSerializer,
+				bufferSize);
+
+			StreamEdge streamEdge = new StreamEdge(
+				sourceVertexDummy,
+				targetVertexDummy,
+				i + 1,
+				new LinkedList<>(),
+				new BroadcastPartitioner<>(),
+				null);
+
+			inPhysicalEdges.add(streamEdge);
+			streamMockEnvironment.addInputGate(inputGates[i].getInputGate());
+		}
+
+		streamConfig.setInPhysicalEdges(inPhysicalEdges);
+		streamConfig.setNumberOfInputs(inputGates.length);
+		streamConfig.setTypeSerializersIn(inputSerializers.toArray(new TypeSerializer[inputSerializers.size()]));
+	}
 
 	public StreamTaskMailboxTestHarnessBuilder<OUT> setupOutputForSingletonOperatorChain(StreamOperator<?> operator) {
 		return setupOutputForSingletonOperatorChain(SimpleOperatorFactory.of(operator), new OperatorID());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
index ef01cf3..b51a701 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMultipleInputSelectiveReadingTest.java
@@ -140,7 +140,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
 			ArrayDeque<Object> expectedOutput,
 			boolean orderedCheck) throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+			new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO)
 				.setupOutputForSingletonOperatorChain(streamOperatorFactory)
@@ -176,7 +176,7 @@ public class StreamTaskMultipleInputSelectiveReadingTest {
 	@Test
 	public void testInputStarvation() throws Exception {
 		try (StreamTaskMailboxTestHarness<String> testHarness =
-				new MultipleInputStreamTaskTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
+				new StreamTaskMailboxTestHarnessBuilder<>(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
 					.addInput(BasicTypeInfo.STRING_TYPE_INFO)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 833f07b..09af3af 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -949,8 +949,8 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testNotifyCheckpointOnClosedOperator() throws Throwable {
 		ClosingOperator operator = new ClosingOperator();
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(operator)
@@ -984,8 +984,8 @@ public class StreamTaskTest extends TestLogger {
 
 	private void testFailToConfirmCheckpointMessage(Consumer<StreamTask<?, ?>> consumer) throws Exception {
 		StreamMap<Integer, Integer> streamMap = new StreamMap<>(new FailOnNotifyCheckpointMapper<>());
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 				.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(streamMap)
@@ -1008,8 +1008,8 @@ public class StreamTaskTest extends TestLogger {
 	@Test
 	public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
 		ClosingOperator operator = new ClosingOperator();
-		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
-			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+		StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+			new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
 					.addInput(BasicTypeInfo.INT_TYPE_INFO);
 		StreamTaskMailboxTestHarness<Integer> harness = builder
 			.setupOutputForSingletonOperatorChain(operator)


[flink] 01/02: [FLINK-18656][task] Provide checkpointStartDelayNanos for SourceStreamTask

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bff52d54a1bc214c88140acc66830237fcd143d0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 28 16:27:12 2020 +0200

    [FLINK-18656][task] Provide checkpointStartDelayNanos for SourceStreamTask
    
    checkpointStartDelayNanos for SourceStreamTask measures how long it took
    for the checkpoint-triggering RPC call to finally start executing inside the mailbox
    thread. If the mailbox is busy, for example because the SourceFunction is backpressured,
    this time can be quite significant.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  2 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  9 +++++
 .../runtime/tasks/SourceStreamTaskTest.java        | 40 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 96a1b63..405aff7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -110,6 +111,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 			((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
 		}
+		getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7d04d2..a90c5f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -217,6 +217,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private Long syncSavepointId = null;
 
+	private long latestAsyncCheckpointStartDelayNanos;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -785,6 +787,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		CompletableFuture<Boolean> result = new CompletableFuture<>();
 		mainMailboxExecutor.execute(
 				() -> {
+					latestAsyncCheckpointStartDelayNanos = 1_000_000 * Math.max(
+						0,
+						System.currentTimeMillis() - checkpointMetaData.getTimestamp());
 					try {
 						result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime));
 					}
@@ -1183,4 +1188,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			handleAsyncException("Caught exception while processing timer.", new TimerException(t));
 		}
 	}
+
+	protected long getAsyncCheckpointStartDelayNanos() {
+		return latestAsyncCheckpointStartDelayNanos;
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 37f1f25..e63ad6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -26,10 +26,14 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -55,9 +59,11 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -68,8 +74,10 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -102,6 +110,38 @@ public class SourceStreamTaskTest {
 		Assert.assertEquals(10, resultElements.size());
 	}
 
+	@Test(timeout = 60_000)
+	public void testStartDelayMetric() throws Exception {
+		long sleepTime = 42;
+		MultipleInputStreamTaskTestHarnessBuilder<String> builder =
+			new MultipleInputStreamTaskTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+
+		final Map<String, Metric> metrics = new ConcurrentHashMap<>();
+		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
+
+		StreamTaskMailboxTestHarness<String> harness = builder
+			.setupOutputForSingletonOperatorChain(
+				new StreamSource<>(
+					new CancelTestSource(
+						BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+						"Hello")))
+			.setTaskMetricGroup(taskMetricGroup)
+			.build();
+
+		Future<Boolean> triggerFuture = harness.streamTask.triggerCheckpointAsync(
+			new CheckpointMetaData(1L, System.currentTimeMillis()),
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			false);
+
+		assertFalse(triggerFuture.isDone());
+		Thread.sleep(sleepTime);
+		while (!triggerFuture.isDone()) {
+			harness.streamTask.runMailboxStep();
+		}
+		Gauge<Long> checkpointStartDelayGauge = (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME);
+		assertThat(checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000));
+	}
+
 	/**
 	 * This test ensures that the SourceStreamTask properly serializes checkpointing
 	 * and element emission. This also verifies that there are no concurrent invocations