Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2018/07/04 09:11:11 UTC
[GitHub] flink pull request #6251: [FLINK-9693] Set Execution#taskRestore to null aft...
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/6251
[FLINK-9693] Set Execution#taskRestore to null after deployment
## What is the purpose of the change
Setting the assigned Execution#taskRestore to null after deployment allows the
JobManagerTaskRestore instance to be garbage collected. Furthermore, it will no longer be
archived along with the Execution in the ExecutionVertex in case of a restart. This
is especially important when state.backend.fs.memory-threshold is set to larger
values, because every state below this threshold is stored inline in the meta state files
and, thus, also in the JobManagerTaskRestore instances.
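The idea can be illustrated with a minimal, self-contained sketch. It is not the code from this pull request: `RestoreState`, `SketchExecution`, and `TaskRestoreSketch` are hypothetical stand-ins, and the byte array only models state that was small enough to be inlined. The sketch shows the general pattern of reading the restore state once for deployment and then clearing the field, so that an archived execution no longer keeps the payload reachable.

```java
import java.util.Objects;

// Hypothetical stand-in for the restore payload; not Flink's JobManagerTaskRestore.
final class RestoreState {
    final long checkpointId;
    final byte[] inlinedState; // models state stored inline because it was below the threshold

    RestoreState(long checkpointId, byte[] inlinedState) {
        this.checkpointId = checkpointId;
        this.inlinedState = Objects.requireNonNull(inlinedState);
    }
}

// Hypothetical stand-in for an execution attempt that may be archived after a restart.
final class SketchExecution {
    private volatile RestoreState taskRestore;

    void setInitialState(RestoreState restore) {
        this.taskRestore = restore;
    }

    RestoreState getTaskRestore() {
        return taskRestore;
    }

    // Reads the restore state for the deployment step, then drops the reference
    // so the payload becomes eligible for garbage collection.
    long deploy() {
        final RestoreState restore = taskRestore;
        final long bytesShipped = restore == null ? 0L : restore.inlinedState.length;
        taskRestore = null;
        return bytesShipped;
    }
}

final class TaskRestoreSketch {
    public static void main(String[] args) {
        SketchExecution execution = new SketchExecution();
        execution.setInitialState(new RestoreState(1L, new byte[1024]));
        System.out.println("restore set before deploy: " + (execution.getTaskRestore() != null));
        System.out.println("bytes shipped: " + execution.deploy());
        System.out.println("restore set after deploy: " + (execution.getTaskRestore() != null));
    }
}
```

Copying the field into a local variable before clearing it keeps the deployment step working on a stable reference even if the field is read concurrently; whether the actual change needs this depends on the threading model of the real `Execution` class, which this sketch does not attempt to reproduce.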
## Verifying this change
- Added `ExecutionTest#testTaskRestoreStateIsNulledAfterDeployment`
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink fixMemoryLeakInJobManager
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6251.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6251
---
[GitHub] flink issue #6251: [FLINK-9693] Set Execution#taskRestore to null after depl...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/6251
Thanks for the review @StefanRRichter and @zentol. I will merge this PR once Travis gives the green light, and I will address your comments while merging.
---
[GitHub] flink pull request #6251: [FLINK-9693] Set Execution#taskRestore to null aft...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/6251
---
[GitHub] flink pull request #6251: [FLINK-9693] Set Execution#taskRestore to null aft...
Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6251#discussion_r200075750
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
@@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
         restartFuture.get();
     }
+    /**
+     * Tests that the task restore state is nulled after the {@link Execution} has been
+     * deployed. See FLINK-9693.
+     */
+    @Test
+    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+        final JobVertex jobVertex = createNoOpJobVertex();
+        final JobVertexID jobVertexId = jobVertex.getID();
+
+        final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+        final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+            1,
+            Collections.singleton(jobVertexId),
+            slotOwner);
+
+        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+            new JobID(),
+            slotProvider,
+            new NoRestartStrategy(),
+            jobVertex);
+
+        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+        final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+        final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+        execution.setInitialState(taskRestoreState);
+
+        assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+        // schedule the execution vertex and wait for its deployment
+        executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+        assertThat(execution.getTaskRestore(), is(nullValue()));
+    }
+
+    @Nonnull
+    private JobVertex createNoOpJobVertex() {
+        final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+
+        return jobVertex;
+    }
+
+    @Nonnull
+    private ProgrammedSlotProvider createProgrammedSlotProvider(
+        int parallelism,
+        Collection<JobVertexID> jobVertexIds,
+        SlotOwner slotOwner) {
+        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+        for (JobVertexID jobVertexId : jobVertexIds) {
+            for (int i = 0; i < parallelism; i++) {
+                final SimpleSlot slot = new SimpleSlot(
+                    slotOwner,
+                    new LocalTaskManagerLocation(),
+                    0,
+                    new SimpleAckingTaskManagerGateway(),
+                    null,
+                    null );
--- End diff --
True, will remove it.
---
[GitHub] flink issue #6251: [FLINK-9693] Set Execution#taskRestore to null after depl...
Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:
https://github.com/apache/flink/pull/6251
LGTM 👍
---
[GitHub] flink pull request #6251: [FLINK-9693] Set Execution#taskRestore to null aft...
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/6251#discussion_r200062854
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
@@ -385,6 +381,76 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
         restartFuture.get();
     }
+    /**
+     * Tests that the task restore state is nulled after the {@link Execution} has been
+     * deployed. See FLINK-9693.
+     */
+    @Test
+    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
+        final JobVertex jobVertex = createNoOpJobVertex();
+        final JobVertexID jobVertexId = jobVertex.getID();
+
+        final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
+        final ProgrammedSlotProvider slotProvider = createProgrammedSlotProvider(
+            1,
+            Collections.singleton(jobVertexId),
+            slotOwner);
+
+        ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+            new JobID(),
+            slotProvider,
+            new NoRestartStrategy(),
+            jobVertex);
+
+        ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+        final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+        final JobManagerTaskRestore taskRestoreState = new JobManagerTaskRestore(1L, new TaskStateSnapshot());
+        execution.setInitialState(taskRestoreState);
+
+        assertThat(execution.getTaskRestore(), is(notNullValue()));
+
+        // schedule the execution vertex and wait for its deployment
+        executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY).get();
+
+        assertThat(execution.getTaskRestore(), is(nullValue()));
+    }
+
+    @Nonnull
+    private JobVertex createNoOpJobVertex() {
+        final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+
+        return jobVertex;
+    }
+
+    @Nonnull
+    private ProgrammedSlotProvider createProgrammedSlotProvider(
+        int parallelism,
+        Collection<JobVertexID> jobVertexIds,
+        SlotOwner slotOwner) {
+        final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+        for (JobVertexID jobVertexId : jobVertexIds) {
+            for (int i = 0; i < parallelism; i++) {
+                final SimpleSlot slot = new SimpleSlot(
+                    slotOwner,
+                    new LocalTaskManagerLocation(),
+                    0,
+                    new SimpleAckingTaskManagerGateway(),
+                    null,
+                    null );
--- End diff --
whitespace after `null`
---