You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 13:50:20 UTC

flink git commit: [FLINK-9693] Set Execution#taskRestore to null after deployment

Repository: flink
Updated Branches:
  refs/heads/master 2fd2ccb65 -> fb37d51f3


[FLINK-9693] Set Execution#taskRestore to null after deployment

Setting the assigned Execution#taskRestore to null after the deployment allows the
JobManagerTaskRestore instance to be garbage collected. Furthermore, it won't be
archived along with the Execution in the ExecutionVertex in case of a restart. This
is especially important when setting state.backend.fs.memory-threshold to larger
values because every state below this threshold will be stored in the meta state files
and, thus, also the JobManagerTaskRestore instances.

This closes #6251.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb37d51f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb37d51f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb37d51f

Branch: refs/heads/master
Commit: fb37d51f309fc891d10c20ff25736d957f80367a
Parents: 2fd2ccb
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 4 11:05:25 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 15:50:07 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   3 +
 .../runtime/executiongraph/ExecutionTest.java   | 112 +++++++++++++++----
 2 files changed, 92 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 64e602f..853732f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -583,6 +583,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				taskRestore,
 				attemptNumber);
 
+			// null taskRestore to let it be GC'ed
+			taskRestore = null;
+
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 99879c0..d3e88e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -40,8 +42,11 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -50,6 +55,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -66,9 +73,8 @@ public class ExecutionTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
-		final JobVertexID jobVertexId = new JobVertexID();
-		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
-		jobVertex.setInvokableClass(NoOpInvokable.class);
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
 
 		final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
 		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
@@ -119,9 +125,8 @@ public class ExecutionTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
-		final JobVertexID jobVertexId = new JobVertexID();
-		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
-		jobVertex.setInvokableClass(NoOpInvokable.class);
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
@@ -169,9 +174,8 @@ public class ExecutionTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
-		final JobVertexID jobVertexId = new JobVertexID();
-		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
-		jobVertex.setInvokableClass(NoOpInvokable.class);
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
 
@@ -331,22 +335,14 @@ public class ExecutionTest extends TestLogger {
 	 */
 	@Test
 	public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
-		final JobVertexID jobVertexId = new JobVertexID();
-		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
-		jobVertex.setInvokableClass(NoOpInvokable.class);
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
 
 		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
-
-		final SimpleSlot slot = new SimpleSlot(
-			slotOwner,
-			new LocalTaskManagerLocation(),
-			0,
-			new SimpleAckingTaskManagerGateway(),
-			null,
-			null);
-
-		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
-		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+		final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+			1,
+			Collections.singleton(jobVertexId),
+			slotOwner);
 
 		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
 			new JobID(),
@@ -386,6 +382,76 @@ public class ExecutionTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that the task restore state is nulled after the {@link Execution} has been
+	 * deployed. See FLINK-9693.
+	 */
+	@Test
+	public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+		final JobVertex jobVertex = createNoOpJobVertex();
+		final JobVertexID jobVertexId = jobVertex.getID();
+
+		final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+		final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+			1,
+			Collections.singleton(jobVertexId),
+			slotOwner);
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+		final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+		final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+		execution.setInitialState(taskRestoreState);
+
+		assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+		// schedule the execution vertex and wait for its deployment
+		executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+		assertThat(execution.getTaskRestore(), is(nullValue()));
+	}
+
+	@Nonnull
+	private JobVertex createNoOpJobVertex() {
+		final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		return jobVertex;
+	}
+
+	@Nonnull
+	private ProgrammedSlotProvider createProgrammedSlotProvider(
+		int parallelism,
+		Collection<JobVertexID> jobVertexIds,
+		SlotOwner slotOwner) {
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+		for (JobVertexID jobVertexId : jobVertexIds) {
+			for (int i = 0; i < parallelism; i++) {
+				final SimpleSlot slot = new SimpleSlot(
+					slotOwner,
+					new LocalTaskManagerLocation(),
+					0,
+					new SimpleAckingTaskManagerGateway(),
+					null,
+					null);
+
+				slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+			}
+		}
+
+		return slotProvider;
+	}
+
+	/**
 	 * Slot owner which records the first returned slot.
 	 */
 	private static final class SingleSlotTestingSlotOwner implements SlotOwner {