You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/05/19 13:10:41 UTC

[flink] branch release-1.11 updated: [FLINK-16383][task] Do not relay notifyCheckpointComplete to closed operators.

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 14f0bce  [FLINK-16383][task] Do not relay notifyCheckpointComplete to closed operators.
14f0bce is described below

commit 14f0bcef0643130d0b1c0a5e9fe2625b77bcd114
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri May 15 22:49:44 2020 +0200

    [FLINK-16383][task] Do not relay notifyCheckpointComplete to closed operators.
    
    Through StreamOperatorWrapper, an operator may already be closed while the StreamTask is still running. During that time, notifications might still be relayed from the task to the closed operator, causing issues in operators that react to completed checkpoints, such as two-phase commit sinks.
    
    This commit records the closed state in the wrapper and avoids relaying notifications to closed operators.
    Also fixes a potential related issue in SubtaskCheckpointCoordinatorImpl#takeSnapshotSync.
---
 .../checkpoint/CheckpointFailureManager.java       |  1 +
 .../checkpoint/CheckpointFailureReason.java        |  2 +
 .../streaming/runtime/tasks/OperatorChain.java     |  4 +-
 .../runtime/tasks/StreamOperatorWrapper.java       | 24 +++++-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 44 +++++++----
 .../runtime/tasks/StreamMockEnvironment.java       | 12 ++-
 .../tasks/StreamTaskMailboxTestHarness.java        |  5 ++
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |  1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    | 86 ++++++++++++++++++++++
 9 files changed, 161 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
