You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/04 13:50:20 UTC
flink git commit: [FLINK-9693] Set Execution#taskRestore to null
after deployment
Repository: flink
Updated Branches:
refs/heads/master 2fd2ccb65 -> fb37d51f3
[FLINK-9693] Set Execution#taskRestore to null after deployment
Setting the assigned Execution#taskRestore to null after the deployment allows the
JobManagerTaskRestore instance to be garbage collected. Furthermore, it won't be
archived along with the Execution in the ExecutionVertex in case of a restart. This
is especially important when setting state.backend.fs.memory-threshold to larger
values because every state below this threshold will be stored in the meta state files
and, thus, also in the JobManagerTaskRestore instances.
This closes #6251.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb37d51f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb37d51f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb37d51f
Branch: refs/heads/master
Commit: fb37d51f309fc891d10c20ff25736d957f80367a
Parents: 2fd2ccb
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Jul 4 11:05:25 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jul 4 15:50:07 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/executiongraph/Execution.java | 3 +
.../runtime/executiongraph/ExecutionTest.java | 112 +++++++++++++++----
2 files changed, 92 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 64e602f..853732f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -583,6 +583,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
taskRestore,
attemptNumber);
+ // null taskRestore to let it be GC'ed
+ taskRestore = null;
+
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, rpcTimeout);
http://git-wip-us.apache.org/repos/asf/flink/blob/fb37d51f/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 99879c0..d3e88e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.executiongraph;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -40,8 +42,11 @@ import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -50,6 +55,8 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -66,9 +73,8 @@ public class ExecutionTest extends TestLogger {
*/
@Test
public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
- final JobVertexID jobVertexId = new JobVertexID();
- final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
- jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
final CompletableFuture<LogicalSlot> slotFuture = new CompletableFuture<>();
final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
@@ -119,9 +125,8 @@ public class ExecutionTest extends TestLogger {
*/
@Test
public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
- final JobVertexID jobVertexId = new JobVertexID();
- final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
- jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
@@ -169,9 +174,8 @@ public class ExecutionTest extends TestLogger {
*/
@Test
public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
- final JobVertexID jobVertexId = new JobVertexID();
- final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
- jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
@@ -331,22 +335,14 @@ public class ExecutionTest extends TestLogger {
*/
@Test
public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
- final JobVertexID jobVertexId = new JobVertexID();
- final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
- jobVertex.setInvokableClass(NoOpInvokable.class);
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
-
- final SimpleSlot slot = new SimpleSlot(
- slotOwner,
- new LocalTaskManagerLocation(),
- 0,
- new SimpleAckingTaskManagerGateway(),
- null,
- null);
-
- final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
- slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+ final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+ 1,
+ Collections.singleton(jobVertexId),
+ slotOwner);
ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
new JobID(),
@@ -386,6 +382,76 @@ public class ExecutionTest extends TestLogger {
}
/**
+ * Tests that the task restore state is nulled after the {@link Execution} has been
+ * deployed. See FLINK-9693.
+ */
+ @Test
+ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+ final JobVertex jobVertex = createNoOpJobVertex();
+ final JobVertexID jobVertexId = jobVertex.getID();
+
+ final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+ final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+ 1,
+ Collections.singleton(jobVertexId),
+ slotOwner);
+
+ ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+ new JobID(),
+ slotProvider,
+ new NoRestartStrategy(),
+ jobVertex);
+
+ ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+ ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+ final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+ final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+ execution.setInitialState(taskRestoreState);
+
+ assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+ // schedule the execution vertex and wait for its deployment
+ executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+ assertThat(execution.getTaskRestore(), is(nullValue()));
+ }
+
+ @Nonnull
+ private JobVertex createNoOpJobVertex() {
+ final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+ jobVertex.setInvokableClass(NoOpInvokable.class);
+
+ return jobVertex;
+ }
+
+ @Nonnull
+ private ProgrammedSlotProvider createProgrammedSlotProvider(
+ int parallelism,
+ Collection<JobVertexID> jobVertexIds,
+ SlotOwner slotOwner) {
+ final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+ for (JobVertexID jobVertexId : jobVertexIds) {
+ for (int i = 0; i < parallelism; i++) {
+ final SimpleSlot slot = new SimpleSlot(
+ slotOwner,
+ new LocalTaskManagerLocation(),
+ 0,
+ new SimpleAckingTaskManagerGateway(),
+ null,
+ null);
+
+ slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+ }
+ }
+
+ return slotProvider;
+ }
+
+ /**
* Slot owner which records the first returned slot.
*/
private static final class SingleSlotTestingSlotOwner implements SlotOwner {