You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 12:22:36 UTC

[flink] 01/02: [FLINK-18656][task] Provide checkpointStartDelayNanos for SourceStreamTask

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bff52d54a1bc214c88140acc66830237fcd143d0
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Tue Jul 28 16:27:12 2020 +0200

    [FLINK-18656][task] Provide checkpointStartDelayNanos for SourceStreamTask
    
    checkpointStartDelayNanos for SourceStreamTask measures how long it took
    for the checkpoint triggering RPC call to finally start executing inside the mailbox
    thread. If the mailbox is busy, for example when the SourceFunction is backpressured,
    this time can be quite significant.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  2 ++
 .../flink/streaming/runtime/tasks/StreamTask.java  |  9 +++++
 .../runtime/tasks/SourceStreamTaskTest.java        | 40 ++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 96a1b63..405aff7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -110,6 +111,7 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 			((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
 		}
+		getEnvironment().getMetricGroup().getIOMetricGroup().gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
 	}
 
 	@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a7d04d2..a90c5f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -217,6 +217,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private Long syncSavepointId = null;
 
+	private long latestAsyncCheckpointStartDelayNanos;
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -785,6 +787,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		CompletableFuture<Boolean> result = new CompletableFuture<>();
 		mainMailboxExecutor.execute(
 				() -> {
+					latestAsyncCheckpointStartDelayNanos = 1_000_000 * Math.max(
+						0,
+						System.currentTimeMillis() - checkpointMetaData.getTimestamp());
 					try {
 						result.complete(triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime));
 					}
@@ -1183,4 +1188,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			handleAsyncException("Caught exception while processing timer.", new TimerException(t));
 		}
 	}
+
+	protected long getAsyncCheckpointStartDelayNanos() { // start delay recorded by the most recent checkpoint trigger mail, in nanoseconds (derived from a millisecond clock difference)
+		return latestAsyncCheckpointStartDelayNanos;
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 37f1f25..e63ad6c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -26,10 +26,14 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.MultiShotLatch;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
@@ -55,9 +59,11 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -68,8 +74,10 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -102,6 +110,38 @@ public class SourceStreamTaskTest {
 		Assert.assertEquals(10, resultElements.size());
 	}
 
+	@Test(timeout = 60_000)
+	public void testStartDelayMetric() throws Exception { // checks that the checkpoint start delay gauge covers the time the trigger waited before being executed
+		long sleepTime = 42; // milliseconds; used for Thread.sleep and as the lower bound of the expected delay
+		MultipleInputStreamTaskTestHarnessBuilder<String> builder =
+			new MultipleInputStreamTaskTestHarnessBuilder<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+
+		final Map<String, Metric> metrics = new ConcurrentHashMap<>(); // looked up by metric name at the end of the test
+		final TaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics); // presumably records registered metrics into the map above — confirm in TestTaskMetricGroup
+
+		StreamTaskMailboxTestHarness<String> harness = builder
+			.setupOutputForSingletonOperatorChain(
+				new StreamSource<>(
+					new CancelTestSource(
+						BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+						"Hello")))
+			.setTaskMetricGroup(taskMetricGroup)
+			.build();
+
+		Future<Boolean> triggerFuture = harness.streamTask.triggerCheckpointAsync(
+			new CheckpointMetaData(1L, System.currentTimeMillis()), // checkpoint timestamp = now; the start delay is measured relative to it
+			CheckpointOptions.forCheckpointWithDefaultLocation(),
+			false);
+
+		assertFalse(triggerFuture.isDone()); // the trigger has only been enqueued; no mailbox step has run yet
+		Thread.sleep(sleepTime); // let at least sleepTime ms elapse before the trigger is processed
+		while (!triggerFuture.isDone()) { // drive the mailbox by hand until the trigger has completed
+			harness.streamTask.runMailboxStep();
+		}
+		Gauge<Long> checkpointStartDelayGauge = (Gauge<Long>) metrics.get(MetricNames.CHECKPOINT_START_DELAY_TIME); // unchecked cast: the map stores plain Metric values
+		assertThat(checkpointStartDelayGauge.getValue(), greaterThanOrEqualTo(sleepTime * 1_000_000)); // gauge is in nanoseconds, sleepTime in milliseconds
+	}
+
 	/**
 	 * This test ensures that the SourceStreamTask properly serializes checkpointing
 	 * and element emission. This also verifies that there are no concurrent invocations