index bb0913b..9e162e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java
@@ -109,6 +109,7 @@ public class CheckpointFailureManager {
 			case JOB_FAILOVER_REGION:
 			//for compatibility purposes with user job behavior
 			case CHECKPOINT_DECLINED_TASK_NOT_READY:
+			case CHECKPOINT_DECLINED_TASK_CLOSING:
 			case CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING:
 			case CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED:
 			case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER:
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
index 66a5712..023f9bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureReason.java
@@ -44,6 +44,8 @@ public enum CheckpointFailureReason {
 
 	CHECKPOINT_DECLINED_TASK_NOT_READY(false, "Checkpoint was declined (tasks not ready)"),
 
+	CHECKPOINT_DECLINED_TASK_CLOSING(false, "Checkpoint was declined (task's operators partially closed)"),
+
 	CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING(false, "Task does not support checkpointing"),
 
 	CHECKPOINT_DECLINED_SUBSUMED(false, "Checkpoint was canceled because a barrier from newer checkpoint was received."),
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index aeae830..35b3003 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -262,7 +262,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 		// go forward through the operator chain and tell each operator
 		// to prepare the checkpoint
 		for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators()) {
-			operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
+			if (!operatorWrapper.isClosed()) {
+				operatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(checkpointId);
+			}
 		}
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
index 779ad4b..424bebe 100755
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.java
@@ -56,6 +56,8 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 
 	private StreamOperatorWrapper<?, ?> next;
 
+	private boolean closed;
+
 	StreamOperatorWrapper(
 		OP wrapped,
 		Optional<ProcessingTimeService> processingTimeService,
@@ -79,6 +81,15 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 	}
 
 	/**
+	 * Checks if the wrapped operator has been closed.
+	 *
+	 * <p>Note that this method must be called in the task thread.
+	 */
+	public boolean isClosed() {
+		return closed;
+	}
+
+	/**
 	 * Ends an input of the operator contained by this wrapper.
 	 *
 	 * @param inputId the input ID starts from 1 which indicates the first input.
@@ -91,6 +102,12 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 		}
 	}
 
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		if (!closed) {
+			wrapped.notifyCheckpointComplete(checkpointId);
+		}
+	}
+
 	public OP getStreamOperator() {
 		return wrapped;
 	}
@@ -130,7 +147,7 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 		// step 3. send a closed mail to ensure that the mails that are from the operator and still in the mailbox
 		//         are completed before exiting the following mailbox processing loop
 		CompletableFuture<Void> closedFuture = quiesceProcessingTimeService()
-			.thenCompose(unused  -> deferCloseOperatorToMailbox(actionExecutor))
+			.thenCompose(unused -> deferCloseOperatorToMailbox(actionExecutor))
 			.thenCompose(unused -> sendClosedMail());
 
 		// run the mailbox processing loop until all operations are finished
@@ -183,7 +200,10 @@ public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
 	}
 
 	private void closeOperator(StreamTaskActionExecutor actionExecutor) throws Exception {
-		actionExecutor.runThrowing(wrapped::close);
+		actionExecutor.runThrowing(() -> {
+			closed = true;
+			wrapped.close();
+		});
 	}
 
 	static class ReadIterator implements Iterator<StreamOperatorWrapper<?, ?>>, Iterable<StreamOperatorWrapper<?, ?>> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index c0232ef..8a55aa6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
@@ -161,8 +163,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
 		Map<OperatorID, OperatorSnapshotFutures> snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators());
 		try {
-			takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled);
-			finishAndReportAsync(snapshotFutures, metadata, metrics, options);
+			if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isCanceled)) {
+				finishAndReportAsync(snapshotFutures, metadata, metrics, options);
+			} else {
+				cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
+			}
 		} catch (Exception ex) {
 			cleanup(snapshotFutures, metadata, metrics, ex);
 			throw ex;
@@ -175,7 +180,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			LOG.debug("Notification of complete checkpoint for task {}", taskName);
 
 			for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
-				operatorWrapper.getStreamOperator().notifyCheckpointComplete(checkpointId);
+				operatorWrapper.notifyCheckpointComplete(checkpointId);
 			}
 		} else {
 			LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", taskName);
@@ -251,7 +256,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			asyncExceptionHandler));
 	}
 
-	private void takeSnapshotSync(
+	private boolean takeSnapshotSync(
 			Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
 			CheckpointMetaData checkpointMetaData,
 			CheckpointMetrics checkpointMetrics,
@@ -259,6 +264,14 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			OperatorChain<?, ?> operatorChain,
 			Supplier<Boolean> isCanceled) throws Exception {
 
+		for (final StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
+			if (operatorWrapper.isClosed()) {
+				env.declineCheckpoint(checkpointMetaData.getCheckpointId(),
+					new CheckpointException("Task Name" + taskName, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
+				return false;
+			}
+		}
+
 		long checkpointId = checkpointMetaData.getCheckpointId();
 		long started = System.nanoTime();
 
@@ -270,16 +283,18 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 
 		try {
 			for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
-				operatorSnapshotsInProgress.put(
-					operatorWrapper.getStreamOperator().getOperatorID(),
-					buildOperatorSnapshotFutures(
-						checkpointMetaData,
-						checkpointOptions,
-						operatorChain,
-						operatorWrapper.getStreamOperator(),
-						isCanceled,
-						channelStateWriteResult,
-						storage));
+				if (!operatorWrapper.isClosed()) {
+					operatorSnapshotsInProgress.put(
+							operatorWrapper.getStreamOperator().getOperatorID(),
+							buildOperatorSnapshotFutures(
+									checkpointMetaData,
+									checkpointOptions,
+									operatorChain,
+									operatorWrapper.getStreamOperator(),
+									isCanceled,
+									channelStateWriteResult,
+									storage));
+				}
 			}
 		} finally {
 			checkpointStorage.clearCacheFor(checkpointId);
@@ -293,6 +308,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
 			checkpointMetrics.getSyncDurationMillis());
 
 		checkpointMetrics.setSyncDurationMillis((System.nanoTime() - started) / 1_000_000);
+		return true;
 	}
 
 	private OperatorSnapshotFutures buildOperatorSnapshotFutures(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index ca6e4ec..fcfe7c8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -52,6 +52,8 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -118,6 +120,8 @@ public class StreamMockEnvironment implements Environment {
 
 	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
+	private CheckpointResponder checkpointResponder = NoOpCheckpointResponder.INSTANCE;
+
 	public StreamMockEnvironment(
 		Configuration jobConfig,
 		Configuration taskConfig,
@@ -338,7 +342,9 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void declineCheckpoint(long checkpointId, Throwable cause) {}
+	public void declineCheckpoint(long checkpointId, Throwable cause) {
+		checkpointResponder.declineCheckpoint(jobID, executionAttemptID, checkpointId, cause);
+	}
 
 	@Override
 	public TaskOperatorEventGateway getOperatorCoordinatorEventGateway() {
@@ -369,4 +375,8 @@ public class StreamMockEnvironment implements Environment {
 	public void setTaskMetricGroup(TaskMetricGroup taskMetricGroup) {
 		this.taskMetricGroup = taskMetricGroup;
 	}
+
+	public void setCheckpointResponder(CheckpointResponder checkpointResponder) {
+		this.checkpointResponder = checkpointResponder;
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index a28283a..c6d3fb8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 
 import java.util.Queue;
 
@@ -160,5 +161,9 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable {
 	public void setAutoProcess(boolean autoProcess) {
 		this.autoProcess = autoProcess;
 	}
+
+	public TestCheckpointResponder getCheckpointResponder() {
+		return (TestCheckpointResponder) taskStateManager.getCheckpointResponder();
+	}
 }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 9e7ae7e..e9fb768 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -104,6 +104,7 @@ public abstract class StreamTaskMailboxTestHarnessBuilder<OUT> {
 			bufferSize,
 			taskStateManager);
 
+		streamMockEnvironment.setCheckpointResponder(taskStateManager.getCheckpointResponder());
 		initializeInputs(streamMockEnvironment);
 
 		checkState(inputGates != null, "InputGates hasn't been initialised");
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5556334..adaf406 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -146,6 +146,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.apache.flink.streaming.util.StreamTaskUtil.waitTaskIsRunning;
@@ -936,6 +938,62 @@ public class StreamTaskTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that {@link StreamTask#notifyCheckpointCompleteAsync(long)} is not relayed to closed operators.
+	 *
+	 * <p>See FLINK-16383.
+	 */
+	@Test
+	public void testNotifyCheckpointOnClosedOperator() throws Throwable {
+		ClosingOperator operator = new ClosingOperator();
+		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+				.addInput(BasicTypeInfo.INT_TYPE_INFO);
+		StreamTaskMailboxTestHarness<Integer> harness = builder
+			.setupOutputForSingletonOperatorChain(operator)
+			.build();
+		// keeps the mailbox from suspending
+		harness.setAutoProcess(false);
+		harness.processElement(new StreamRecord<>(1));
+
+		harness.streamTask.notifyCheckpointCompleteAsync(1);
+		harness.streamTask.runMailboxStep();
+		assertEquals(1, operator.notified.get());
+		assertEquals(false, operator.closed.get());
+
+		// close operators directly, so that task is still fully running
+		harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
+		harness.streamTask.notifyCheckpointCompleteAsync(2);
+		harness.streamTask.runMailboxStep();
+		assertEquals(1, operator.notified.get());
+		assertEquals(true, operator.closed.get());
+	}
+
+	/**
+	 * Tests that checkpoints are declined if operators are (partially) closed.
+	 *
+	 * <p>See FLINK-16383.
+	 */
+	@Test
+	public void testCheckpointDeclinedOnClosedOperator() throws Throwable {
+		ClosingOperator operator = new ClosingOperator();
+		MultipleInputStreamTaskTestHarnessBuilder<Integer> builder =
+			new MultipleInputStreamTaskTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+					.addInput(BasicTypeInfo.INT_TYPE_INFO);
+		StreamTaskMailboxTestHarness<Integer> harness = builder
+			.setupOutputForSingletonOperatorChain(operator)
+			.build();
+		// keeps the mailbox from suspending
+		harness.setAutoProcess(false);
+		harness.processElement(new StreamRecord<>(1));
+
+		harness.streamTask.operatorChain.closeOperators(harness.streamTask.getActionExecutor());
+		assertEquals(true, operator.closed.get());
+
+		harness.streamTask.triggerCheckpointOnBarrier(new CheckpointMetaData(1, 0), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetrics());
+		assertEquals(1, harness.getCheckpointResponder().getDeclineReports().size());
+	}
+
 	@Test
 	public void testExecuteMailboxActionsAfterLeavingInputProcessorMailboxLoop() throws Exception {
 		OneShotLatch latch = new OneShotLatch();
@@ -1926,4 +1984,32 @@ public class StreamTaskTest extends TestLogger {
 			return isPartitionRequested;
 		}
 	}
+
+	private static class ClosingOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
+		static AtomicBoolean closed = new AtomicBoolean();
+		static AtomicInteger notified = new AtomicInteger();
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+			closed.set(false);
+			notified.set(0);
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			closed.set(true);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			super.notifyCheckpointComplete(checkpointId);
+			notified.incrementAndGet();
+		}
+
+		@Override
+		public void processElement(StreamRecord<T> element) throws Exception {
+		}
+	}
 }