Posted to commits@flink.apache.org by se...@apache.org on 2020/05/16 16:27:49 UTC

[flink] branch master updated (1670f16 -> 79aa7d1)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1670f16  [FLINK-17028][hbase] Introduce a new HBase connector with new property keys
     new e341310  [FLINK-17674][state] Type OperatorCoordinator state in checkpoints strictly to ByteStreamStateHandle
     new e1fc86b  [hotfix][tests] Remove unused temp folder in CheckpointMetadataLoadingTest
     new 90e4708  [FLINK-17670][refactor] Refactor single test in CheckpointMetadataLoadingTest into finer grained tests.
     new 382cf09  [FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.
     new 8ad1ba3  [FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debugability.
     new 9f30c7f  [FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.
     new 2430dd8  [FLINK-17672][scheduler] OperatorCoordinators receive failure notifications on task failure instead of restarts
     new b3da4e1  [FLINK-10740][scheduler] Add failure reason to OperatorCoordinator.failTask(...)
     new 52f7b9d  [FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.
     new 7ffa87b  [FLINK-16357][checkpointing] Offer different methods for "global restore" and "local restore" in CheckpointCoordinator
     new 8e3e9cc  [FLINK-16357][checkpointing] Only global failure/restores reset the coordinator state.
     new bfef3db  [FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.
     new 79aa7d1  [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/flink/util/ExceptionUtils.java |  37 ++
 .../runtime/checkpoint/CheckpointCoordinator.java  |  77 +++-
 .../flink/runtime/checkpoint/Checkpoints.java      |  24 +-
 .../checkpoint/OperatorCoordinatorCheckpoints.java |   9 +-
 .../flink/runtime/checkpoint/OperatorState.java    |   8 +-
 .../runtime/checkpoint/PendingCheckpoint.java      |   4 +-
 .../checkpoint/StateAssignmentOperation.java       |   8 +-
 .../metadata/MetadataV2V3SerializerBase.java       |  14 +
 .../checkpoint/metadata/MetadataV3Serializer.java  |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   2 -
 .../failover/flip1/ExecutionFailureHandler.java    |  17 +-
 .../failover/flip1/FailureHandlingResult.java      |  34 +-
 .../coordination/OperatorCoordinator.java          |   4 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |  40 ++-
 .../flink/runtime/scheduler/SchedulerBase.java     |  42 ++-
 .../source/coordinator/SourceCoordinator.java      |   4 +-
 .../CheckpointCoordinatorMasterHooksTest.java      |  12 +-
 .../CheckpointCoordinatorRestoringTest.java        |  10 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |   4 +-
 .../checkpoint/CheckpointMetadataLoadingTest.java  | 193 ++++++----
 .../checkpoint/CheckpointStateRestoreTest.java     |  22 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |   4 +-
 .../checkpoint/metadata/CheckpointTestUtils.java   |   8 +-
 .../ManuallyTriggeredScheduledExecutor.java        |   9 +-
 .../failover/flip1/FailureHandlingResultTest.java  |   5 +-
 .../coordination/MockOperatorCoordinator.java      |   4 +-
 .../OperatorCoordinatorSchedulerTest.java          | 393 +++++++++++++++++++--
 .../coordination/TestingOperatorCoordinator.java   |  39 +-
 .../runtime/scheduler/SchedulerTestingUtils.java   | 104 +++++-
 .../source/coordinator/SourceCoordinatorTest.java  |   6 +-
 .../TestingCheckpointStorageCoordinatorView.java   | 181 ++++++++++
 .../runtime/state/TestingStreamStateHandle.java    |  32 +-
 .../collect/CollectSinkOperatorCoordinator.java    |   4 +-
 .../operators/collect/CollectSinkFunctionTest.java |   2 +-
 .../CollectSinkOperatorCoordinatorTest.java        |   2 +-
 .../apache/flink/core/testutils/FlinkMatchers.java |  61 +++-
 36 files changed, 1166 insertions(+), 255 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java


[flink] 11/13: [FLINK-16357][checkpointing] Only global failure/restores reset the coordinator state.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e3e9ccea1b376050c46a7b055a0055380e0c2be
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 17:58:44 2020 +0200

    [FLINK-16357][checkpointing] Only global failure/restores reset the coordinator state.
    
    The failure handling results now carry a flag indicating whether the failure was a global failure
    or a task failure. Based on that flag, the Scheduler invokes different restore methods on the
    CheckpointCoordinator.
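
To make the flag-based dispatch concrete, here is a minimal, self-contained Java sketch. It is
not Flink source: RestoreDispatchSketch and CoordinatorLike are hypothetical stand-ins, and the
restoreState method only mirrors the shape of SchedulerBase#restoreState from the diff below.

// Illustrative sketch only; CoordinatorLike stands in for the two restore entry points
// on the CheckpointCoordinator introduced later in this patch series.
import java.util.Set;

final class RestoreDispatchSketch {

	interface CoordinatorLike {
		void restoreLatestCheckpointedStateToAll(Set<String> vertices, boolean allowNonRestoredState);
		void restoreLatestCheckpointedStateToSubtasks(Set<String> vertices);
	}

	// Mirrors SchedulerBase#restoreState: only a global recovery resets coordinator state.
	static void restoreState(CoordinatorLike coordinator, Set<String> vertices, boolean isGlobalRecovery) {
		if (isGlobalRecovery) {
			coordinator.restoreLatestCheckpointedStateToAll(vertices, true);
		} else {
			coordinator.restoreLatestCheckpointedStateToSubtasks(vertices);
		}
	}

	public static void main(String[] args) {
		CoordinatorLike coordinator = new CoordinatorLike() {
			@Override
			public void restoreLatestCheckpointedStateToAll(Set<String> vertices, boolean allowNonRestoredState) {
				System.out.println("global restore (coordinators reset) for " + vertices);
			}

			@Override
			public void restoreLatestCheckpointedStateToSubtasks(Set<String> vertices) {
				System.out.println("local restore (coordinators untouched) for " + vertices);
			}
		};

		restoreState(coordinator, Set.of("v1", "v2"), false); // single task failure -> local restore
		restoreState(coordinator, Set.of("v1", "v2"), true);  // global failure -> full reset
	}
}
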
---
 .../failover/flip1/ExecutionFailureHandler.java    | 17 +++++++----
 .../failover/flip1/FailureHandlingResult.java      | 34 ++++++++++++++++++----
 .../flink/runtime/scheduler/DefaultScheduler.java  |  7 +++--
 .../flink/runtime/scheduler/SchedulerBase.java     | 25 ++++++++++------
 .../failover/flip1/FailureHandlingResultTest.java  |  5 ++--
 .../OperatorCoordinatorSchedulerTest.java          | 12 ++++++++
 6 files changed, 74 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
index 63d5e88..e6f8fcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java
@@ -75,7 +75,7 @@ public class ExecutionFailureHandler {
 	 * @return result of the failure handling
 	 */
 	public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
-		return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause));
+		return handleFailure(cause, failoverStrategy.getTasksNeedingRestart(failedTask, cause), false);
 	}
 
 	/**
@@ -90,15 +90,18 @@ public class ExecutionFailureHandler {
 			cause,
 			IterableUtils.toStream(schedulingTopology.getVertices())
 				.map(SchedulingExecutionVertex::getId)
-				.collect(Collectors.toSet()));
+				.collect(Collectors.toSet()),
+			true);
 	}
 
 	private FailureHandlingResult handleFailure(
 			final Throwable cause,
-			final Set<ExecutionVertexID> verticesToRestart) {
+			final Set<ExecutionVertexID> verticesToRestart,
+			final boolean globalFailure) {
 
 		if (isUnrecoverableError(cause)) {
-			return FailureHandlingResult.unrecoverable(new JobException("The failure is not recoverable", cause));
+			return FailureHandlingResult.unrecoverable(
+				new JobException("The failure is not recoverable", cause), globalFailure);
 		}
 
 		restartBackoffTimeStrategy.notifyFailure(cause);
@@ -107,10 +110,12 @@ public class ExecutionFailureHandler {
 
 			return FailureHandlingResult.restartable(
 				verticesToRestart,
-				restartBackoffTimeStrategy.getBackoffTime());
+				restartBackoffTimeStrategy.getBackoffTime(),
+				globalFailure);
 		} else {
 			return FailureHandlingResult.unrecoverable(
-				new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause));
+				new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
+				globalFailure);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
index 51e487d..f45daa0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java
@@ -41,18 +41,22 @@ public class FailureHandlingResult {
 	/** Reason why the failure is not recoverable. */
 	private final Throwable error;
 
+	/** True if the original failure was a global failure. **/
+	private final boolean globalFailure;
+
 	/**
 	 * Creates a result of a set of tasks to restart to recover from the failure.
 	 *
 	 * @param verticesToRestart containing task vertices to restart to recover from the failure
 	 * @param restartDelayMS indicate a delay before conducting the restart
 	 */
-	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
+	private FailureHandlingResult(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS, boolean globalFailure) {
 		checkState(restartDelayMS >= 0);
 
 		this.verticesToRestart = Collections.unmodifiableSet(checkNotNull(verticesToRestart));
 		this.restartDelayMS = restartDelayMS;
 		this.error = null;
+		this.globalFailure = globalFailure;
 	}
 
 	/**
@@ -60,10 +64,11 @@ public class FailureHandlingResult {
 	 *
 	 * @param error reason why the failure is not recoverable
 	 */
-	private FailureHandlingResult(Throwable error) {
+	private FailureHandlingResult(Throwable error, boolean globalFailure) {
 		this.verticesToRestart = null;
 		this.restartDelayMS = -1;
 		this.error = checkNotNull(error);
+		this.globalFailure = globalFailure;
 	}
 
 	/**
@@ -115,23 +120,40 @@ public class FailureHandlingResult {
 	}
 
 	/**
+	 * Checks if this failure was a global failure, i.e., coming from a "safety net" failover that involved
+	 * all tasks and should also reset components like the coordinators.
+	 */
+	public boolean isGlobalFailure() {
+		return globalFailure;
+	}
+
+	/**
 	 * Creates a result of a set of tasks to restart to recover from the failure.
 	 *
+	 * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+	 * the failure of an individual task.
+	 *
 	 * @param verticesToRestart containing task vertices to restart to recover from the failure
 	 * @param restartDelayMS indicate a delay before conducting the restart
 	 * @return result of a set of tasks to restart to recover from the failure
 	 */
-	public static FailureHandlingResult restartable(Set<ExecutionVertexID> verticesToRestart, long restartDelayMS) {
-		return new FailureHandlingResult(verticesToRestart, restartDelayMS);
+	public static FailureHandlingResult restartable(
+			Set<ExecutionVertexID> verticesToRestart,
+			long restartDelayMS,
+			boolean globalFailure) {
+		return new FailureHandlingResult(verticesToRestart, restartDelayMS, globalFailure);
 	}
 
 	/**
 	 * Creates a result that the failure is not recoverable and no restarting should be conducted.
 	 *
+	 * <p>The result can be flagged to be from a global failure triggered by the scheduler, rather than from
+	 * the failure of an individual task.
+	 *
 	 * @param error reason why the failure is not recoverable
 	 * @return result indicating the failure is not recoverable
 	 */
-	public static FailureHandlingResult unrecoverable(Throwable error) {
-		return new FailureHandlingResult(error);
+	public static FailureHandlingResult unrecoverable(Throwable error, boolean globalFailure) {
+		return new FailureHandlingResult(error, globalFailure);
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index dcb6549..e42af43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -221,6 +221,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 		final Set<ExecutionVertexVersion> executionVertexVersions =
 			new HashSet<>(executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
+		final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
 
 		addVerticesToRestartPending(verticesToRestart);
 
@@ -228,7 +229,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 		delayExecutor.schedule(
 			() -> FutureUtils.assertNoException(
-				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions), getMainThreadExecutor())),
+				cancelFuture.thenRunAsync(restartTasks(executionVertexVersions, globalRecovery), getMainThreadExecutor())),
 			failureHandlingResult.getRestartDelayMS(),
 			TimeUnit.MILLISECONDS);
 	}
@@ -245,7 +246,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 		}
 	}
 
-	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions) {
+	private Runnable restartTasks(final Set<ExecutionVertexVersion> executionVertexVersions, final boolean isGlobalRecovery) {
 		return () -> {
 			final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
@@ -254,7 +255,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			resetForNewExecutions(verticesToRestart);
 
 			try {
-				restoreState(verticesToRestart);
+				restoreState(verticesToRestart, isGlobalRecovery);
 			} catch (Throwable t) {
 				handleGlobalFailure(t);
 				return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index d8908c2..3d510fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -321,18 +321,25 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 	}
 
-	protected void restoreState(final Set<ExecutionVertexID> vertices) throws Exception {
+	protected void restoreState(final Set<ExecutionVertexID> vertices, final boolean isGlobalRecovery) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		if (checkpointCoordinator == null) {
+			return;
+		}
+
 		// if there is checkpointed state, reload it into the executions
-		if (executionGraph.getCheckpointCoordinator() != null) {
-			// abort pending checkpoints to
-			// i) enable new checkpoint triggering without waiting for last checkpoint expired.
-			// ii) ensure the EXACTLY_ONCE semantics if needed.
-			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
+
+		// abort pending checkpoints to
+		// i) enable new checkpoint triggering without waiting for last checkpoint expired.
+		// ii) ensure the EXACTLY_ONCE semantics if needed.
+		checkpointCoordinator.abortPendingCheckpoints(
 				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
-			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedStateToAll(
-				getInvolvedExecutionJobVertices(vertices),
-				true);
+		final Set<ExecutionJobVertex> jobVerticesToRestore = getInvolvedExecutionJobVertices(vertices);
+		if (isGlobalRecovery) {
+			checkpointCoordinator.restoreLatestCheckpointedStateToAll(jobVerticesToRestore, true);
+		} else {
+			checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(jobVerticesToRestore);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
index 8943655..c64d50d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.HashSet;
@@ -45,7 +46,7 @@ public class FailureHandlingResultTest extends TestLogger {
 		Set<ExecutionVertexID> tasks = new HashSet<>();
 		tasks.add(new ExecutionVertexID(new JobVertexID(), 0));
 		long delay = 1234;
-		FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay);
+		FailureHandlingResult result = FailureHandlingResult.restartable(tasks, delay, false);
 
 		assertTrue(result.canRestart());
 		assertEquals(delay, result.getRestartDelayMS());
@@ -65,7 +66,7 @@ public class FailureHandlingResultTest extends TestLogger {
 	public void testRestartingSuppressedFailureHandlingResult() {
 		// create a FailureHandlingResult with error
 		Throwable error = new Exception("test error");
-		FailureHandlingResult result = FailureHandlingResult.unrecoverable(error);
+		FailureHandlingResult result = FailureHandlingResult.unrecoverable(error, false);
 
 		assertFalse(result.canRestart());
 		assertEquals(error, result.getError());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index bb0e68e..a3fb582 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -71,6 +71,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -272,6 +273,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	@Test
+	public void testLocalFailureDoesNotResetToCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+		failAndRestartTask(scheduler, 0);
+
+		assertNull("coordinator should not have a restored checkpoint", coordinator.getLastRestoredCheckpointState());
+	}
+
+	@Test
 	public void testConfirmCheckpointComplete() throws Exception {
 		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);


[flink] 02/13: [hotfix][tests] Remove unused temp folder in CheckpointMetadataLoadingTest

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e1fc86b7ac8771da4224c1ebb96fad04831bd3fa
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 15:15:04 2020 +0200

    [hotfix][tests] Remove unused temp folder in CheckpointMetadataLoadingTest
---
 .../flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java    | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index fc93b2c..557e8ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -30,12 +30,9 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,9 +54,6 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointMetadataLoadingTest {
 
-	@Rule
-	public final TemporaryFolder tmpFolder = new TemporaryFolder();
-
 	/**
 	 * Tests loading and validation of savepoints with correct setup,
 	 * parallelism mismatch, and a missing task.
@@ -67,7 +61,6 @@ public class CheckpointMetadataLoadingTest {
 	@Test
 	public void testLoadAndValidateSavepoint() throws Exception {
 		final Random rnd = new Random();
-		File tmp = tmpFolder.newFolder();
 
 		int parallelism = 128128;
 		long checkpointId = Integer.MAX_VALUE + 123123L;


[flink] 03/13: [FLINK-17670][refactor] Refactor single test in CheckpointMetadataLoadingTest into finer grained tests.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90e4708c9a5ab24b5f50423a181d75be375e2d1e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 16:00:29 2020 +0200

    [FLINK-17670][refactor] Refactor single test in CheckpointMetadataLoadingTest into finer grained tests.
---
 .../checkpoint/CheckpointMetadataLoadingTest.java  | 161 ++++++++++++++-------
 1 file changed, 106 insertions(+), 55 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 557e8ba..936fe23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLo
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -42,7 +43,6 @@ import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNew
 import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle;
 import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singleton;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -54,37 +54,90 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointMetadataLoadingTest {
 
+	private final ClassLoader cl = getClass().getClassLoader();
+
 	/**
-	 * Tests loading and validation of savepoints with correct setup,
-	 * parallelism mismatch, and a missing task.
+	 * Tests correct savepoint loading.
 	 */
 	@Test
-	public void testLoadAndValidateSavepoint() throws Exception {
-		final Random rnd = new Random();
+	public void testAllStateRestored() throws Exception {
+		final JobID jobId = new JobID();
+		final OperatorID operatorId = new OperatorID();
+		final long checkpointId = Integer.MAX_VALUE + 123123L;
+		final int parallelism = 128128;
 
-		int parallelism = 128128;
-		long checkpointId = Integer.MAX_VALUE + 123123L;
-		JobVertexID jobVertexID = new JobVertexID();
-		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorSubtaskState(checkpointId, operatorId, parallelism);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = createTasks(operatorId, parallelism, parallelism);
 
-		OperatorSubtaskState subtaskState = new OperatorSubtaskState(
-				new OperatorStreamStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
-				null,
-				null,
-				null,
-				singleton(createNewInputChannelStateHandle(10, rnd)),
-				singleton(createNewResultSubpartitionStateHandle(10, rnd)));
+		final CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(jobId, tasks, testSavepoint, cl, false);
 
-		OperatorState state = new OperatorState(operatorID, parallelism, parallelism);
-		state.putState(0, subtaskState);
+		assertEquals(jobId, loaded.getJobId());
+		assertEquals(checkpointId, loaded.getCheckpointID());
+	}
+
+	/**
+	 * Tests that savepoint loading fails when there is a max-parallelism mismatch.
+	 */
+	@Test
+	public void testMaxParallelismMismatch() throws Exception {
+		final OperatorID operatorId = new OperatorID();
+		final int parallelism = 128128;
 
-		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
-		taskStates.put(operatorID, state);
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorSubtaskState(242L, operatorId, parallelism);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = createTasks(operatorId, parallelism, parallelism + 1);
 
-		JobID jobId = new JobID();
+		try {
+			Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
+		}
+	}
 
-		// Store savepoint
-		final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, taskStates.values(), Collections.emptyList());
+	/**
+	 * Tests that savepoint loading fails when there is non-restored state, but it is not allowed.
+	 */
+	@Test
+	public void testNonRestoredStateWhenDisallowed() throws Exception {
+		final OperatorID operatorId = new OperatorID();
+		final int parallelism = 9;
+
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorSubtaskState(242L, operatorId, parallelism);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
+
+		try {
+			Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+		}
+	}
+
+	/**
+	 * Tests that savepoint loading succeeds when there is non-restored state and it is allowed.
+	 */
+	@Test
+	public void testNonRestoredStateWhenAllowed() throws Exception {
+		final OperatorID operatorId = new OperatorID();
+		final int parallelism = 9;
+
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorSubtaskState(242L, operatorId, parallelism);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
+
+		final CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, true);
+
+		assertTrue(loaded.getOperatorStates().isEmpty());
+	}
+
+	// ------------------------------------------------------------------------
+	//  setup utils
+	// ------------------------------------------------------------------------
+
+	private static CompletedCheckpointStorageLocation createSavepointWithOperatorState(
+			final long checkpointId,
+			final OperatorState state) throws IOException {
+
+		final CheckpointMetadata savepoint = new CheckpointMetadata(checkpointId, Collections.singletonList(state), Collections.emptyList());
 		final StreamStateHandle serializedMetadata;
 
 		try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
@@ -92,47 +145,45 @@ public class CheckpointMetadataLoadingTest {
 			serializedMetadata = new ByteStreamStateHandle("checkpoint", os.toByteArray());
 		}
 
-		final CompletedCheckpointStorageLocation storageLocation = new TestCompletedCheckpointStorageLocation(
-				serializedMetadata, "dummy/pointer");
-
-		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
-		when(vertex.getParallelism()).thenReturn(parallelism);
-		when(vertex.getMaxParallelism()).thenReturn(parallelism);
-		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorID)));
+		return new TestCompletedCheckpointStorageLocation(serializedMetadata, "dummy/pointer");
+	}
 
-		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
-		tasks.put(jobVertexID, vertex);
+	private static CompletedCheckpointStorageLocation createSavepointWithOperatorSubtaskState(
+			final long checkpointId,
+			final OperatorID operatorId,
+			final int parallelism) throws IOException {
 
-		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
+		final Random rnd = new Random();
 
-		// 1) Load and validate: everything correct
-		CompletedCheckpoint loaded = Checkpoints.loadAndValidateCheckpoint(jobId, tasks, storageLocation, ucl, false);
+		final OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+			new OperatorStreamStateHandle(Collections.emptyMap(), new ByteStreamStateHandle("testHandler", new byte[0])),
+			null,
+			null,
+			null,
+			singleton(createNewInputChannelStateHandle(10, rnd)),
+			singleton(createNewResultSubpartitionStateHandle(10, rnd)));
 
-		assertEquals(jobId, loaded.getJobId());
-		assertEquals(checkpointId, loaded.getCheckpointID());
+		final OperatorState state = new OperatorState(operatorId, parallelism, parallelism);
+		state.putState(0, subtaskState);
 
-		// 2) Load and validate: max parallelism mismatch
-		when(vertex.getMaxParallelism()).thenReturn(222);
-		when(vertex.isMaxParallelismConfigured()).thenReturn(true);
+		return createSavepointWithOperatorState(checkpointId, state);
+	}
 
-		try {
-			Checkpoints.loadAndValidateCheckpoint(jobId, tasks, storageLocation, ucl, false);
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException expected) {
-			assertTrue(expected.getMessage().contains("Max parallelism mismatch"));
-		}
+	private static Map<JobVertexID, ExecutionJobVertex> createTasks(OperatorID operatorId, int parallelism, int maxParallelism) {
+		final JobVertexID vertexId = new JobVertexID(operatorId.getLowerPart(), operatorId.getUpperPart());
 
-		// 3) Load and validate: missing vertex
-		assertNotNull(tasks.remove(jobVertexID));
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(parallelism);
+		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(operatorId)));
 
-		try {
-			Checkpoints.loadAndValidateCheckpoint(jobId, tasks, storageLocation, ucl, false);
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException expected) {
-			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+		if (parallelism != maxParallelism) {
+			when(vertex.isMaxParallelismConfigured()).thenReturn(true);
 		}
 
-		// 4) Load and validate: ignore missing vertex
-		Checkpoints.loadAndValidateCheckpoint(jobId, tasks, storageLocation, ucl, true);
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(vertexId, vertex);
+
+		return tasks;
 	}
 }


[flink] 10/13: [FLINK-16357][checkpointing] Offer different methods for "global restore" and "local restore" in CheckpointCoordinator

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ffa87b1f3c6089391d381d1a94cccf60860d0a3
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 19:57:59 2020 +0200

    [FLINK-16357][checkpointing] Offer different methods for "global restore" and "local restore" in CheckpointCoordinator
    
    Global restores are meant for initial savepoint restores, and for global failover in the scheduler.
    Global failover in the scheduler happens, for example, during master failover and as a safety net
    when encountering unexpected or inconsistent situations that might impact correctness.
    
    Local failovers are all common task failures and recoveries.
    
    This commit offers different methods to be called in these two situations, but does not make use of the
    local restore method yet. All calls still go to the global restore, which was the previous behavior in
    all cases.
    
    Currently, the difference between local and global restore in the CheckpointCoordinator is that
    OperatorCoordinators are only restored during global restores.
    
    As a side effect, this change also eliminates the "failWhenNoCheckpoint" flag outside of the CheckpointCoordinator.
    The flag is exclusively used by the "restoreSavepoint()" method, which is a separate call to the Coordinator anyway.
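
A minimal, self-contained sketch of how the two entry points described above can funnel into a
single internal restore call that differs only in the coordinator-restore flag. It is not Flink
source: CheckpointRestoreSketch and VertexStub are hypothetical stand-ins, and restoreInternal
abbreviates the restoreLatestCheckpointedStateInternal method shown in the diff below.

import java.util.Set;

final class CheckpointRestoreSketch {

	// Stand-in for an execution job vertex; the real type is ExecutionJobVertex.
	static final class VertexStub {
		final String name;
		VertexStub(String name) { this.name = name; }
	}

	// Global restore: task state and OperatorCoordinator state are both reset.
	boolean restoreLatestCheckpointedStateToAll(Set<VertexStub> tasks, boolean allowNonRestoredState) {
		return restoreInternal(tasks, /* restoreCoordinators */ true, /* errorIfNoCheckpoint */ false, allowNonRestoredState);
	}

	// Local/regional restore: only task state; coordinators keep their state and unmatched state is tolerated.
	boolean restoreLatestCheckpointedStateToSubtasks(Set<VertexStub> tasks) {
		return restoreInternal(tasks, /* restoreCoordinators */ false, /* errorIfNoCheckpoint */ false, /* allowNonRestoredState */ true);
	}

	private boolean restoreInternal(
			Set<VertexStub> tasks,
			boolean restoreCoordinators,
			boolean errorIfNoCheckpoint,
			boolean allowNonRestoredState) {
		// In the real coordinator this assigns task state and calls master hooks; checkpointed
		// state is pushed back into the OperatorCoordinators only when restoreCoordinators is true.
		System.out.printf("restore %d vertices, coordinators=%s, errorIfNoCheckpoint=%s, allowNonRestored=%s%n",
				tasks.size(), restoreCoordinators, errorIfNoCheckpoint, allowNonRestoredState);
		return !tasks.isEmpty();
	}

	public static void main(String[] args) {
		CheckpointRestoreSketch coordinator = new CheckpointRestoreSketch();
		Set<VertexStub> vertices = Set.of(new VertexStub("map"), new VertexStub("sink"));
		coordinator.restoreLatestCheckpointedStateToAll(vertices, false);   // e.g. initial savepoint restore or master failover
		coordinator.restoreLatestCheckpointedStateToSubtasks(vertices);     // e.g. regional failover after a task failure
	}
}
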
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 62 ++++++++++++++++++----
 .../flink/runtime/scheduler/SchedulerBase.java     |  6 +--
 .../CheckpointCoordinatorMasterHooksTest.java      | 12 ++---
 .../CheckpointCoordinatorRestoringTest.java        | 10 ++--
 .../checkpoint/CheckpointCoordinatorTest.java      |  4 +-
 .../checkpoint/CheckpointStateRestoreTest.java     | 22 ++++----
 6 files changed, 76 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fbdf2d4..9e2a4a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1127,16 +1127,48 @@ public class CheckpointCoordinator {
 			boolean errorIfNoCheckpoint,
 			boolean allowNonRestoredState) throws Exception {
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), errorIfNoCheckpoint, allowNonRestoredState);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, errorIfNoCheckpoint, allowNonRestoredState);
 	}
 
 	/**
-	 * Restores the latest checkpointed state.
+	 * Restores the latest checkpointed state to a set of subtasks. This method represents a "local"
+	 * or "regional" failover and does not restore states to coordinators. Note that a regional failover
+	 * might still include all tasks.
+	 *
+	 * @param tasks Set of job vertices to restore. State for these vertices is
+	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
+
+	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
+	 * @throws IllegalStateException If the CheckpointCoordinator is shut down.
+	 * @throws IllegalStateException If no completed checkpoint is available and
+	 *                               the <code>failIfNoCheckpoint</code> flag has been set.
+	 * @throws IllegalStateException If the checkpoint contains state that cannot be
+	 *                               mapped to any job vertex in <code>tasks</code> and the
+	 *                               <code>allowNonRestoredState</code> flag has not been set.
+	 * @throws IllegalStateException If the max parallelism changed for an operator
+	 *                               that restores state from this checkpoint.
+	 * @throws IllegalStateException If the parallelism changed for an operator
+	 *                               that restores <i>non-partitioned</i> state from this
+	 *                               checkpoint.
+	 */
+	public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception {
+		// when restoring subtasks only, we accept potentially unmatched state for the
+		// following reasons:
+		//   - the set frequently does not include all Job Vertices (only the ones that are part
+		//     of the restarted region), meaning there will be unmatched state by design.
+		//   - we might end up restoring from an original savepoint with unmatched
+		//     state, if there was no checkpoint yet.
+		return restoreLatestCheckpointedStateInternal(tasks, false, false, true);
+	}
+
+	/**
+	 * Restores the latest checkpointed state to all tasks and all coordinators.
+	 * This method represents a "global restore"-style operation where all stateful tasks
+	 * and coordinators from the given set of Job Vertices are restored to their
+	 * latest checkpointed state.
 	 *
 	 * @param tasks Set of job vertices to restore. State for these vertices is
 	 * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}.
-	 * @param errorIfNoCheckpoint Fail if no completed checkpoint is available to
-	 * restore from.
 	 * @param allowNonRestoredState Allow checkpoint state that cannot be mapped
 	 * to any job vertex in tasks.
 	 * @return <code>true</code> if state was restored, <code>false</code> otherwise.
@@ -1152,11 +1184,19 @@ public class CheckpointCoordinator {
 	 *                               that restores <i>non-partitioned</i> state from this
 	 *                               checkpoint.
 	 */
-	public boolean restoreLatestCheckpointedState(
+	public boolean restoreLatestCheckpointedStateToAll(
 			final Set<ExecutionJobVertex> tasks,
-			final boolean errorIfNoCheckpoint,
 			final boolean allowNonRestoredState) throws Exception {
 
+		return restoreLatestCheckpointedStateInternal(tasks, true, false, allowNonRestoredState);
+	}
+
+	private boolean restoreLatestCheckpointedStateInternal(
+		final Set<ExecutionJobVertex> tasks,
+		final boolean restoreCoordinators,
+		final boolean errorIfNoCheckpoint,
+		final boolean allowNonRestoredState) throws Exception {
+
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
@@ -1202,7 +1242,9 @@ public class CheckpointCoordinator {
 
 			stateAssignmentOperation.assignStates();
 
-			// call master hooks for restore
+			// call master hooks for restore. we currently also call them on "regional restore" because
+			// there is no other failure notification mechanism in the master hooks.
+			// ultimately these should be removed anyway in favor of the operator coordinators
 
 			MasterHooks.restoreMasterHooks(
 					masterHooks,
@@ -1211,7 +1253,9 @@ public class CheckpointCoordinator {
 					allowNonRestoredState,
 					LOG);
 
-			restoreStateToCoordinators(operatorStates);
+			if (restoreCoordinators) {
+				restoreStateToCoordinators(operatorStates);
+			}
 
 			// update metrics
 
@@ -1267,7 +1311,7 @@ public class CheckpointCoordinator {
 
 		LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);
 
-		return restoreLatestCheckpointedState(new HashSet<>(tasks.values()), true, allowNonRestored);
+		return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored);
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 58b077a..d8908c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -244,9 +244,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 
 		if (checkpointCoordinator != null) {
 			// check whether we find a valid checkpoint
-			if (!checkpointCoordinator.restoreLatestCheckpointedState(
+			if (!checkpointCoordinator.restoreLatestCheckpointedStateToAll(
 				new HashSet<>(newExecutionGraph.getAllVertices().values()),
-				false,
 				false)) {
 
 				// check whether we can restore from a savepoint
@@ -331,9 +330,8 @@ public abstract class SchedulerBase implements SchedulerNG {
 			executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
 				new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
 
-			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(
+			executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedStateToAll(
 				getInvolvedExecutionJobVertices(vertices),
-				false,
 				true);
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index a935306..91c5ce8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -141,9 +141,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 		cc.addMasterHook(hook2);
 
 		// initialize the hooks
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 			Collections.emptySet(),
-			false,
 			false);
 		verify(hook1, times(1)).reset();
 		verify(hook2, times(1)).reset();
@@ -280,9 +279,8 @@ public class CheckpointCoordinatorMasterHooksTest {
 		cc.addMasterHook(statefulHook2);
 
 		cc.getCheckpointStore().addCheckpoint(checkpoint);
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 				Collections.emptySet(),
-				true,
 				false);
 
 		verify(statefulHook1, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
@@ -335,18 +333,16 @@ public class CheckpointCoordinatorMasterHooksTest {
 
 		// since we have unmatched state, this should fail
 		try {
-			cc.restoreLatestCheckpointedState(
+			cc.restoreLatestCheckpointedStateToAll(
 					Collections.emptySet(),
-					true,
 					false);
 			fail("exception expected");
 		}
 		catch (IllegalStateException ignored) {}
 
 		// permitting unmatched state should succeed
-		cc.restoreLatestCheckpointedState(
+		cc.restoreLatestCheckpointedStateToAll(
 				Collections.emptySet(),
-				true,
 				true);
 
 		verify(statefulHook, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 143e6a8..c2140cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -204,7 +204,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		tasks.add(jobVertex1);
 		tasks.add(jobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -333,7 +333,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 			assertNotNull(savepointFuture.get());
 
 			//restore and jump the latest savepoint
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 			//compare and see if it used the checkpoint's subtaskStates
 			BaseMatcher<JobManagerTaskRestore> matcher = new BaseMatcher<JobManagerTaskRestore>() {
@@ -493,7 +493,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 
 		tasks.add(newJobVertex1);
 		tasks.add(newJobVertex2);
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// verify the restored state
 		verifyStateRestore(jobVertexID1, newJobVertex1, keyGroupPartitions1);
@@ -639,7 +639,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 		tasks.add(newJobVertex1);
 		tasks.add(newJobVertex2);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		fail("The restoration should have failed because the max parallelism changed.");
 	}
@@ -813,7 +813,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger {
 				.setTimer(manuallyTriggeredScheduledExecutor)
 				.build();
 
-		coord.restoreLatestCheckpointedState(tasks, false, true);
+		coord.restoreLatestCheckpointedStateToAll(tasks, true);
 
 		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 23e0f3b..ac48106 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1930,7 +1930,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
 
-		assertTrue(coord.restoreLatestCheckpointedState(Collections.emptySet(), false, true));
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true));
 
 		verify(tracker, times(1))
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
@@ -2046,7 +2046,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
 		// restore the store
 		Set<ExecutionJobVertex> tasks = new HashSet<>();
 		tasks.add(jobVertex1);
-		coord.restoreLatestCheckpointedState(tasks, true, false);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 		// validate that all shared states are registered again after the recovery.
 		cp = 0;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ed4e516..ed04b21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -50,6 +50,8 @@ import java.util.Objects;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -138,7 +140,7 @@ public class CheckpointStateRestoreTest {
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 
 			// let the coordinator inject the state
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
 
 			// verify that each stateful vertex got the state
 
@@ -177,13 +179,8 @@ public class CheckpointStateRestoreTest {
 				new CheckpointCoordinatorBuilder()
 					.build();
 
-			try {
-				coord.restoreLatestCheckpointedState(Collections.emptySet(), true, false);
-				fail("this should throw an exception");
-			}
-			catch (IllegalStateException e) {
-				// expected
-			}
+			final boolean restored = coord.restoreLatestCheckpointedStateToAll(Collections.emptySet(), false);
+			assertFalse(restored);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -245,8 +242,8 @@ public class CheckpointStateRestoreTest {
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
-		coord.restoreLatestCheckpointedState(tasks, true, false);
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+		assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, true));
 
 		// --- (3) JobVertex missing for task state that is part of the checkpoint ---
 		JobVertexID newJobVertexID = new JobVertexID();
@@ -273,11 +270,12 @@ public class CheckpointStateRestoreTest {
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
 		// (i) Allow non restored state (should succeed)
-		coord.restoreLatestCheckpointedState(tasks, true, true);
+		final boolean restored = coord.restoreLatestCheckpointedStateToAll(tasks, true);
+		assertTrue(restored);
 
 		// (ii) Don't allow non restored state (should fail)
 		try {
-			coord.restoreLatestCheckpointedState(tasks, true, false);
+			coord.restoreLatestCheckpointedStateToAll(tasks, false);
 			fail("Did not throw the expected Exception.");
 		} catch (IllegalStateException ignored) {
 		}


[flink] 06/13: [FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9f30c7fc0b8dbbcbb9c9c8910fd8b48ad84bb9cd
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon May 11 22:15:50 2020 +0200

    [FLINK-16177][refactor] Make test setup logic for OperatorCoordinatorSchedulerTest more flexible.
---
 .../OperatorCoordinatorSchedulerTest.java          |  92 +++++++++--
 .../runtime/scheduler/SchedulerTestingUtils.java   |  49 +++++-
 .../TestingCheckpointStorageCoordinatorView.java   | 181 +++++++++++++++++++++
 3 files changed, 310 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 8ecf172..9ea633f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -29,10 +32,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -44,7 +50,11 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
 import static org.hamcrest.Matchers.contains;
@@ -69,7 +79,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
 
 	// ------------------------------------------------------------------------
-	//  tests
+	//  tests for scheduling
 	// ------------------------------------------------------------------------
 
 	@Test
@@ -165,6 +175,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	// ------------------------------------------------------------------------
+	//  tests for REST request delivery
+	// ------------------------------------------------------------------------
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testDeliveringClientRequestToRequestHandler() throws Exception {
@@ -216,39 +230,66 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private DefaultScheduler createScheduler(OperatorCoordinator.Provider provider) throws Exception {
-		return setupTestJobAndScheduler(provider, null, false);
+		return setupTestJobAndScheduler(provider);
 	}
 
 	private DefaultScheduler createAndStartScheduler() throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, false);
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
 		scheduler.startScheduling();
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks() throws Exception {
-		final DefaultScheduler scheduler = createAndStartScheduler();
+		return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
+	}
+
+	private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
+		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, false);
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
 		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
-	private DefaultScheduler createSchedulerWithCheckpointing() throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, true);
+	private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinatorState) throws Exception {
+		final byte[] savepointMetadata = serializeAsCheckpointMetadata(testOperatorId, coordinatorState);
+		final String savepointPointer = "testingSavepointPointer";
+
+		final TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
+		storage.registerSavepoint(savepointPointer, savepointMetadata);
+
+		final Consumer<JobGraph> savepointConfigurer = (jobGraph) -> {
+			SchedulerTestingUtils.enableCheckpointing(jobGraph, storage.asStateBackend());
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPointer));
+		};
+
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(
+				new TestingOperatorCoordinator.Provider(testOperatorId),
+				null,
+				savepointConfigurer);
+
 		scheduler.startScheduling();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
 		return scheduler;
 	}
 
+	private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
+		return setupTestJobAndScheduler(provider, null, null);
+	}
+
 	private DefaultScheduler setupTestJobAndScheduler(
 			OperatorCoordinator.Provider provider,
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
-			boolean enableCheckpoints) throws Exception {
+			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
 
 		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
 		vertex.setInvokableClass(NoOpInvokable.class);
@@ -256,8 +297,9 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		vertex.setParallelism(2);
 
 		final JobGraph jobGraph = new JobGraph("test job with OperatorCoordinator", vertex);
-		if (enableCheckpoints) {
-			SchedulerTestingUtils.enableCheckpointing(jobGraph);
+		SchedulerTestingUtils.enableCheckpointing(jobGraph);
+		if (jobGraphPreProcessing != null) {
+			jobGraphPreProcessing.accept(jobGraph);
 		}
 
 		final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
@@ -301,6 +343,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return scheduler.getExecutionVertex(id).getJobVertex();
 	}
 
+	private static OperatorState createOperatorState(OperatorID id, byte[] coordinatorState) {
+		final OperatorState state = new OperatorState(id, 10, 16384);
+		state.setCoordinatorState(new ByteStreamStateHandle("name", coordinatorState));
+		return state;
+	}
+
+	private static byte[] serializeAsCheckpointMetadata(OperatorID id, byte[] coordinatorState) throws IOException {
+		final OperatorState state = createOperatorState(id, coordinatorState);
+		final CheckpointMetadata metadata = new CheckpointMetadata(
+			1337L, Collections.singletonList(state), Collections.emptyList());
+
+		final ByteArrayOutputStream out = new ByteArrayOutputStream();
+		Checkpoints.storeCheckpointMetadata(metadata, out);
+		return out.toByteArray();
+	}
+
 	// ------------------------------------------------------------------------
 	//  test mocks
 	// ------------------------------------------------------------------------
@@ -321,6 +379,18 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		}
 	}
 
+	private static final class CoordinatorThatFailsCheckpointing extends TestingOperatorCoordinator {
+
+		public CoordinatorThatFailsCheckpointing(Context context) {
+			super(context);
+		}
+
+		@Override
+		public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
+			throw new Error(new TestException());
+		}
+	}
+
 	private static final class FailingTaskExecutorOperatorEventGateway implements TaskExecutorOperatorEventGateway {
 
 		@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 99458e4..40fe221 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.SlotProviderStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
@@ -61,9 +62,11 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
 import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -72,6 +75,9 @@ import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -181,6 +187,10 @@ public class SchedulerTestingUtils {
 	}
 
 	public static void enableCheckpointing(final JobGraph jobGraph) {
+		enableCheckpointing(jobGraph, null);
+	}
+
+	public static void enableCheckpointing(final JobGraph jobGraph, @Nullable StateBackend stateBackend) {
 		final List<JobVertexID> triggerVertices = new ArrayList<>();
 		final List<JobVertexID> allVertices = new ArrayList<>();
 
@@ -202,9 +212,18 @@ public class SchedulerTestingUtils {
 			false,
 			0);
 
+		SerializedValue<StateBackend> serializedStateBackend = null;
+		if (stateBackend != null) {
+			try {
+				serializedStateBackend = new SerializedValue<>(stateBackend);
+			} catch (IOException e) {
+				throw new RuntimeException("could not serialize state backend", e);
+			}
+		}
+
 		jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
 				triggerVertices, allVertices, allVertices,
-				config, null));
+				config, serializedStateBackend));
 	}
 
 	public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler scheduler) {
@@ -213,6 +232,29 @@ public class SchedulerTestingUtils {
 			.collect(Collectors.toList());
 	}
 
+	public static ExecutionState getExecutionState(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getState();
+	}
+
+	public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null;
+		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+	}
+
+	public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null;
+		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			ejv.getJobId(), attemptID, ExecutionState.RUNNING));
+	}
+
 	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
 		final JobID jid = scheduler.requestJob().getJobID();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
@@ -250,6 +292,11 @@ public class SchedulerTestingUtils {
 		return scheduler.getCheckpointCoordinator();
 	}
 
+	private static ExecutionJobVertex getJobVertex(DefaultScheduler scheduler, JobVertexID jobVertexId) {
+		final ExecutionVertexID id = new ExecutionVertexID(jobVertexId, 0);
+		return scheduler.getExecutionVertex(id).getJobVertex();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
new file mode 100644
index 0000000..e06d594
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageCoordinatorView.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+
+/**
+ * A testing implementation of the {@link CheckpointStorageCoordinatorView}.
+ */
+@SuppressWarnings("serial")
+public class TestingCheckpointStorageCoordinatorView implements CheckpointStorage, java.io.Serializable {
+
+	private final HashMap<String, TestingCompletedCheckpointStorageLocation> registeredSavepoints = new HashMap<>();
+
+	// ------------------------------------------------------------------------
+	//  test setup methods
+	// ------------------------------------------------------------------------
+
+	public void registerSavepoint(String pointer, byte[] metadata) {
+		registeredSavepoints.put(pointer, new TestingCompletedCheckpointStorageLocation(pointer, metadata));
+	}
+
+	// ------------------------------------------------------------------------
+	//  CheckpointStorageCoordinatorView methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean supportsHighlyAvailableStorage() {
+		return false;
+	}
+
+	@Override
+	public boolean hasDefaultSavepointLocation() {
+		return false;
+	}
+
+	@Override
+	public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+		final CompletedCheckpointStorageLocation location = registeredSavepoints.get(externalPointer);
+		if (location != null) {
+			return location;
+		} else {
+			throw new IOException("Could not find savepoint for pointer: " + externalPointer);
+		}
+	}
+
+	@Override
+	public void initializeBaseLocations() throws IOException {}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+		return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException {
+		return new NonPersistentMetadataCheckpointStorageLocation(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStreamFactory resolveCheckpointStorageLocation(
+			long checkpointId,
+			CheckpointStorageLocationReference reference) {
+		return new MemCheckpointStreamFactory(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+		return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(Integer.MAX_VALUE);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utils
+	// ------------------------------------------------------------------------
+
+	public StateBackend asStateBackend() {
+		return new FactoringStateBackend(this);
+	}
+
+	// ------------------------------------------------------------------------
+	//  internal support classes
+	// ------------------------------------------------------------------------
+
+	private static final class TestingCompletedCheckpointStorageLocation
+			implements CompletedCheckpointStorageLocation, java.io.Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		private final String pointer;
+		private final byte[] metadata;
+
+		TestingCompletedCheckpointStorageLocation(String pointer, byte[] metadata) {
+			this.pointer = pointer;
+			this.metadata = metadata;
+		}
+
+		@Override
+		public String getExternalPointer() {
+			return pointer;
+		}
+
+		@Override
+		public StreamStateHandle getMetadataHandle() {
+			return new ByteStreamStateHandle(pointer, metadata);
+		}
+
+		@Override
+		public void disposeStorageLocation() throws IOException {}
+	}
+
+	// ------------------------------------------------------------------------
+	//   Everything below here is necessary only to make it possible to
+	//   pass the CheckpointStorageCoordinatorView to the CheckpointCoordinator
+	//   via the JobGraph, because that part expects a StateBackend
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A StateBackend whose only purpose is to create a given CheckpointStorage.
+	 */
+	private static final class FactoringStateBackend implements StateBackend {
+
+		private final TestingCheckpointStorageCoordinatorView testingCoordinatorView;
+
+		private FactoringStateBackend(TestingCheckpointStorageCoordinatorView testingCoordinatorView) {
+			this.testingCoordinatorView = testingCoordinatorView;
+		}
+
+		@Override
+		public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
+			return testingCoordinatorView.resolveCheckpoint(externalPointer);
+		}
+
+		@Override
+		public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+			return testingCoordinatorView;
+		}
+
+		@Override
+		public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+}
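
Taken together with the scheduler test above, the intended wiring is small; a minimal usage sketch, assuming the savepoint metadata bytes and the JobGraph already exist (the helper name is illustrative and does not appear in the diff):

    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
    import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
    import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;

    /** Hypothetical helper: points a JobGraph at pre-serialized, in-memory savepoint metadata. */
    static void restoreFromInMemorySavepoint(JobGraph jobGraph, byte[] savepointMetadata) {
        final String pointer = "testingSavepointPointer";

        // register the metadata under a pointer that the storage view can later resolve
        final TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
        storage.registerSavepoint(pointer, savepointMetadata);

        // the FactoringStateBackend detour: hand the storage to the JobGraph as a StateBackend
        SchedulerTestingUtils.enableCheckpointing(jobGraph, storage.asStateBackend());
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(pointer));
    }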


[flink] 09/13: [FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52f7b9d070f73ee6acf2a87ba81d3d243d01f845
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 12:43:58 2020 +0200

    [FLINK-16177][checkpointing] Integrate OperatorCoordinator fully with checkpointing.
    
      - This adds various tests for OperatorCoordinator checkpointing
      - The checkpoint coordinator also restores state to the OperatorCoordinator
---
 .../java/org/apache/flink/util/ExceptionUtils.java |  37 +++++
 .../runtime/checkpoint/CheckpointCoordinator.java  |  17 +++
 .../checkpoint/StateAssignmentOperation.java       |   8 +-
 .../OperatorCoordinatorSchedulerTest.java          | 156 ++++++++++++++++++++-
 .../coordination/TestingOperatorCoordinator.java   |  35 ++++-
 .../runtime/scheduler/SchedulerTestingUtils.java   |  36 +++++
 .../apache/flink/core/testutils/FlinkMatchers.java |  61 ++++++--
 7 files changed, 333 insertions(+), 17 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6512531..8982b29 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -462,6 +462,43 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether a throwable chain contains a specific type of exception and returns it.
+	 * This method handles {@link SerializedThrowable}s in the chain and deserializes them with
+	 * the given ClassLoader.
+	 *
+	 * <p>SerializedThrowables are often used when exceptions might come from dynamically loaded code and
+	 * be transported over RPC / HTTP for better error reporting.
+	 * The receiving processes or threads might not have the dynamically loaded code available.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param searchType the type of exception to search for in the chain.
+	 * @param classLoader the ClassLoader to use when encountering a SerializedThrowable.
+	 * @return Optional throwable of the requested type if available, otherwise empty
+	 */
+	public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
+			Throwable throwable,
+			Class<T> searchType,
+			ClassLoader classLoader) {
+
+		if (throwable == null || searchType == null) {
+			return Optional.empty();
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (searchType.isAssignableFrom(t.getClass())) {
+				return Optional.of(searchType.cast(t));
+			} else if (t instanceof SerializedThrowable) {
+				t = ((SerializedThrowable) t).deserializeError(classLoader);
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Checks whether a throwable chain contains an exception matching a predicate and returns it.
 	 *
 	 * @param throwable the throwable chain to check.
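
For callers, the new method composes naturally with futures whose failure cause may arrive as a SerializedThrowable; a minimal sketch, assuming only the signature shown above (the helper and its name are illustrative, not part of this change):

    import org.apache.flink.util.ExceptionUtils;

    import java.util.Optional;

    /** Sketch: true if the failure cause chain contains the given exception type,
     *  unwrapping SerializedThrowables with the given class loader along the way. */
    static <T extends Throwable> boolean failureContains(
            Throwable failureCause, Class<T> type, ClassLoader classLoader) {

        final Optional<T> match = ExceptionUtils.findThrowableSerializedAware(failureCause, type, classLoader);
        return match.isPresent();
    }
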
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 5d2db1d..fbdf2d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -1210,6 +1211,8 @@ public class CheckpointCoordinator {
 					allowNonRestoredState,
 					LOG);
 
+			restoreStateToCoordinators(operatorStates);
+
 			// update metrics
 
 			if (statsTracker != null) {
@@ -1416,6 +1419,20 @@ public class CheckpointCoordinator {
 			initDelay, baseInterval, TimeUnit.MILLISECONDS);
 	}
 
+	private void restoreStateToCoordinators(final Map<OperatorID, OperatorState> operatorStates) throws Exception {
+		for (OperatorCoordinatorCheckpointContext coordContext : coordinatorsToCheckpoint) {
+			final OperatorState state = operatorStates.get(coordContext.operatorId());
+			if (state == null) {
+				continue;
+			}
+
+			final ByteStreamStateHandle coordinatorState = state.getCoordinatorState();
+			if (coordinatorState != null) {
+				coordContext.coordinator().resetToCheckpoint(coordinatorState.getData());
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  job status listener that schedules / cancels periodic checkpoints
 	// ------------------------------------------------------------------------
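
From the coordinator implementer's point of view, the contract exercised by restoreStateToCoordinators() is narrow: whatever bytes the coordinator produced in checkpointCoordinator(...) are handed back verbatim in resetToCheckpoint(...). A sketch of the three checkpoint callbacks for a hypothetical coordinator that tracks a single offset (all other OperatorCoordinator methods are omitted, and the serialization format is only illustrative):

    // fragment of a hypothetical OperatorCoordinator implementation
    // (assumes java.util.concurrent.CompletableFuture is imported by the enclosing class)
    private volatile long currentOffset;

    @Override
    public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
        // snapshot the coordinator state as bytes; no blocking I/O is needed here
        final byte[] bytes = java.nio.ByteBuffer.allocate(Long.BYTES).putLong(currentOffset).array();
        return CompletableFuture.completedFuture(bytes);
    }

    @Override
    public void checkpointComplete(long checkpointId) {
        // nothing to clean up in this sketch; a real coordinator might commit side effects here
    }

    @Override
    public void resetToCheckpoint(byte[] checkpointData) {
        // the CheckpointCoordinator hands back exactly the bytes produced above
        currentOffset = java.nio.ByteBuffer.wrap(checkpointData).getLong();
    }
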
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 9644430..8ce8e55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -89,7 +89,7 @@ public class StateAssignmentOperation {
 			// find the states of all operators belonging to this task
 			List<OperatorIDPair> operatorIDPairs = executionJobVertex.getOperatorIDs();
 			List<OperatorState> operatorStates = new ArrayList<>(operatorIDPairs.size());
-			boolean statelessTask = true;
+			boolean statelessSubTasks = true;
 			for (OperatorIDPair operatorIDPair : operatorIDPairs) {
 				OperatorID operatorID = operatorIDPair.getUserDefinedOperatorID().orElse(operatorIDPair.getGeneratedOperatorID());
 
@@ -99,12 +99,12 @@ public class StateAssignmentOperation {
 						operatorID,
 						executionJobVertex.getParallelism(),
 						executionJobVertex.getMaxParallelism());
-				} else {
-					statelessTask = false;
+				} else if (operatorState.getNumberCollectedStates() > 0) {
+					statelessSubTasks = false;
 				}
 				operatorStates.add(operatorState);
 			}
-			if (!statelessTask) { // skip tasks where no operator has any state
+			if (!statelessSubTasks) { // skip tasks where no operator has any state
 				assignAttemptState(executionJobVertex, operatorStates);
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 0354e3d..bb0e68e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
@@ -37,28 +39,36 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.scheduler.DefaultScheduler;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matcher;
+import org.junit.After;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
+import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
+import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -77,6 +87,15 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
 	private final ManuallyTriggeredScheduledExecutorService executor = new ManuallyTriggeredScheduledExecutorService();
 
+	private DefaultScheduler createdScheduler;
+
+	@After
+	public void shutdownScheduler() throws Exception {
+		if (createdScheduler != null) {
+			createdScheduler.suspend(new Exception("shutdown"));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  tests for scheduling
 	// ------------------------------------------------------------------------
@@ -184,6 +203,86 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	// ------------------------------------------------------------------------
+	//  tests for checkpointing
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTakeCheckpoint() throws Exception {
+		final byte[] checkpointData = new byte[656];
+		new Random().nextBytes(checkpointData);
+
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+		coordinator.getLastTriggeredCheckpoint().complete(checkpointData);
+		acknowledgeCurrentCheckpoint(scheduler);
+
+		final OperatorState state = checkpointFuture.get().getOperatorStates().get(testOperatorId);
+		assertArrayEquals(checkpointData, getStateHandleContents(state.getCoordinatorState()));
+	}
+
+	@Test
+	public void testSnapshotSyncFailureFailsCheckpoint() throws Exception {
+		final OperatorCoordinator.Provider failingCoordinatorProvider =
+			new TestingOperatorCoordinator.Provider(testOperatorId, CoordinatorThatFailsCheckpointing::new);
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks(failingCoordinatorProvider);
+
+		final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+
+		assertThat(checkpointFuture, futureWillCompleteWithTestException());
+	}
+
+	@Test
+	public void testSnapshotAsyncFailureFailsCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final CompletableFuture<?> checkpointFuture = triggerCheckpoint(scheduler);
+		final CompletableFuture<?> coordinatorStateFuture = coordinator.getLastTriggeredCheckpoint();
+
+		coordinatorStateFuture.completeExceptionally(new TestException());
+
+		assertThat(checkpointFuture, futureWillCompleteWithTestException());
+	}
+
+	@Test
+	public void testSavepointRestoresCoordinator() throws Exception {
+		final byte[] testCoordinatorState = new byte[123];
+		new Random().nextBytes(testCoordinatorState);
+
+		final DefaultScheduler scheduler = createSchedulerWithRestoredSavepoint(testCoordinatorState);
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final byte[] restoredState = coordinator.getLastRestoredCheckpointState();
+		assertArrayEquals(testCoordinatorState, restoredState);
+	}
+
+	@Test
+	public void testGlobalFailureResetsToCheckpoint() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final byte[] coordinatorState = new byte[] {7, 11, 3, 5};
+		takeCompleteCheckpoint(scheduler, coordinator, coordinatorState);
+		failGlobalAndRestart(scheduler, new TestException());
+
+		assertArrayEquals("coordinator should have a restored checkpoint",
+				coordinatorState, coordinator.getLastRestoredCheckpointState());
+	}
+
+	@Test
+	public void testConfirmCheckpointComplete() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		final long checkpointId = takeCompleteCheckpoint(scheduler, coordinator, new byte[] {37, 11, 83, 4});
+
+		assertEquals("coordinator should be notified of completed checkpoint",
+				checkpointId, coordinator.getLastCheckpointComplete());
+	}
+
+	// ------------------------------------------------------------------------
 	//  tests for REST request delivery
 	// ------------------------------------------------------------------------
 
@@ -312,7 +411,8 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
 			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
 
-		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId);
+		final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
+		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
 		vertex.setInvokableClass(NoOpInvokable.class);
 		vertex.addOperatorCoordinator(new SerializedValue<>(provider));
 		vertex.setParallelism(2);
@@ -328,6 +428,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 				: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
 		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
+		this.createdScheduler = scheduler;
 		return scheduler;
 	}
 
@@ -370,6 +471,43 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
 	}
 
+	private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
+		scheduler.handleGlobalFailure(reason);
+		SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
+
+		executor.triggerScheduledTasks();   // this handles the restart / redeploy
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard the test assumptions: This must bring the tasks back to RUNNING
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+	}
+
+	private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
+		executor.triggerAll();
+		return future;
+	}
+
+	private void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+		executor.triggerAll();
+		SchedulerTestingUtils.acknowledgeCurrentCheckpoint(scheduler);
+		executor.triggerAll();
+	}
+
+	private long takeCompleteCheckpoint(
+			DefaultScheduler scheduler,
+			TestingOperatorCoordinator testingOperatorCoordinator,
+			byte[] coordinatorState) throws Exception {
+
+		final CompletableFuture<CompletedCheckpoint> checkpointFuture = triggerCheckpoint(scheduler);
+
+		testingOperatorCoordinator.getLastTriggeredCheckpoint().complete(coordinatorState);
+		acknowledgeCurrentCheckpoint(scheduler);
+
+		// wait until checkpoint has completed
+		return checkpointFuture.get().getCheckpointID();
+	}
+
 	// ------------------------------------------------------------------------
 	//  miscellaneous utilities
 	// ------------------------------------------------------------------------
@@ -395,6 +533,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return out.toByteArray();
 	}
 
+	private static <T> Matcher<CompletableFuture<T>> futureWillCompleteWithTestException() {
+		return futureWillCompleteExceptionally(
+				(e) -> ExceptionUtils.findThrowableSerializedAware(
+						e, TestException.class, OperatorCoordinatorSchedulerTest.class.getClassLoader()).isPresent(),
+				Duration.ofSeconds(10),
+				"A TestException in the cause chain");
+	}
+
+	private static byte[] getStateHandleContents(StreamStateHandle stateHandle) {
+		if (stateHandle instanceof ByteStreamStateHandle) {
+			return ((ByteStreamStateHandle) stateHandle).getData();
+		}
+		fail("other state handles not implemented");
+		return null;
+	}
+
 	// ------------------------------------------------------------------------
 	//  test mocks
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 9889ba8..5d93bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -25,7 +25,9 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * A simple testing implementation of the {@link OperatorCoordinator}.
@@ -36,11 +38,20 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 
 	private final ArrayList<Integer> failedTasks = new ArrayList<>();
 
+	@Nullable
+	private byte[] lastRestoredCheckpointState;
+
+	private BlockingQueue<CompletableFuture<byte[]>> triggeredCheckpoints;
+
+	private BlockingQueue<Long> lastCheckpointComplete;
+
 	private boolean started;
 	private boolean closed;
 
 	public TestingOperatorCoordinator(OperatorCoordinator.Context context) {
 		this.context = context;
+		this.triggeredCheckpoints = new LinkedBlockingQueue<>();
+		this.lastCheckpointComplete = new LinkedBlockingQueue<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -65,17 +76,22 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 
 	@Override
 	public CompletableFuture<byte[]> checkpointCoordinator(long checkpointId) {
-		throw new UnsupportedOperationException();
+		final CompletableFuture<byte[]> coordinatorStateFuture = new CompletableFuture<>();
+
+		boolean added = triggeredCheckpoints.offer(coordinatorStateFuture);
+		assert added; // guard the test assumptions
+
+		return coordinatorStateFuture;
 	}
 
 	@Override
 	public void checkpointComplete(long checkpointId) {
-		throw new UnsupportedOperationException();
+		lastCheckpointComplete.offer(checkpointId);
 	}
 
 	@Override
 	public void resetToCheckpoint(byte[] checkpointData) {
-		throw new UnsupportedOperationException();
+		lastRestoredCheckpointState = checkpointData;
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,6 +112,19 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 		return failedTasks;
 	}
 
+	@Nullable
+	public byte[] getLastRestoredCheckpointState() {
+		return lastRestoredCheckpointState;
+	}
+
+	public CompletableFuture<byte[]> getLastTriggeredCheckpoint() throws InterruptedException {
+		return triggeredCheckpoints.take();
+	}
+
+	public long getLastCheckpointComplete() throws InterruptedException {
+		return lastCheckpointComplete.take();
+	}
+
 	// ------------------------------------------------------------------------
 	//  The provider for this coordinator implementation
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 40fe221..124e733 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -89,6 +90,7 @@ import java.util.stream.StreamSupport;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 /**
  * A utility class to create {@link DefaultScheduler} instances for testing.
@@ -262,6 +264,13 @@ public class SchedulerTestingUtils {
 		);
 	}
 
+	public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
+		final JobID jid = scheduler.requestJob().getJobID();
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
+		);
+	}
+
 	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
 		final JobID jid = scheduler.requestJob().getJobID();
@@ -272,6 +281,33 @@ public class SchedulerTestingUtils {
 		}
 	}
 
+	public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		return checkpointCoordinator.triggerCheckpoint(false);
+	}
+
+	public static void acknowledgeCurrentCheckpoint(DefaultScheduler scheduler) {
+		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
+		assertEquals("Coordinator does not have exactly one pending checkpoint", 1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+
+		final PendingCheckpoint pc = checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
+
+		// Because of races against the async thread in the coordinator, we need to wait here until the
+		// coordinator state is acknowledged. This can be removed once the CheckpointCoordinator
+		// executes all actions in the Scheduler's main thread executor.
+		while (pc.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
+			try {
+				Thread.sleep(1);
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+				fail("interrupted");
+			}
+		}
+
+		getAllCurrentExecutionAttempts(scheduler).forEach(
+			(attemptId) -> scheduler.acknowledgeCheckpoint(pc.getJobId(), attemptId, pc.getCheckpointId(), new CheckpointMetrics(), null));
+	}
+
 	public static CompletedCheckpoint takeCheckpoint(DefaultScheduler scheduler) throws Exception {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
 		checkpointCoordinator.triggerCheckpoint(false);
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
index 0303272..ada7bf4 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/FlinkMatchers.java
@@ -21,6 +21,8 @@ package org.apache.flink.core.testutils;
 import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -28,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 /**
  * Some reusable hamcrest matchers for Flink.
@@ -60,6 +63,18 @@ public class FlinkMatchers {
 	/**
 	 * Checks whether a {@link CompletableFuture} will complete exceptionally within a certain time.
 	 */
+	public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(
+			Function<Throwable, Boolean> exceptionCheck,
+			Duration timeout,
+			String checkDescription) {
+		Objects.requireNonNull(exceptionCheck, "exceptionCheck should not be null");
+		Objects.requireNonNull(timeout, "timeout should not be null");
+		return new FutureWillFailMatcher<>(exceptionCheck, timeout, checkDescription);
+	}
+
+	/**
+	 * Checks whether a {@link CompletableFuture} will complete exceptionally within a certain time.
+	 */
 	public static <T> FutureWillFailMatcher<T> futureWillCompleteExceptionally(Duration timeout) {
 		return futureWillCompleteExceptionally(Throwable.class, timeout);
 	}
@@ -118,14 +133,31 @@ public class FlinkMatchers {
 
 	private static final class FutureWillFailMatcher<T> extends TypeSafeDiagnosingMatcher<CompletableFuture<T>> {
 
-		private final Class<? extends Throwable> expectedException;
+		private final Function<Throwable, Boolean> exceptionValidator;
 
 		private final Duration timeout;
 
-		FutureWillFailMatcher(Class<? extends Throwable> expectedException, Duration timeout) {
+		private final String validationDescription;
+
+		FutureWillFailMatcher(
+			Class<? extends Throwable> expectedException,
+			Duration timeout) {
+
 			super(CompletableFuture.class);
-			this.expectedException = expectedException;
+			this.exceptionValidator = (e) -> expectedException.isAssignableFrom(e.getClass());
 			this.timeout = timeout;
+			this.validationDescription = expectedException.getName();
+		}
+
+		FutureWillFailMatcher(
+				Function<Throwable, Boolean> exceptionValidator,
+				Duration timeout,
+				String validationDescription) {
+
+			super(CompletableFuture.class);
+			this.exceptionValidator = exceptionValidator;
+			this.timeout = timeout;
+			this.validationDescription = validationDescription;
 		}
 
 		@Override
@@ -144,18 +176,29 @@ public class FlinkMatchers {
 				return false;
 			}
 			catch (ExecutionException e) {
-				if (e.getCause() == null || !expectedException.isAssignableFrom(e.getCause().getClass())) {
-					mismatchDescription.appendText("Future completed with different exception: " + e.getCause());
-					return false;
+				final Throwable cause = e.getCause();
+				if (cause != null && exceptionValidator.apply(cause)) {
+					return true;
+				}
+
+				String otherDescription = "(null)";
+				if (cause != null) {
+					final StringWriter stm = new StringWriter();
+					try (PrintWriter wrt = new PrintWriter(stm)) {
+						cause.printStackTrace(wrt);
+					}
+					otherDescription = stm.toString();
 				}
-				return true;
+
+				mismatchDescription.appendText("Future completed with different exception: " + otherDescription);
+				return false;
 			}
 		}
 
 		@Override
 		public void describeTo(Description description) {
-			description.appendText("A CompletableFuture that will failed within " +
-				timeout.toMillis() + " milliseconds with: " + expectedException.getName());
+			description.appendText("A CompletableFuture that will have failed within " +
+				timeout.toMillis() + " milliseconds with: " + validationDescription);
 		}
 	}
 }


[flink] 01/13: [FLINK-17674][state] Type OperatorCoordinator state in checkpoints strictly to ByteStreamStateHandle

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3413106ccd030e44d5e4690430c841370a1f988
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 13:31:58 2020 +0200

    [FLINK-17674][state] Type OperatorCoordinator state in checkpoints strictly to ByteStreamStateHandle
    
    State restore on the master side happens in the JobManager's main thread and must therefore not perform
    any potentially blocking I/O operations.
    Typing the state to ByteStreamStateHandle makes it a strict contract that the data can be retrieved
    directly, without any I/O.
    
    If state restoring becomes an asynchronous operation, we can relax this restriction.
---
 .../checkpoint/OperatorCoordinatorCheckpoints.java |  9 ++----
 .../flink/runtime/checkpoint/OperatorState.java    |  8 +++---
 .../runtime/checkpoint/PendingCheckpoint.java      |  4 +--
 .../metadata/MetadataV2V3SerializerBase.java       | 14 ++++++++++
 .../checkpoint/metadata/MetadataV3Serializer.java  |  2 +-
 .../runtime/checkpoint/PendingCheckpointTest.java  |  4 +--
 .../checkpoint/metadata/CheckpointTestUtils.java   |  8 ++++--
 .../runtime/state/TestingStreamStateHandle.java    | 32 ++++------------------
 8 files changed, 36 insertions(+), 45 deletions(-)
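
In practice the difference is that a ByteStreamStateHandle carries its bytes in memory, while a general StreamStateHandle may have to open a (possibly remote) stream. A minimal sketch of the two access patterns, assuming the handles are already given (helper names are illustrative):

    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.runtime.state.StreamStateHandle;
    import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

    import java.io.IOException;

    // Safe on the JobManager main thread: the bytes are already in memory.
    static byte[] readCoordinatorState(ByteStreamStateHandle handle) {
        return handle.getData();
    }

    // The general StreamStateHandle contract: opening the stream may involve blocking I/O.
    static int readFirstByte(StreamStateHandle handle) throws IOException {
        try (FSDataInputStream in = handle.openInputStream()) {
            return in.read();
        }
    }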

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
index cfacec8..2423bcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import java.util.ArrayList;
@@ -130,13 +129,9 @@ final class OperatorCoordinatorCheckpoints {
 	static final class CoordinatorSnapshot {
 
 		final OperatorCoordinatorCheckpointContext coordinator;
-		final StreamStateHandle state;
-
-		CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, StreamStateHandle state) {
-			// if this is not true any more, we need more elaborate dispose/cleanup handling
-			// see comment above the class.
-			assert state instanceof ByteStreamStateHandle;
+		final ByteStreamStateHandle state;
 
+		CoordinatorSnapshot(OperatorCoordinatorCheckpointContext coordinator, ByteStreamStateHandle state) {
 			this.coordinator = coordinator;
 			this.state = state;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index 998a3bd..a4994c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -50,7 +50,7 @@ public class OperatorState implements CompositeStateHandle {
 
 	/** The state of the operator coordinator. Null, if no such state exists. */
 	@Nullable
-	private StreamStateHandle coordinatorState;
+	private ByteStreamStateHandle coordinatorState;
 
 	/** The parallelism of the operator when it was checkpointed. */
 	private final int parallelism;
@@ -96,13 +96,13 @@ public class OperatorState implements CompositeStateHandle {
 		}
 	}
 
-	public void setCoordinatorState(@Nullable StreamStateHandle coordinatorState) {
+	public void setCoordinatorState(@Nullable ByteStreamStateHandle coordinatorState) {
 		checkState(this.coordinatorState == null, "coordinator state already set");
 		this.coordinatorState = coordinatorState;
 	}
 
 	@Nullable
-	public StreamStateHandle getCoordinatorState() {
+	public ByteStreamStateHandle getCoordinatorState() {
 		return coordinatorState;
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index c69c1cc..2a0eba7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -432,7 +432,7 @@ public class PendingCheckpoint {
 
 	public TaskAcknowledgeResult acknowledgeCoordinatorState(
 			OperatorCoordinatorCheckpointContext coordinatorInfo,
-			@Nullable StreamStateHandle stateHandle) {
+			@Nullable ByteStreamStateHandle stateHandle) {
 
 		synchronized (lock) {
 			if (discarded) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index fe8343a..44ad464 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -468,6 +468,7 @@ public abstract class MetadataV2V3SerializerBase {
 		dos.flush();
 	}
 
+	@Nullable
 	static StreamStateHandle deserializeStreamStateHandle(
 			DataInputStream dis,
 			@Nullable DeserializationContext context) throws IOException {
@@ -498,6 +499,19 @@ public abstract class MetadataV2V3SerializerBase {
 		}
 	}
 
+	@Nullable
+	static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(
+			DataInputStream dis,
+			@Nullable DeserializationContext context) throws IOException {
+
+		final StreamStateHandle handle = deserializeStreamStateHandle(dis, context);
+		if (handle == null || handle instanceof ByteStreamStateHandle) {
+			return (ByteStreamStateHandle) handle;
+		} else {
+			throw new IOException("Expected a ByteStreamStateHandle but found a " + handle.getClass().getName());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
index 6bf98c9..1563abb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV3Serializer.java
@@ -122,7 +122,7 @@ public class MetadataV3Serializer extends MetadataV2V3SerializerBase implements
 		final OperatorState operatorState = new OperatorState(jobVertexId, parallelism, maxParallelism);
 
 		// Coordinator state
-		operatorState.setCoordinatorState(deserializeStreamStateHandle(dis, context));
+		operatorState.setCoordinatorState(deserializeAndCheckByteStreamStateHandle(dis, context));
 
 		// Sub task states
 		final int numSubTaskStates = dis.readInt();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index d5cec9b..ee292b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -35,9 +35,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinator;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.hamcrest.Matchers;
 import org.junit.Assert;
@@ -519,7 +519,7 @@ public class PendingCheckpointTest {
 		return checkpoint;
 	}
 
-	private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(StreamStateHandle... handles) throws IOException {
+	private PendingCheckpoint createPendingCheckpointWithAcknowledgedCoordinators(ByteStreamStateHandle... handles) throws IOException {
 		OperatorCoordinatorCheckpointContext[] coords = new OperatorCoordinatorCheckpointContext[handles.length];
 		for (int i = 0; i < handles.length; i++) {
 			coords[i] = createOperatorCoordinator();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 64cfafd..be4ee3f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -84,7 +84,7 @@ public class CheckpointTestUtils {
 
 			final boolean hasCoordinatorState = random.nextBoolean();
 			if (hasCoordinatorState) {
-				final StreamStateHandle stateHandle = createDummyStreamStateHandle(random, basePath);
+				final ByteStreamStateHandle stateHandle = createDummyByteStreamStreamStateHandle(random);
 				taskState.setCoordinatorState(stateHandle);
 			}
 
@@ -222,7 +222,11 @@ public class CheckpointTestUtils {
 			createDummyStreamStateHandle(rnd, basePath));
 	}
 
-	public static StreamStateHandle createDummyStreamStateHandle(Random rnd, String basePath) {
+	public static ByteStreamStateHandle createDummyByteStreamStreamStateHandle(Random rnd) {
+		return (ByteStreamStateHandle) createDummyStreamStateHandle(rnd, null);
+	}
+
+	public static StreamStateHandle createDummyStreamStateHandle(Random rnd, @Nullable String basePath) {
 		if (!isSavepoint(basePath)) {
 			return new ByteStreamStateHandle(
 				String.valueOf(createRandomUUID(rnd)),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
index 7a968d2..2471fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingStreamStateHandle.java
@@ -18,52 +18,30 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
-import javax.annotation.Nullable;
+import java.util.UUID;
 
 /**
  * A simple test mock for a {@link StreamStateHandle}.
  */
-public class TestingStreamStateHandle implements StreamStateHandle {
+public class TestingStreamStateHandle extends ByteStreamStateHandle {
 	private static final long serialVersionUID = 1L;
 
-	@Nullable
-	private final FSDataInputStream inputStream;
-
-	private final long size;
-
 	private boolean disposed;
 
 	public TestingStreamStateHandle() {
-		this(null, 0L);
-	}
-
-	public TestingStreamStateHandle(@Nullable FSDataInputStream inputStream, long size) {
-		this.inputStream = inputStream;
-		this.size = size;
+		super(UUID.randomUUID().toString(), new byte[0]);
 	}
 
 	// ------------------------------------------------------------------------
 
 	@Override
-	public FSDataInputStream openInputStream() {
-		if (inputStream == null) {
-			throw new UnsupportedOperationException("no input stream provided");
-		}
-		return inputStream;
-	}
-
-	@Override
 	public void discardState() {
+		super.discardState();
 		disposed = true;
 	}
 
-	@Override
-	public long getStateSize() {
-		return size;
-	}
-
 	// ------------------------------------------------------------------------
 
 	public boolean isDisposed() {


[flink] 04/13: [FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 382cf09bbfb4e383e373af8ebf1a9dee13b8a632
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed May 13 16:10:49 2020 +0200

    [FLINK-17670][checkpointing] Savepoint handling of "non-restored" state also takes OperatorCoordinator state into account.
    
    The savepoint restore now also fails when the only unmatched state for an operator comes from
    its OperatorCoordinator and non-restored state is not allowed.
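    
    As a minimal, self-contained sketch (the classes below are illustrative stand-ins, not
    Flink's OperatorState/Checkpoints API), the rule is now: an operator that is missing from
    the new job graph is rejected when it carries coordinator state, not only when one of its
    subtasks carries state, unless non-restored state is explicitly allowed.
    
    import java.util.List;
    
    public class NonRestoredStateCheckSketch {
    
        /** Illustrative stand-in for an operator's persisted state in a savepoint. */
        static final class SavedOperatorState {
            final String operatorId;
            final boolean hasCoordinatorState;   // ~ OperatorState#getCoordinatorState() != null
            final List<Boolean> subtaskHasState; // ~ OperatorSubtaskState#hasState() per subtask
    
            SavedOperatorState(String operatorId, boolean hasCoordinatorState, List<Boolean> subtaskHasState) {
                this.operatorId = operatorId;
                this.hasCoordinatorState = hasCoordinatorState;
                this.subtaskHasState = subtaskHasState;
            }
        }
    
        static void validate(SavedOperatorState state, boolean operatorInNewJob, boolean allowNonRestoredState) {
            if (operatorInNewJob || allowNonRestoredState) {
                return; // state can be mapped, or skipping it is explicitly allowed
            }
            // new behavior: coordinator-only state also counts as non-restorable state
            if (state.hasCoordinatorState) {
                throw new IllegalStateException("Cannot map state for operator " + state.operatorId
                        + "; use --allowNonRestoredState to skip it.");
            }
            for (boolean hasState : state.subtaskHasState) {
                if (hasState) {
                    throw new IllegalStateException("Cannot map state for operator " + state.operatorId
                            + "; use --allowNonRestoredState to skip it.");
                }
            }
        }
    
        public static void main(String[] args) {
            // Coordinator state only, operator missing from the new job, skipping not allowed:
            // this now fails, which is what the new test asserts for the real classes.
            try {
                validate(new SavedOperatorState("op-1", true, List.of()), false, false);
            } catch (IllegalStateException expected) {
                System.out.println("rejected as expected: " + expected.getMessage());
            }
        }
    }
    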
---
 .../flink/runtime/checkpoint/Checkpoints.java      | 24 ++++++++++++++--------
 .../checkpoint/CheckpointMetadataLoadingTest.java  | 23 +++++++++++++++++++++
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
index 6e23131..23df694 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java
@@ -171,16 +171,13 @@ public class Checkpoints {
 			} else if (allowNonRestoredState) {
 				LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
 			} else {
+				if (operatorState.getCoordinatorState() != null) {
+					throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
+				}
+
 				for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
 					if (operatorSubtaskState.hasState()) {
-						String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
-										"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
-										"because the operator is not available in the new program. If " +
-										"you want to allow to skip this, you can set the --allowNonRestoredState " +
-										"option on the CLI.",
-								checkpointPointer, operatorState.getOperatorID());
-
-						throw new IllegalStateException(msg);
+						throwNonRestoredStateException(checkpointPointer, operatorState.getOperatorID());
 					}
 				}
 
@@ -202,6 +199,17 @@ public class Checkpoints {
 				location);
 	}
 
+	private static void throwNonRestoredStateException(String checkpointPointer, OperatorID operatorId) {
+		String msg = String.format("Failed to rollback to checkpoint/savepoint %s. " +
+				"Cannot map checkpoint/savepoint state for operator %s to the new program, " +
+				"because the operator is not available in the new program. If " +
+				"you want to allow to skip this, you can set the --allowNonRestoredState " +
+				"option on the CLI.",
+			checkpointPointer, operatorId);
+
+		throw new IllegalStateException(msg);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Savepoint Disposal Hooks
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
index 936fe23..987d77d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
@@ -129,6 +129,29 @@ public class CheckpointMetadataLoadingTest {
 		assertTrue(loaded.getOperatorStates().isEmpty());
 	}
 
+	/**
+	 * Tests that savepoint loading fails when there is non-restored coordinator state only,
+	 * and non-restored state is not allowed.
+	 */
+	@Test
+	public void testUnmatchedCoordinatorOnlyStateFails() throws Exception {
+		final OperatorID operatorID = new OperatorID();
+		final int maxParallelism = 1234;
+
+		final OperatorState state = new OperatorState(operatorID, maxParallelism / 2, maxParallelism);
+		state.setCoordinatorState(new ByteStreamStateHandle("coordinatorState", new byte[0]));
+
+		final CompletedCheckpointStorageLocation testSavepoint = createSavepointWithOperatorState(42L, state);
+		final Map<JobVertexID, ExecutionJobVertex> tasks = Collections.emptyMap();
+
+		try {
+			Checkpoints.loadAndValidateCheckpoint(new JobID(), tasks, testSavepoint, cl, false);
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException expected) {
+			assertTrue(expected.getMessage().contains("allowNonRestoredState"));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  setup utils
 	// ------------------------------------------------------------------------


[flink] 05/13: [FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debugability.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ad1ba3b7cc9d80e7bcd09e140950bce971d7657
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 12 16:42:47 2020 +0200

    [FLINK-17671][tests][refactor] Simplify ManuallyTriggeredScheduledExecutor for better debugability.
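    
    The executor used to run each triggered task through CompletableFuture.runAsync(next,
    Runnable::run).join(), which wrapped test failures in extra future machinery; triggering
    now simply runs the task in the calling thread. A minimal sketch of the pattern (class and
    method names are illustrative, not the Flink test class):
    
    import java.util.ArrayDeque;
    
    /** Executor whose queued tasks only run when the test explicitly triggers them. */
    public class ManuallyTriggeredExecutorSketch {
    
        private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
    
        public void execute(Runnable command) {
            synchronized (queuedRunnables) {
                queuedRunnables.addLast(command);
            }
        }
    
        /** Runs the next queued task directly in the calling thread. */
        public void trigger() {
            final Runnable next;
            synchronized (queuedRunnables) {
                next = queuedRunnables.removeFirst();
            }
            next.run(); // direct call: exceptions and stack traces stay readable in tests
        }
    
        public static void main(String[] args) {
            ManuallyTriggeredExecutorSketch executor = new ManuallyTriggeredExecutorSketch();
            executor.execute(() -> System.out.println("runs only when triggered"));
            executor.trigger();
        }
    }
    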
---
 .../runtime/concurrent/ManuallyTriggeredScheduledExecutor.java   | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
index 04d1f48..0453bcc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ManuallyTriggeredScheduledExecutor.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -43,8 +42,6 @@ import java.util.stream.Collectors;
  */
 public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 
-	private final Executor executorDelegate;
-
 	private final ArrayDeque<Runnable> queuedRunnables = new ArrayDeque<>();
 
 	private final ConcurrentLinkedQueue<ScheduledTask<?>> nonPeriodicScheduledTasks =
@@ -53,10 +50,6 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 	private final ConcurrentLinkedQueue<ScheduledTask<?>> periodicScheduledTasks =
 		new ConcurrentLinkedQueue<>();
 
-	public ManuallyTriggeredScheduledExecutor() {
-		this.executorDelegate = Runnable::run;
-	}
-
 	@Override
 	public void execute(@Nonnull Runnable command) {
 		synchronized (queuedRunnables) {
@@ -82,7 +75,7 @@ public class ManuallyTriggeredScheduledExecutor implements ScheduledExecutor {
 			next = queuedRunnables.removeFirst();
 		}
 
-		CompletableFuture.runAsync(next, executorDelegate).join();
+		next.run();
 	}
 
 	/**


[flink] 07/13: [FLINK-17672][scheduler] OperatorCoordinators receive failure notifications on task failure instead of restarts

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2430dd8b8e38c284a69c71e9be6098e7db2d8ca4
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 12 17:19:16 2020 +0200

    [FLINK-17672][scheduler] OperatorCoordinators receive failure notifications on task failure instead of restarts
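    
    A small, self-contained sketch of the new flow (the types below are illustrative stand-ins,
    not the real OperatorCoordinator or DefaultScheduler): the scheduler notifies the vertex's
    coordinators while handling the task failure itself, instead of relying on the vertex being
    reset as part of a restart.
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TaskFailureNotificationSketch {
    
        /** Illustrative stand-in, reduced to the callback relevant here. */
        interface CoordinatorStub {
            void subtaskFailed(int subtask);
        }
    
        static final class RecordingCoordinator implements CoordinatorStub {
            final List<Integer> failedTasks = new ArrayList<>();
    
            @Override
            public void subtaskFailed(int subtask) {
                failedTasks.add(subtask);
            }
        }
    
        /** Mirrors the shape of handleTaskFailure(): notify coordinators first, then decide on restarts. */
        static void handleTaskFailure(List<CoordinatorStub> coordinators, int subtaskIndex) {
            coordinators.forEach(c -> c.subtaskFailed(subtaskIndex));
            // ... afterwards: compute the failure handling result and maybe restart tasks (omitted)
        }
    
        public static void main(String[] args) {
            RecordingCoordinator coordinator = new RecordingCoordinator();
            handleTaskFailure(List.of(coordinator), 1);
            System.out.println("coordinator saw failed subtasks: " + coordinator.failedTasks); // prints [1]
        }
    }
    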
---
 .../runtime/executiongraph/ExecutionVertex.java    |  2 -
 .../flink/runtime/scheduler/DefaultScheduler.java  |  9 ++++
 .../flink/runtime/scheduler/SchedulerBase.java     |  4 ++
 .../OperatorCoordinatorSchedulerTest.java          | 60 +++++++++++++++++-----
 4 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 2a0b432..6acc519 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -650,8 +650,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				}
 			}
 
-			jobVertex.getOperatorCoordinators().forEach((c -> c.subtaskFailed(getParallelSubtaskIndex())));
-
 			CoLocationGroup grp = jobVertex.getCoLocationGroup();
 			if (grp != null) {
 				locationConstraint = grp.getLocationConstraint(subTaskIndex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index f31f84b..c7ff47d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
@@ -186,10 +187,18 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
 	private void handleTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
 		setGlobalFailureCause(error);
+		notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
 		final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult(executionVertexId, error);
 		maybeRestartTasks(failureHandlingResult);
 	}
 
+	private void notifyCoordinatorsAboutTaskFailure(final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
+		final ExecutionJobVertex jobVertex = getExecutionJobVertex(executionVertexId.getJobVertexId());
+		final int subtaskIndex = executionVertexId.getSubtaskIndex();
+
+		jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex));
+	}
+
 	@Override
 	public void handleGlobalFailure(final Throwable error) {
 		setGlobalFailureCause(error);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 6996854..58b077a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -405,6 +405,10 @@ public abstract class SchedulerBase implements SchedulerNG {
 		return executionGraph.getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
 	}
 
+	public ExecutionJobVertex getExecutionJobVertex(final JobVertexID jobVertexId) {
+		return executionGraph.getAllVertices().get(jobVertexId);
+	}
+
 	protected JobGraph getJobGraph() {
 		return jobGraph;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 9ea633f..0354e3d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -40,7 +40,6 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
-import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
@@ -129,12 +128,23 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	@Test
-	public void taskFailureNotifiesCoordinator() throws Exception {
+	public void deployingTaskFailureNotifiesCoordinator() throws Exception {
 		final DefaultScheduler scheduler = createAndStartScheduler();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
 
 		failTask(scheduler, 1);
-		executor.triggerScheduledTasks();
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(1));
+		assertThat(coordinator.getFailedTasks(), not(contains(0)));
+	}
+
+	@Test
+	public void runningTaskFailureNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		failTask(scheduler, 1);
 
 		assertEquals(1, coordinator.getFailedTasks().size());
 		assertThat(coordinator.getFailedTasks(), contains(1));
@@ -146,10 +156,8 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
 
-		failTask(scheduler, 0);
-		executor.triggerScheduledTasks();
-		failTask(scheduler, 0);
-		executor.triggerScheduledTasks();
+		failAndRestartTask(scheduler, 0);
+		failAndRestartTask(scheduler, 0);
 
 		assertEquals(2, coordinator.getFailedTasks().size());
 		assertThat(coordinator.getFailedTasks(), contains(0, 0));
@@ -236,6 +244,11 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	private DefaultScheduler createAndStartScheduler() throws Exception {
 		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId));
 		scheduler.startScheduling();
+		executor.triggerAll();
+
+		// guard test assumptions: this brings tasks into DEPLOYING state
+		assertEquals(ExecutionState.DEPLOYING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+
 		return scheduler;
 	}
 
@@ -249,6 +262,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		executor.triggerAll();
 		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard test assumptions: this brings tasks into RUNNING state
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+
 		return scheduler;
 	}
 
@@ -258,6 +275,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		executor.triggerAll();
 		executor.triggerScheduledTasks();
 		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard test assumptions: this brings tasks into RUNNING state
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+
 		return scheduler;
 	}
 
@@ -326,12 +347,27 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private void failTask(DefaultScheduler scheduler, int subtask) {
-		final ExecutionJobVertex ejv = getJobVertex(scheduler, testVertexId);
-		assert ejv != null;
-		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+		SchedulerTestingUtils.failExecution(scheduler, testVertexId, subtask);
+		executor.triggerAll();
+
+		// guard the test assumptions: This must not lead to a restart, but must keep the task in FAILED state
+		assertEquals(ExecutionState.FAILED, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
+	}
+
+	private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) {
+		failTask(scheduler, subtask);
+		executor.triggerScheduledTasks();
+
+		// guard the test assumptions: This must lead to a restart and redeployment of the task
+		assertEquals(ExecutionState.DEPLOYING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
+	}
+
+	private void failAndRestartTask(DefaultScheduler scheduler, int subtask) {
+		failAndRedeployTask(scheduler, subtask);
+		SchedulerTestingUtils.setExecutionToRunning(scheduler, testVertexId, subtask);
 
-		scheduler.updateTaskExecutionState(new TaskExecutionState(
-				ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+		// guard the test assumptions: This must bring the task back to RUNNING
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
 	}
 
 	// ------------------------------------------------------------------------


[flink] 13/13: [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79aa7d115db5c4b5471b415dccbcfcb5b0d10249
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 22:50:56 2020 +0200

    [FLINK-17702][scheduler] Cancellations during failover also notify the OperatorCoordinator as "failed tasks"
    
    This closes #12137
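    
    The interesting detail is the duplicate filter in notifyCoordinatorOfCancellation: a task
    that is already FAILED, CANCELING, or CANCELED has been (or will be) reported through the
    failure path, so only tasks cancelled from other states are reported to the coordinator.
    A self-contained sketch of that predicate (the enum is a stand-in for Flink's ExecutionState):
    
    public class CancellationNotificationSketch {
    
        /** Illustrative subset of execution states. */
        enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FAILED, CANCELING, CANCELED }
    
        /** Notify the coordinator only if this cancellation is the first failure signal for the task. */
        static boolean shouldNotifyCoordinatorOnCancellation(ExecutionState currentState) {
            return currentState != ExecutionState.FAILED
                    && currentState != ExecutionState.CANCELING
                    && currentState != ExecutionState.CANCELED;
        }
    
        public static void main(String[] args) {
            // A healthy RUNNING task cancelled as part of a failover is reported to the coordinator ...
            System.out.println(shouldNotifyCoordinatorOnCancellation(ExecutionState.RUNNING)); // true
            // ... while the task whose failure triggered the failover is not reported a second time.
            System.out.println(shouldNotifyCoordinatorOnCancellation(ExecutionState.FAILED));  // false
        }
    }
    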
---
 .../flink/runtime/scheduler/DefaultScheduler.java  | 24 +++++++++++-
 .../OperatorCoordinatorSchedulerTest.java          | 43 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index e42af43..f1a9924 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.ThrowingSlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
@@ -274,8 +275,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 	}
 
 	private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) {
+		final ExecutionVertex vertex = getExecutionVertex(executionVertexId);
+
+		notifyCoordinatorOfCancellation(vertex);
+
 		executionSlotAllocator.cancel(executionVertexId);
-		return executionVertexOperations.cancel(getExecutionVertex(executionVertexId));
+		return executionVertexOperations.cancel(vertex);
 	}
 
 	@Override
@@ -468,4 +473,21 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 			handleTaskDeploymentFailure(executionVertexId, e);
 		}
 	}
+
+	private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) {
+		// this method makes a best effort to filter out duplicate notifications, meaning cases where
+		// the coordinator was already notified for that specific task
+		// we don't notify if the task is already FAILED, CANCELING, or CANCELED
+
+		final ExecutionState currentState = vertex.getExecutionState();
+		if (currentState == ExecutionState.FAILED ||
+				currentState == ExecutionState.CANCELING ||
+				currentState == ExecutionState.CANCELED) {
+			return;
+		}
+
+		for (OperatorCoordinator coordinator : vertex.getJobVertex().getOperatorCoordinators()) {
+			coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 80ec9c9..935e0ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.hamcrest.Matcher;
 import org.junit.After;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nullable;
@@ -67,6 +68,7 @@ import java.util.function.Consumer;
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
 import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertArrayEquals;
@@ -173,6 +175,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 	}
 
 	@Test
+	public void cancellationAsPartOfFailoverNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerWithAllRestartOnFailureAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		failTask(scheduler, 1);
+
+		assertEquals(2, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), containsInAnyOrder(0, 1));
+	}
+
+	@Test
 	public void taskRepeatedFailureNotifyCoordinator() throws Exception {
 		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
 		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
@@ -204,6 +217,36 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertThat(result, futureFailedWith(TestException.class));
 	}
 
+	// THESE TESTS BELOW SHOULD LEGITIMATELY WORK, BUT THE SCHEDULER ITSELF SEEMS TO NOT HANDLE
+	// THIS SITUATION AT THE MOMENT
+	// WE KEEP THESE TESTS HERE TO ENABLE THEM ONCE THE SCHEDULER'S CONTRACT SUPPORTS THEM
+
+	@Ignore
+	@Test
+	public void deployingTaskCancellationNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createAndStartScheduler();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		cancelTask(scheduler, 1);
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(1));
+		assertThat(coordinator.getFailedTasks(), not(contains(0)));
+	}
+
+	@Ignore
+	@Test
+	public void runningTaskCancellationNotifiesCoordinator() throws Exception {
+		final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+		final TestingOperatorCoordinator coordinator = getCoordinator(scheduler);
+
+		cancelTask(scheduler, 0);
+
+		assertEquals(1, coordinator.getFailedTasks().size());
+		assertThat(coordinator.getFailedTasks(), contains(0));
+		assertThat(coordinator.getFailedTasks(), not(contains(1)));
+	}
+
 	// ------------------------------------------------------------------------
 	//  tests for checkpointing
 	// ------------------------------------------------------------------------


[flink] 12/13: [FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfef3db4bd95d03b4d551d99cd10aedde1c326fc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu May 14 19:05:22 2020 +0200

    [FLINK-17702][tests][refactor] Refactor test utils to support different failover strategies.
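    
    The gist of the refactor: the test utilities now hand out a pre-configured scheduler builder
    instead of a finished scheduler, so a single test can override one setting (here the failover
    strategy) before calling build(). A generic sketch of that design choice (all names below are
    made up, not the Flink test classes):
    
    public class SchedulerBuilderSketch {
    
        enum FailoverStrategy { RESTART_PIPELINED_REGION, RESTART_ALL }
    
        static final class TestSchedulerBuilder {
            private FailoverStrategy failoverStrategy = FailoverStrategy.RESTART_PIPELINED_REGION;
    
            TestSchedulerBuilder setFailoverStrategy(FailoverStrategy strategy) {
                this.failoverStrategy = strategy;
                return this;
            }
    
            String build() {
                return "scheduler with " + failoverStrategy;
            }
        }
    
        /** Mirrors createSchedulerBuilder(...): common defaults, but no build() yet. */
        static TestSchedulerBuilder createSchedulerBuilder() {
            return new TestSchedulerBuilder();
        }
    
        public static void main(String[] args) {
            // Default test setup.
            System.out.println(createSchedulerBuilder().build());
            // A test that needs restart-all failover overrides just that one setting.
            System.out.println(createSchedulerBuilder().setFailoverStrategy(FailoverStrategy.RESTART_ALL).build());
        }
    }
    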
---
 .../flink/runtime/scheduler/SchedulerBase.java     |  9 +++
 .../OperatorCoordinatorSchedulerTest.java          | 64 ++++++++++++++--------
 .../runtime/scheduler/SchedulerTestingUtils.java   | 39 +++++++------
 3 files changed, 72 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 3d510fd..c2c911a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -981,4 +981,13 @@ public abstract class SchedulerBase implements SchedulerNG {
 		}
 		return coordinatorMap;
 	}
+
+	// ------------------------------------------------------------------------
+	//  access utils for testing
+	// ------------------------------------------------------------------------
+
+	@VisibleForTesting
+	JobID getJobId() {
+		return jobGraph.getJobID();
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index a3fb582..80ec9c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorSer
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -367,29 +368,21 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		return createSchedulerAndDeployTasks(new TestingOperatorCoordinator.Provider(testOperatorId));
 	}
 
+	private DefaultScheduler createSchedulerWithAllRestartOnFailureAndDeployTasks() throws Exception {
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), null, null, true);
+		scheduleAllTasksToRunning(scheduler);
+		return scheduler;
+	}
+
 	private DefaultScheduler createSchedulerAndDeployTasks(OperatorCoordinator.Provider provider) throws Exception {
 		final DefaultScheduler scheduler = setupTestJobAndScheduler(provider);
-		scheduler.startScheduling();
-		executor.triggerAll();
-		executor.triggerScheduledTasks();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
-		// guard test assumptions: this brings tasks into RUNNING state
-		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+		scheduleAllTasksToRunning(scheduler);
 		return scheduler;
 	}
 
 	private DefaultScheduler createSchedulerAndDeployTasks(TaskExecutorOperatorEventGateway gateway) throws Exception {
-		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null);
-		scheduler.startScheduling();
-		executor.triggerAll();
-		executor.triggerScheduledTasks();
-		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
-
-		// guard test assumptions: this brings tasks into RUNNING state
-		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
-
+		final DefaultScheduler scheduler = setupTestJobAndScheduler(new TestingOperatorCoordinator.Provider(testOperatorId), gateway, null, false);
+		scheduleAllTasksToRunning(scheduler);
 		return scheduler;
 	}
 
@@ -408,20 +401,22 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		final DefaultScheduler scheduler = setupTestJobAndScheduler(
 				new TestingOperatorCoordinator.Provider(testOperatorId),
 				null,
-				savepointConfigurer);
+				savepointConfigurer,
+				false);
 
 		scheduler.startScheduling();
 		return scheduler;
 	}
 
 	private DefaultScheduler setupTestJobAndScheduler(OperatorCoordinator.Provider provider) throws Exception {
-		return setupTestJobAndScheduler(provider, null, null);
+		return setupTestJobAndScheduler(provider, null, null, false);
 	}
 
 	private DefaultScheduler setupTestJobAndScheduler(
 			OperatorCoordinator.Provider provider,
 			@Nullable TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway,
-			@Nullable Consumer<JobGraph> jobGraphPreProcessing) throws Exception {
+			@Nullable Consumer<JobGraph> jobGraphPreProcessing,
+			boolean restartAllOnFailover) throws Exception {
 
 		final OperatorIDPair opIds = OperatorIDPair.of(new OperatorID(), provider.getOperatorId());
 		final JobVertex vertex = new JobVertex("Vertex with OperatorCoordinator", testVertexId, Collections.singletonList(opIds));
@@ -435,15 +430,30 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 			jobGraphPreProcessing.accept(jobGraph);
 		}
 
-		final DefaultScheduler scheduler = taskExecutorOperatorEventGateway == null
-				? SchedulerTestingUtils.createScheduler(jobGraph, executor)
-				: SchedulerTestingUtils.createScheduler(jobGraph, executor, taskExecutorOperatorEventGateway);
+		final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder = taskExecutorOperatorEventGateway == null
+				? SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor)
+				: SchedulerTestingUtils.createSchedulerBuilder(jobGraph, executor, taskExecutorOperatorEventGateway);
+		if (restartAllOnFailover) {
+			schedulerBuilder.setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory());
+		}
+
+		final DefaultScheduler scheduler = schedulerBuilder.build();
 		scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
 
 		this.createdScheduler = scheduler;
 		return scheduler;
 	}
 
+	private void scheduleAllTasksToRunning(DefaultScheduler scheduler) {
+		scheduler.startScheduling();
+		executor.triggerAll();
+		executor.triggerScheduledTasks();
+		SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+
+		// guard test assumptions: this brings tasks into RUNNING state
+		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
+	}
+
 	private TestingOperatorCoordinator getCoordinator(DefaultScheduler scheduler) {
 		final ExecutionJobVertex vertexWithCoordinator = getJobVertex(scheduler, testVertexId);
 		assertNotNull("vertex for coordinator not found", vertexWithCoordinator);
@@ -494,6 +504,14 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
 		assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
 	}
 
+	private void cancelTask(DefaultScheduler scheduler, int subtask) {
+		SchedulerTestingUtils.canceledExecution(scheduler, testVertexId, subtask);
+		executor.triggerAll();
+
+		// guard the test assumptions: This must not lead to a restart, but must keep the task in CANCELED state
+		assertEquals(ExecutionState.CANCELED, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
+	}
+
 	private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
 		final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler);
 		executor.triggerAll();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 124e733..6f62067 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -142,14 +142,14 @@ public class SchedulerTestingUtils {
 			.build();
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createSchedulerBuilder(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor) throws Exception {
 
 		return createScheduler(jobGraph, asyncExecutor, new SimpleAckingTaskManagerGateway());
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createSchedulerBuilder(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskExecutorOperatorEventGateway operatorEventGateway) throws Exception {
@@ -161,7 +161,7 @@ public class SchedulerTestingUtils {
 		return createScheduler(jobGraph, asyncExecutor, gateway);
 	}
 
-	public static DefaultScheduler createScheduler(
+	public static DefaultSchedulerBuilder createScheduler(
 			JobGraph jobGraph,
 			ManuallyTriggeredScheduledExecutorService asyncExecutor,
 			TaskManagerGateway taskManagerGateway) throws Exception {
@@ -171,8 +171,7 @@ public class SchedulerTestingUtils {
 			.setDelayExecutor(asyncExecutor)
 			.setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
 			.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0))
-			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway))
-			.build();
+			.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
 	}
 
 	public static DefaultExecutionSlotAllocatorFactory createDefaultExecutionSlotAllocatorFactory(
@@ -240,32 +239,32 @@ public class SchedulerTestingUtils {
 	}
 
 	public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
-		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
-		assert ejv != null;
-		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
+		scheduler.updateTaskExecutionState(new TaskExecutionState(
+			scheduler.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+	}
 
+	public static void canceledExecution(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
 		scheduler.updateTaskExecutionState(new TaskExecutionState(
-			ejv.getJobId(), attemptID, ExecutionState.FAILED, new Exception("test task failure")));
+			scheduler.getJobId(), attemptID, ExecutionState.CANCELED, new Exception("test task failure")));
 	}
 
 	public static void setExecutionToRunning(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
-		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
-		assert ejv != null;
-		final ExecutionAttemptID attemptID = ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
-
+		final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask);
 		scheduler.updateTaskExecutionState(new TaskExecutionState(
-			ejv.getJobId(), attemptID, ExecutionState.RUNNING));
+			scheduler.getJobId(), attemptID, ExecutionState.RUNNING));
 	}
 
 	public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) {
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
 			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.RUNNING))
 		);
 	}
 
 	public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 		getAllCurrentExecutionAttempts(scheduler).forEach(
 			(attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
 		);
@@ -273,7 +272,7 @@ public class SchedulerTestingUtils {
 
 	public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {
 		final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);
-		final JobID jid = scheduler.requestJob().getJobID();
+		final JobID jid = scheduler.getJobId();
 
 		for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
 			final AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jid, attemptId, checkpointId);
@@ -333,6 +332,12 @@ public class SchedulerTestingUtils {
 		return scheduler.getExecutionVertex(id).getJobVertex();
 	}
 
+	public static ExecutionAttemptID getAttemptId(DefaultScheduler scheduler, JobVertexID jvid, int subtask) {
+		final ExecutionJobVertex ejv = getJobVertex(scheduler, jvid);
+		assert ejv != null;
+		return ejv.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {


[flink] 08/13: [FLINK-10740][scheduler] Add failure reason to OperatorCoordinator.failTask(...)

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b3da4e18873dd6e31404c24ce877ff79c19c9300
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue May 12 18:44:20 2020 +0200

    [FLINK-10740][scheduler] Add failure reason to OperatorCoordinator.failTask(...)
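    
    The reason is declared @Nullable, and existing callers in the tests pass null, so coordinator
    implementations have to cope with a missing cause. A minimal sketch of a coordinator using the
    new argument (the interface below is an illustrative stand-in, not the real OperatorCoordinator):
    
    public class SubtaskFailureReasonSketch {
    
        /** Illustrative stand-in for the changed callback. */
        interface CoordinatorStub {
            void subtaskFailed(int subtask, Throwable reason); // reason may be null
        }
    
        static final class LoggingCoordinator implements CoordinatorStub {
            @Override
            public void subtaskFailed(int subtask, Throwable reason) {
                String cause = (reason == null) ? "unknown (no cause given)" : reason.getMessage();
                System.out.println("subtask " + subtask + " failed: " + cause);
            }
        }
    
        public static void main(String[] args) {
            CoordinatorStub coordinator = new LoggingCoordinator();
            coordinator.subtaskFailed(0, new Exception("test task failure"));
            coordinator.subtaskFailed(1, null); // callers may legitimately pass no reason
        }
    }
    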
---
 .../flink/runtime/operators/coordination/OperatorCoordinator.java   | 4 +++-
 .../java/org/apache/flink/runtime/scheduler/DefaultScheduler.java   | 2 +-
 .../apache/flink/runtime/source/coordinator/SourceCoordinator.java  | 4 +++-
 .../runtime/operators/coordination/MockOperatorCoordinator.java     | 4 +++-
 .../runtime/operators/coordination/TestingOperatorCoordinator.java  | 4 +++-
 .../flink/runtime/source/coordinator/SourceCoordinatorTest.java     | 6 +++---
 .../api/operators/collect/CollectSinkOperatorCoordinator.java       | 4 +++-
 .../streaming/api/operators/collect/CollectSinkFunctionTest.java    | 2 +-
 .../api/operators/collect/CollectSinkOperatorCoordinatorTest.java   | 2 +-
 9 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
index 7f7f93c..27e0910 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 
@@ -61,7 +63,7 @@ public interface OperatorCoordinator extends AutoCloseable {
 	/**
 	 * Called when one of the subtasks of the task running the coordinated operator failed.
 	 */
-	void subtaskFailed(int subtask);
+	void subtaskFailed(int subtask, @Nullable Throwable reason);
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index c7ff47d..dcb6549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -196,7 +196,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 		final ExecutionJobVertex jobVertex = getExecutionJobVertex(executionVertexId.getJobVertexId());
 		final int subtaskIndex = executionVertexId.getSubtaskIndex();
 
-		jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex));
+		jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex, error));
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 4fc2aeb..e0edbad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -145,7 +147,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
 	}
 
 	@Override
-	public void subtaskFailed(int subtaskId) {
+	public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
 		ensureStarted();
 		coordinatorExecutor.execute(() -> {
 			try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
index c4122fc..5e47350 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/MockOperatorCoordinator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import javax.annotation.Nullable;
+
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -42,7 +44,7 @@ public final class MockOperatorCoordinator implements OperatorCoordinator {
 	}
 
 	@Override
-	public void subtaskFailed(int subtask) {
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
 		throw new UnsupportedOperationException();
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 20fcefe..9889ba8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.util.SerializableFunction;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
@@ -57,7 +59,7 @@ class TestingOperatorCoordinator implements OperatorCoordinator {
 	public void handleEventFromOperator(int subtask, OperatorEvent event) {}
 
 	@Override
-	public void subtaskFailed(int subtask) {
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
 		failedTasks.add(subtask);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 2f12235..d92b9ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -53,7 +53,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 				failureMessage, "The coordinator has not started yet.");
 		verifyException(() -> sourceCoordinator.handleEventFromOperator(0, null),
 				failureMessage, "The coordinator has not started yet.");
-		verifyException(() -> sourceCoordinator.subtaskFailed(0),
+		verifyException(() -> sourceCoordinator.subtaskFailed(0, null),
 				failureMessage, "The coordinator has not started yet.");
 		verifyException(() -> sourceCoordinator.checkpointCoordinator(100L),
 				failureMessage, "The coordinator has not started yet.");
@@ -160,7 +160,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		});
 
 		// Fail reader 0.
-		sourceCoordinator.subtaskFailed(0);
+		sourceCoordinator.subtaskFailed(0, null);
 
 		// check the state again.
 		check(() -> {
@@ -190,7 +190,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase {
 		sourceCoordinator.checkpointComplete(100L);
 
 		// Fail reader 0.
-		sourceCoordinator.subtaskFailed(0);
+		sourceCoordinator.subtaskFailed(0, null);
 
 		check(() -> {
 			// Reader 0 has been unregistered.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
index d08bcda..dea9bf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinator.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -168,7 +170,7 @@ public class CollectSinkOperatorCoordinator implements OperatorCoordinator, Coor
 	}
 
 	@Override
-	public void subtaskFailed(int subtask) {
+	public void subtaskFailed(int subtask, @Nullable Throwable reason) {
 		// subtask failed, the socket server does not exist anymore
 		address = null;
 	}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
index 4932890..1483998 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunctionTest.java
@@ -370,7 +370,7 @@ public class CollectSinkFunctionTest extends TestLogger {
 	private void closeFuntionAbnormally() throws Exception {
 		// this is an exceptional shutdown
 		function.close();
-		coordinator.subtaskFailed(0);
+		coordinator.subtaskFailed(0, null);
 	}
 
 	private void finishJob() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
index 98d6237..d2ad57b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java
@@ -89,7 +89,7 @@ public class CollectSinkOperatorCoordinatorTest {
 		// server closes here
 		request = new CollectCoordinationRequest("version3", 789);
 		CompletableFuture<CoordinationResponse> responseFuture = coordinator.handleCoordinationRequest(request);
-		coordinator.subtaskFailed(0);
+		coordinator.subtaskFailed(0, null);
 
 		// new server comes
 		expected = Collections.singletonList(Arrays.asList(Row.of(6, "fff"), Row.of(7, "ggg")));