You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/16 02:21:20 UTC

[GitHub] tony810430 closed pull request #4828: [FLINK-4816] [checkpoints] Executions failed from "DEPLOYING" should retain restored checkpoint information

tony810430 closed pull request #4828: [FLINK-4816] [checkpoints] Executions failed from "DEPLOYING" should retain restored checkpoint information
URL: https://github.com/apache/flink/pull/4828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9a4456ef7d7..ae74ce3b5a5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -181,6 +181,9 @@
 	/** Registry that tracks state which is shared across (incremental) checkpoints */
 	private SharedStateRegistry sharedStateRegistry;
 
+	/** ID of the most recently restored checkpoint, or -1 if no checkpoint has been restored yet. */
+	private volatile long restoredCheckpointID = -1;
+
 	// --------------------------------------------------------------------------------------------
 
 	public CheckpointCoordinator(
@@ -1109,7 +1112,8 @@ public boolean restoreLatestCheckpointedState(
 
 				statsTracker.reportRestoredCheckpoint(restored);
 			}
-
+			// written while holding the coordinator lock; the field is volatile for lock-free readers
+			restoredCheckpointID = latest.getCheckpointID();
 			return true;
 		}
 	}
@@ -1152,6 +1156,15 @@ public boolean restoreSavepoint(
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
 
+	/**
+	 * Returns the ID of the most recently restored checkpoint, or -1 if none has been restored.
+	 * This deliberately does not acquire the coordinator lock: the field is volatile, so the read
+	 * is atomic and sees the latest write, and failure handling never blocks behind the lock.
+	 */
+	public long getRestoredCheckpointID() {
+		return restoredCheckpointID;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Accessors
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java
new file mode 100644
index 00000000000..278a2ec526a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/DeployTaskException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.execution;
+
+/**
+ * Thrown to indicate that a task failed while in the {@link ExecutionState#DEPLOYING} state
+ * and no checkpoint had been restored; the original failure, if any, is the cause.
+ */
+public class DeployTaskException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	public DeployTaskException(Throwable cause) {
+		super(cause);
+	}
+
+	public DeployTaskException(String msg) {
+		super(msg);
+	}
+
+	public DeployTaskException() {
+		super();
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java
new file mode 100644
index 00000000000..f7118364885
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RestoreTaskException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.execution;
+
+/**
+ * Thrown to indicate that a task failed while in the {@link ExecutionState#DEPLOYING} state
+ * after a checkpoint had been restored. The ID of that checkpoint is carried along.
+ */
+public class RestoreTaskException extends RuntimeException {
+
+	private static final long serialVersionUID = 1L;
+
+	/** Value of {@link #getCheckpointId()} when the restored checkpoint is not known. */
+	private static final long UNKNOWN_CHECKPOINT_ID = -1L;
+
+	/** ID of the checkpoint that had been restored when the task failed. */
+	private final long checkpointId;
+
+	public RestoreTaskException(Throwable cause, long checkpointId) {
+		super(cause);
+		this.checkpointId = checkpointId;
+	}
+
+	public RestoreTaskException(String msg, long checkpointId) {
+		super(msg);
+		this.checkpointId = checkpointId;
+	}
+
+	public RestoreTaskException() {
+		super();
+		this.checkpointId = UNKNOWN_CHECKPOINT_ID;
+	}
+
+	/** Returns the ID of the restored checkpoint, or -1 if it is not known. */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index cc35060e7ae..626d0b99abe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -24,6 +24,7 @@
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -32,10 +33,12 @@
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.DeployTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.RestoreTaskException;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
@@ -1035,7 +1038,20 @@ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumul
 
 			if (transitionState(current, FAILED, t)) {
 				// success (in a manner of speaking)
-				this.failureCause = t;
+				if (current == DEPLOYING) {
+					// wrap the cause so it records whether a checkpoint had been restored
+					// (RestoreTaskException with that checkpoint's ID) or not (DeployTaskException)
+					CheckpointCoordinator checkpointCoordinator = getVertex().getExecutionGraph().getCheckpointCoordinator();
+					// read the ID once so that the check and the reported ID cannot disagree
+					long restoredCheckpointId = checkpointCoordinator != null ? checkpointCoordinator.getRestoredCheckpointID() : -1L;
+					if (restoredCheckpointId != -1L) {
+						this.failureCause = new RestoreTaskException(t, restoredCheckpointId);
+					} else {
+						this.failureCause = new DeployTaskException(t);
+					}
+				} else {
+					this.failureCause = t;
+				}
 
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 51d21420c99..54b86602175 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2072,6 +2072,9 @@ public void testRestoreLatestCheckpointedState() throws Exception {
 		// verify the restored state
 		verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
 		verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2);
+
+		// verify that the coordinator recorded the ID of the restored checkpoint
+		assertEquals(completedCheckpoints.get(0).getCheckpointID(), coord.getRestoredCheckpointID());
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b1ee3cc1c7d..10a188fc339 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -26,6 +26,7 @@
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -74,6 +75,7 @@
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -476,4 +478,39 @@ public static ExecutionJobVertex getExecutionVertex(
 	public static ExecutionJobVertex getExecutionVertex(JobVertexID id) throws Exception {
 		return getExecutionVertex(id, TestingUtils.defaultExecutor());
 	}
+
+	/**
+	 * Creates a spied {@link ExecutionJobVertex} for a new "TestVertex" job vertex, backed by a
+	 * spied {@link ExecutionGraph} whose {@code getCheckpointCoordinator()} returns the given
+	 * coordinator, so tests can control the checkpoint restore information seen by executions.
+	 *
+	 * @param id ID of the job vertex to create
+	 * @param executor executor passed to the execution graph and its scheduler
+	 * @param checkpointCoordinator coordinator returned by the graph (typically a mock)
+	 * @return the spied execution job vertex
+	 */
+	public static ExecutionJobVertex getExecutionVertex(
+			JobVertexID id,
+			ScheduledExecutorService executor,
+			CheckpointCoordinator checkpointCoordinator) throws Exception {
+
+		JobVertex ajv = new JobVertex("TestVertex", id);
+		ajv.setInvokableClass(mock(AbstractInvokable.class).getClass());
+
+		ExecutionGraph graph = spy(new ExecutionGraph(
+			executor,
+			executor,
+			new JobID(),
+			"test job",
+			new Configuration(),
+			new SerializedValue<>(new ExecutionConfig()),
+			AkkaUtils.getDefaultTimeout(),
+			new NoRestartStrategy(),
+			new Scheduler(ExecutionContext$.MODULE$.fromExecutor(executor))));
+
+		// let the spied graph hand out the supplied coordinator instead of its own
+		doReturn(checkpointCoordinator).when(graph).getCheckpointCoordinator();
+
+		return spy(new ExecutionJobVertex(graph, ajv, 1, AkkaUtils.getDefaultTimeout()));
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index d91380ed275..2b98f7a9064 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -20,10 +20,13 @@
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.DeployTaskException;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.RestoreTaskException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -275,7 +278,53 @@ public void testFailExternallyDuringDeploy() {
 			vertex.fail(testError);
 
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-			assertEquals(testError, vertex.getFailureCause());
+			assertTrue(vertex.getFailureCause() instanceof DeployTaskException); // DEPLOYING failures are wrapped; the original error becomes the cause
+			assertEquals(testError, vertex.getFailureCause().getCause());
+
+			queue.triggerNextAction();
+			queue.triggerNextAction();
+
+			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
+			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailExternallyDuringDeployWhenRestoredFromCheckpoint() {
+		try {
+			final JobVertexID jid = new JobVertexID();
+
+			final TestingUtils.QueuedActionExecutionContext ec = TestingUtils.queuedActionExecutionContext();
+			final TestingUtils.ActionQueue queue = ec.actionQueue();
+
+			final CheckpointCoordinator checkpointCoordinator = mock(CheckpointCoordinator.class);
+			when(checkpointCoordinator.getRestoredCheckpointID()).thenReturn(10L); // pretend checkpoint 10 has been restored
+
+			final ExecutionJobVertex ejv = getExecutionVertex(jid, ec, checkpointCoordinator); // the graph hands out the mocked coordinator
+
+			final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+				AkkaUtils.getDefaultTimeout());
+
+			final Instance instance = getInstance(
+				new ActorTaskManagerGateway(
+					new SimpleActorGateway(TestingUtils.directExecutionContext())));
+			final SimpleSlot slot = instance.allocateSimpleSlot();
+
+			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
+			vertex.deployToSlot(slot);
+			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
+
+			Exception testError = new Exception("test error");
+			vertex.fail(testError);
+
+			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			assertTrue(vertex.getFailureCause() instanceof RestoreTaskException); // a restored checkpoint ID was reported, so the restore wrapper is used
+			assertEquals(testError, vertex.getFailureCause().getCause());
+			assertEquals(10L, ((RestoreTaskException) vertex.getFailureCause()).getCheckpointId());
 
 			queue.triggerNextAction();
 			queue.triggerNextAction();
@@ -339,7 +388,8 @@ public void testFailCallOvertakesDeploymentAnswer() {
 
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
 
-			assertEquals(testError, vertex.getFailureCause());
+			assertTrue(vertex.getFailureCause() instanceof DeployTaskException); // DEPLOYING failures are wrapped; the original error becomes the cause
+			assertEquals(testError, vertex.getFailureCause().getCause());
 
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.DEPLOYING) > 0);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services