You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ro...@apache.org on 2021/11/16 08:21:33 UTC

[flink] branch master updated: [FLINK-24864][metrics] Release TaskManagerJobMetricGroup with the last slot rather than task

This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f849ef3  [FLINK-24864][metrics] Release TaskManagerJobMetricGroup with the last slot rather than task
f849ef3 is described below

commit f849ef304088f2fec15c433c2077b7a7e53ab890
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Wed Nov 3 20:49:56 2021 +0100

    [FLINK-24864][metrics] Release TaskManagerJobMetricGroup with the last slot rather than task
    
    Motivation:
    TaskManagerJobMetricGroup might be used by components shared across tasks of a job
    (e.g. ChangelogStorage). The lifecycle of those is bound to slots rather than tasks
    (for various reasons including performance).
    Releasing them differently causes those metrics to be not reported.
    
    Besides that, this simplifies release code in TaskManagerJobMetricGroup
    and makes its lifecycle more consistent (creation and deletion in the
    same place).
---
 .../PrometheusReporterTaskScopeTest.java           |  33 +-
 .../metrics/groups/TaskManagerJobMetricGroup.java  |  20 +-
 .../metrics/groups/TaskManagerMetricGroup.java     |  71 +--
 .../metrics/groups/UnregisteredMetricGroups.java   |  12 -
 .../TaskExecutorStateChangelogStoragesManager.java |   7 +
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  12 +-
 .../io/network/NettyShuffleEnvironmentTest.java    |  11 +-
 .../metrics/groups/InternalOperatorGroupTest.java  |  38 +-
 .../runtime/metrics/groups/MetricGroupTest.java    |   3 +-
 .../metrics/groups/TaskManagerGroupTest.java       |  35 +-
 .../metrics/groups/TaskManagerMetricGroupTest.java |  87 ++++
 .../metrics/groups/TaskMetricGroupTest.java        |  50 +--
 .../chaining/ChainedOperatorsMetricTest.java       |   5 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     | 478 ++++++++++++---------
 .../runtime/tasks/OneInputStreamTaskTest.java      |  10 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |  10 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java      |  10 +-
 17 files changed, 481 insertions(+), 411 deletions(-)

diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index a0440b5..ffbfd47 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -121,23 +121,24 @@ public class PrometheusReporterTaskScopeTest {
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, TASK_MANAGER_HOST, new ResourceID(TASK_MANAGER_ID));
         taskMetricGroup1 =
-                tmMetricGroup.addTaskForJob(
-                        jobId,
-                        JOB_NAME,
-                        taskId1,
-                        taskAttemptId1,
-                        TASK_NAME,
-                        SUBTASK_INDEX_1,
-                        ATTEMPT_NUMBER);
+                tmMetricGroup
+                        .addJob(jobId, JOB_NAME)
+                        .addTask(
+                                taskId1,
+                                taskAttemptId1,
+                                TASK_NAME,
+                                SUBTASK_INDEX_1,
+                                ATTEMPT_NUMBER);
+
         taskMetricGroup2 =
-                tmMetricGroup.addTaskForJob(
-                        jobId,
-                        JOB_NAME,
-                        taskId2,
-                        taskAttemptId2,
-                        TASK_NAME,
-                        SUBTASK_INDEX_2,
-                        ATTEMPT_NUMBER);
+                tmMetricGroup
+                        .addJob(jobId, JOB_NAME)
+                        .addTask(
+                                taskId2,
+                                taskAttemptId2,
+                                TASK_NAME,
+                                SUBTASK_INDEX_2,
+                                ATTEMPT_NUMBER);
     }
 
     @After
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
index 2535acc..a55b84e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
@@ -29,7 +29,9 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.lang.Thread.holdsLock;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * Special {@link org.apache.flink.metrics.MetricGroup} representing everything belonging to a
@@ -105,20 +107,15 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
     public void removeTaskMetricGroup(ExecutionAttemptID executionId) {
         checkNotNull(executionId);
 
-        boolean removeFromParent = false;
+        // this can be a call from this.close which iterates over tasks
+        // changing tasks here would break iteration
         synchronized (this) {
-            if (!isClosed() && tasks.remove(executionId) != null && tasks.isEmpty()) {
-                // this call removed the last task. close this group.
-                removeFromParent = true;
-                close();
+            if (!isClosed()) {
+                tasks.remove(executionId);
+                // keep this group open even if tasks is empty - to re-use on new task submission
+                // the group will be closed by TM with the release of the last job slot on this TM
             }
         }
-
-        // IMPORTANT: removing from the parent must not happen while holding the this group's lock,
-        //      because it would violate the "first parent then subgroup" lock acquisition order
-        if (removeFromParent) {
-            parent.removeJobMetricsGroup(jobId, this);
-        }
     }
 
     // ------------------------------------------------------------------------
@@ -127,6 +124,7 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG
 
     @Override
     protected Iterable<? extends ComponentMetricGroup> subComponents() {
+        checkState(holdsLock(this));
         return tasks.values();
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index b2d2d91..ddbf508 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -19,11 +19,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
@@ -81,61 +80,33 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr
     //  job groups
     // ------------------------------------------------------------------------
 
-    public TaskMetricGroup addTaskForJob(
-            final JobID jobId,
-            final String jobName,
-            final JobVertexID jobVertexId,
-            final ExecutionAttemptID executionAttemptId,
-            final String taskName,
-            final int subtaskIndex,
-            final int attemptNumber) {
+    public TaskManagerJobMetricGroup addJob(JobID jobId, String jobName) {
         Preconditions.checkNotNull(jobId);
-
         String resolvedJobName = jobName == null || jobName.isEmpty() ? jobId.toString() : jobName;
-
-        // we cannot strictly lock both our map modification and the job group modification
-        // because it might lead to a deadlock
-        while (true) {
-            // get or create a jobs metric group
-            TaskManagerJobMetricGroup currentJobGroup;
-            synchronized (this) {
-                currentJobGroup = jobs.get(jobId);
-
-                if (currentJobGroup == null || currentJobGroup.isClosed()) {
-                    currentJobGroup =
-                            new TaskManagerJobMetricGroup(registry, this, jobId, resolvedJobName);
-                    jobs.put(jobId, currentJobGroup);
-                }
-            }
-
-            // try to add another task. this may fail if we found a pre-existing job metrics
-            // group and it is closed concurrently
-            TaskMetricGroup taskGroup =
-                    currentJobGroup.addTask(
-                            jobVertexId, executionAttemptId, taskName, subtaskIndex, attemptNumber);
-
-            if (taskGroup != null) {
-                // successfully added the next task
-                return taskGroup;
+        TaskManagerJobMetricGroup jobGroup;
+        synchronized (this) { // synchronization isn't strictly necessary as of FLINK-24864
+            jobGroup = jobs.get(jobId);
+            if (jobGroup == null) {
+                jobGroup = new TaskManagerJobMetricGroup(registry, this, jobId, resolvedJobName);
+                jobs.put(jobId, jobGroup);
             }
-
-            // else fall through the loop
         }
+        return jobGroup;
     }
 
-    public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup group) {
-        if (jobId == null || group == null || !group.isClosed()) {
-            return;
-        }
-
-        synchronized (this) {
-            // optimistically remove the currently contained group, and check later if it was
-            // correct
-            TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
+    @VisibleForTesting
+    public TaskManagerJobMetricGroup getJobMetricsGroup(JobID jobId) {
+        return jobs.get(jobId);
+    }
 
-            // check if another group was actually contained, and restore that one
-            if (containedGroup != null && containedGroup != group) {
-                jobs.put(jobId, containedGroup);
+    public void removeJobMetricsGroup(JobID jobId) {
+        if (jobId != null) {
+            TaskManagerJobMetricGroup groupToClose;
+            synchronized (this) { // synchronization isn't strictly necessary as of FLINK-24864
+                groupToClose = jobs.remove(jobId);
+            }
+            if (groupToClose != null) {
+                groupToClose.close();
             }
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index e4a8280..6fee499 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -136,18 +136,6 @@ public class UnregisteredMetricGroups {
         protected UnregisteredTaskManagerMetricGroup() {
             super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID);
         }
-
-        @Override
-        public TaskMetricGroup addTaskForJob(
-                final JobID jobId,
-                final String jobName,
-                final JobVertexID jobVertexId,
-                final ExecutionAttemptID executionAttemptId,
-                final String taskName,
-                final int subtaskIndex,
-                final int attemptNumber) {
-            return createUnregisteredTaskMetricGroup();
-        }
     }
 
     /** A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 1af6e35..d6d3fb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -145,4 +146,10 @@ public class TaskExecutorStateChangelogStoragesManager {
             }
         }
     }
+
+    @VisibleForTesting
+    @Nullable
+    public Optional<StateChangelogStorage<?>> getChangelogStoragesByJobId(JobID jobId) {
+        return changelogStoragesByJobId.get(jobId);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cb9fa89..133ebdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -77,6 +77,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
 import org.apache.flink.runtime.messages.ThreadInfoSample;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -640,10 +641,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                                 + ")");
             }
 
+            TaskManagerJobMetricGroup jobGroup =
+                    taskManagerMetricGroup.addJob(
+                            jobInformation.getJobId(), jobInformation.getJobName());
+
+            // note that a pre-existing job group can NOT be closed concurrently - this is done by
+            // the same TM thread in removeJobMetricsGroup
             TaskMetricGroup taskMetricGroup =
-                    taskManagerMetricGroup.addTaskForJob(
-                            jobInformation.getJobId(),
-                            jobInformation.getJobName(),
+                    jobGroup.addTask(
                             taskInformation.getJobVertexId(),
                             tdd.getExecutionAttemptId(),
                             taskInformation.getTaskName(),
@@ -1788,6 +1793,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
                         job -> {
                             closeJob(job, cause);
                         });
+        taskManagerMetricGroup.removeJobMetricsGroup(jobId);
         changelogStoragesManager.releaseStateChangelogStorageForJob(jobId);
         currentSlotOfferPerJob.remove(jobId);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
index 1766cee..9927fb6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java
@@ -343,16 +343,11 @@ public class NettyShuffleEnvironmentTest extends TestLogger {
     }
 
     private static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics) {
+
         return TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         new TestMetricRegistry(metrics), "localhost", ResourceID.generate())
-                .addTaskForJob(
-                        new JobID(),
-                        "jobName",
-                        new JobVertexID(0, 0),
-                        new ExecutionAttemptID(),
-                        "test",
-                        0,
-                        0);
+                .addJob(new JobID(), "jobName")
+                .addTask(new JobVertexID(0, 0), new ExecutionAttemptID(), "test", 0, 0);
     }
 
     /** The metric registry for storing the registered metrics to verify in tests. */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
index a04266a..1c2ff23 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/InternalOperatorGroupTest.java
@@ -66,15 +66,10 @@ public class InternalOperatorGroupTest extends TestLogger {
         TaskManagerMetricGroup tmGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "theHostName", new ResourceID("test-tm-id"));
+
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(
-                        new JobID(),
-                        "myJobName",
-                        new JobVertexID(),
-                        new ExecutionAttemptID(),
-                        "aTaskName",
-                        11,
-                        0);
+                tmGroup.addJob(new JobID(), "myJobName")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
         InternalOperatorMetricGroup opGroup =
                 taskGroup.getOrAddOperator(new OperatorID(), "myOpName");
 
@@ -107,14 +102,8 @@ public class InternalOperatorGroupTest extends TestLogger {
             InternalOperatorMetricGroup operatorGroup =
                     TaskManagerMetricGroup.createTaskManagerMetricGroup(
                                     registry, "theHostName", new ResourceID(tmID))
-                            .addTaskForJob(
-                                    jid,
-                                    "myJobName",
-                                    vertexId,
-                                    new ExecutionAttemptID(),
-                                    "aTaskname",
-                                    13,
-                                    2)
+                            .addJob(jid, "myJobName")
+                            .addTask(vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
                             .getOrAddOperator(operatorID, operatorName);
 
             assertArrayEquals(
@@ -141,15 +130,10 @@ public class InternalOperatorGroupTest extends TestLogger {
         TaskManagerMetricGroup tmGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "theHostName", new ResourceID("test-tm-id"));
+
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(
-                        new JobID(),
-                        "myJobName",
-                        new JobVertexID(),
-                        new ExecutionAttemptID(),
-                        "aTaskName",
-                        11,
-                        0);
+                tmGroup.addJob(new JobID(), "myJobName")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
         InternalOperatorMetricGroup opGroup =
                 taskGroup.getOrAddOperator(new OperatorID(), "myOpName");
 
@@ -168,8 +152,9 @@ public class InternalOperatorGroupTest extends TestLogger {
         TaskManagerMetricGroup tmGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "theHostName", new ResourceID("test-tm-id"));
+
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(jid, "myJobName", tid, eid, "aTaskName", 11, 0);
+                tmGroup.addJob(jid, "myJobName").addTask(tid, eid, "aTaskName", 11, 0);
         InternalOperatorMetricGroup opGroup = taskGroup.getOrAddOperator(oid, "myOpName");
 
         Map<String, String> variables = opGroup.getAllVariables();
@@ -203,7 +188,8 @@ public class InternalOperatorGroupTest extends TestLogger {
         TaskManagerMetricGroup tm =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "host", new ResourceID("id"));
-        TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);
+
+        TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(vid, eid, "taskName", 4, 5);
         InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, "operator");
 
         QueryScopeInfo.OperatorQueryScopeInfo info =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 0da3f45..15b9a27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -341,7 +341,8 @@ public class MetricGroupTest extends TestLogger {
         TaskManagerMetricGroup tm =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "host", new ResourceID("id"));
-        TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);
+
+        TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(vid, eid, "taskName", 4, 5);
         GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello");
         GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world");
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 2cea2ec..592339b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /** Tests for the {@link TaskManagerMetricGroup}. */
@@ -88,11 +89,13 @@ public class TaskManagerGroupTest extends TestLogger {
         final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
         TaskMetricGroup tmGroup11 =
-                group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 0);
+                group.addJob(jid1, jobName1).addTask(vertex11, execution11, "test", 17, 0);
+
         TaskMetricGroup tmGroup12 =
-                group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 1);
+                group.addJob(jid1, jobName1).addTask(vertex12, execution12, "test", 13, 1);
+
         TaskMetricGroup tmGroup21 =
-                group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 2);
+                group.addJob(jid2, jobName2).addTask(vertex21, execution21, "test", 7, 2);
 
         assertEquals(2, group.numRegisteredJobMetricGroups());
         assertFalse(tmGroup11.parent().isClosed());
@@ -108,20 +111,20 @@ public class TaskManagerGroupTest extends TestLogger {
         // job 2 should be removed, job should still be there
         assertFalse(tmGroup11.parent().isClosed());
         assertFalse(tmGroup12.parent().isClosed());
-        assertTrue(tmGroup21.parent().isClosed());
-        assertEquals(1, group.numRegisteredJobMetricGroups());
+
+        // should keep TaskManagerJobMetricGroup open - slot isn't released yet
+        assertFalse(tmGroup21.parent().isClosed());
+        assertEquals(2, group.numRegisteredJobMetricGroups());
 
         // add one more to job one
+
         TaskMetricGroup tmGroup13 =
-                group.addTaskForJob(jid1, jobName1, vertex13, execution13, "test", 0, 0);
+                group.addJob(jid1, jobName1).addTask(vertex13, execution13, "test", 0, 0);
+        assertSame(
+                tmGroup11.parent(),
+                tmGroup13.parent()); // should use the same TaskManagerJobMetricGroup
         tmGroup12.close();
         tmGroup13.close();
-
-        assertTrue(tmGroup11.parent().isClosed());
-        assertTrue(tmGroup12.parent().isClosed());
-        assertTrue(tmGroup13.parent().isClosed());
-
-        assertEquals(0, group.numRegisteredJobMetricGroups());
     }
 
     @Test
@@ -145,11 +148,13 @@ public class TaskManagerGroupTest extends TestLogger {
         final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
         TaskMetricGroup tmGroup11 =
-                group.addTaskForJob(jid1, jobName1, vertex11, execution11, "test", 17, 0);
+                group.addJob(jid1, jobName1).addTask(vertex11, execution11, "test", 17, 0);
+
         TaskMetricGroup tmGroup12 =
-                group.addTaskForJob(jid1, jobName1, vertex12, execution12, "test", 13, 1);
+                group.addJob(jid1, jobName1).addTask(vertex12, execution12, "test", 13, 1);
+
         TaskMetricGroup tmGroup21 =
-                group.addTaskForJob(jid2, jobName2, vertex21, execution21, "test", 7, 1);
+                group.addJob(jid2, jobName2).addTask(vertex21, execution21, "test", 7, 1);
 
         group.close();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroupTest.java
new file mode 100644
index 0000000..e095b39
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroupTest.java
@@ -0,0 +1,87 @@
+package org.apache.flink.runtime.metrics.groups;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.JobID;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.runtime.metrics.NoOpMetricRegistry.INSTANCE;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** {@link TaskManagerMetricGroup} test. */
+public class TaskManagerMetricGroupTest {
+    private static final JobID JOB_ID = new JobID();
+    private static final String JOB_NAME = "test job";
+    private TaskManagerMetricGroup metricGroup;
+
+    @BeforeEach
+    public void before() {
+        metricGroup = new TaskManagerMetricGroup(INSTANCE, "testHost", "testTm");
+    }
+
+    @AfterEach
+    public void after() {
+        if (!metricGroup.isClosed()) {
+            metricGroup.close();
+        }
+    }
+
+    @Test
+    public void testGetSameJob() {
+        assertSame(metricGroup.addJob(JOB_ID, JOB_NAME), metricGroup.addJob(JOB_ID, JOB_NAME));
+        assertNotSame(
+                metricGroup.addJob(JOB_ID, JOB_NAME),
+                metricGroup.addJob(new JobID(), "another job"));
+    }
+
+    @Test
+    public void testReCreateAfterRemoval() {
+        TaskManagerJobMetricGroup oldGroup = metricGroup.addJob(JOB_ID, JOB_NAME);
+        metricGroup.removeJobMetricsGroup(JOB_ID);
+        assertNotSame(oldGroup, metricGroup.addJob(JOB_ID, JOB_NAME));
+    }
+
+    @Test
+    public void testCloseOnRemove() {
+        TaskManagerJobMetricGroup tmJobMetricGroup = metricGroup.addJob(JOB_ID, JOB_NAME);
+        metricGroup.removeJobMetricsGroup(JOB_ID);
+        assertTrue(tmJobMetricGroup.isClosed());
+    }
+
+    @Test
+    public void testCloseWithoutRemoval() {
+        TaskManagerJobMetricGroup jobGroup = metricGroup.addJob(JOB_ID, JOB_NAME);
+        metricGroup.close();
+        assertTrue(jobGroup.isClosed());
+    }
+
+    @Test
+    public void testRemoveNullJobID() {
+        metricGroup.removeJobMetricsGroup(null);
+    }
+
+    @Test
+    public void testRemoveInvalidJobID() {
+        metricGroup.removeJobMetricsGroup(JOB_ID);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index b0da84c..976832a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -69,15 +69,10 @@ public class TaskMetricGroupTest extends TestLogger {
         TaskManagerMetricGroup tmGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "theHostName", new ResourceID("test-tm-id"));
+
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(
-                        new JobID(),
-                        "myJobName",
-                        new JobVertexID(),
-                        new ExecutionAttemptID(),
-                        "aTaskName",
-                        13,
-                        2);
+                tmGroup.addJob(new JobID(), "myJobName")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 13, 2);
 
         assertArrayEquals(
                 new String[] {
@@ -107,8 +102,9 @@ public class TaskMetricGroupTest extends TestLogger {
         TaskManagerMetricGroup tmGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "theHostName", new ResourceID("test-tm-id"));
+
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(jid, "myJobName", vertexId, executionId, "aTaskName", 13, 2);
+                tmGroup.addJob(jid, "myJobName").addTask(vertexId, executionId, "aTaskName", 13, 2);
 
         assertArrayEquals(
                 new String[] {
@@ -136,14 +132,8 @@ public class TaskMetricGroupTest extends TestLogger {
                         registry, "theHostName", new ResourceID("test-tm-id"));
 
         TaskMetricGroup taskGroup =
-                tmGroup.addTaskForJob(
-                        new JobID(),
-                        "myJobName",
-                        new JobVertexID(),
-                        executionId,
-                        "aTaskName",
-                        13,
-                        1);
+                tmGroup.addJob(new JobID(), "myJobName")
+                        .addTask(new JobVertexID(), executionId, "aTaskName", 13, 1);
 
         assertArrayEquals(
                 new String[] {
@@ -170,7 +160,8 @@ public class TaskMetricGroupTest extends TestLogger {
         TaskManagerMetricGroup tm =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "host", new ResourceID("id"));
-        TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);
+
+        TaskMetricGroup task = tm.addJob(jid, "jobname").addTask(vid, eid, "taskName", 4, 5);
 
         QueryScopeInfo.TaskQueryScopeInfo info =
                 task.createQueryServiceMetricInfo(new DummyCharacterFilter());
@@ -186,15 +177,11 @@ public class TaskMetricGroupTest extends TestLogger {
         TaskManagerMetricGroup taskManagerMetricGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "localhost", new ResourceID("0"));
+
         TaskMetricGroup taskMetricGroup =
-                taskManagerMetricGroup.addTaskForJob(
-                        new JobID(),
-                        "job",
-                        new JobVertexID(),
-                        new ExecutionAttemptID(),
-                        "task",
-                        0,
-                        0);
+                taskManagerMetricGroup
+                        .addJob(new JobID(), "job")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
 
         // the io metric should have registered predefined metrics
         assertTrue(registry.getNumberRegisteredMetrics() > 0);
@@ -216,15 +203,10 @@ public class TaskMetricGroupTest extends TestLogger {
         TaskManagerMetricGroup tm =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         registry, "host", new ResourceID("id"));
+
         TaskMetricGroup taskMetricGroup =
-                tm.addTaskForJob(
-                        new JobID(),
-                        "jobname",
-                        new JobVertexID(),
-                        new ExecutionAttemptID(),
-                        "task",
-                        0,
-                        0);
+                tm.addJob(new JobID(), "jobname")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
 
         String originalName = new String(new char[100]).replace("\0", "-");
         InternalOperatorMetricGroup operatorMetricGroup =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
index 2cf9637..e1cdd28 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -84,9 +84,8 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
                                                 NoOpMetricRegistry.INSTANCE,
                                                 "host",
                                                 ResourceID.generate())
-                                        .addTaskForJob(
-                                                new JobID(),
-                                                "jobName",
+                                        .addJob(new JobID(), "jobName")
+                                        .addTask(
                                                 new JobVertexID(),
                                                 new ExecutionAttemptID(),
                                                 "task",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 64cada4..ba12c4d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
 import org.apache.flink.runtime.entrypoint.ClusterInformation;
 import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -55,6 +56,7 @@ import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracke
 import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.TestingTaskExecutorPartitionTracker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationRejection;
@@ -66,6 +68,8 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -79,6 +83,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
+import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment.Builder;
 import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
@@ -94,6 +99,7 @@ import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -133,6 +139,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.CountDownLatch;
@@ -143,6 +150,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.stream.IntStream.range;
+import static org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup;
 import static org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils.DEFAULT_RESOURCE_PROFILE;
 import static org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils.createDefaultTimerService;
 import static org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils.createTotalResourceProfile;
@@ -428,16 +437,14 @@ public class TaskExecutorTest extends TestLogger {
             slotReportReceived.await();
 
             final AllocationID allocationId = new AllocationID();
-            taskExecutorGateway
-                    .requestSlot(
-                            new SlotID(unresolvedTaskManagerLocation.getResourceID(), 0),
-                            jobId,
-                            allocationId,
-                            ResourceProfile.UNKNOWN,
-                            jobMasterAddress,
-                            testingResourceManagerGateway.getFencingToken(),
-                            timeout)
-                    .join();
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    allocationId,
+                    buildSlotID(0),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterAddress,
+                    testingResourceManagerGateway.getFencingToken());
 
             // now inform the task manager about the new job leader
             jobManagerLeaderRetriever.notifyListener(jobMasterAddress, jmLeaderId);
@@ -621,7 +628,7 @@ public class TaskExecutorTest extends TestLogger {
 
         rpc.registerGateway(rmAddress, rmGateway);
 
-        final SlotID slotId = new SlotID(unresolvedTaskManagerLocation.getResourceID(), 0);
+        final SlotID slotId = buildSlotID(0);
         final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1);
         final SlotReport slotReport1 = new SlotReport(new SlotStatus(slotId, resourceProfile));
         final SlotReport slotReport2 =
@@ -725,14 +732,11 @@ public class TaskExecutorTest extends TestLogger {
 
         rpc.registerGateway(resourceManagerAddress, testingResourceManagerGateway);
 
-        final TaskSlotTable taskSlotTable = TaskSlotUtils.createTaskSlotTable(1);
-        final TaskExecutorLocalStateStoresManager localStateStoresManager =
-                createTaskExecutorLocalStateStoresManager();
         final TaskManagerServices taskManagerServices =
                 new TaskManagerServicesBuilder()
                         .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
-                        .setTaskSlotTable(taskSlotTable)
-                        .setTaskStateManager(localStateStoresManager)
+                        .setTaskSlotTable(TaskSlotUtils.createTaskSlotTable(1))
+                        .setTaskStateManager(createTaskExecutorLocalStateStoresManager())
                         .build();
 
         final TaskExecutor taskManager = createTaskExecutor(taskManagerServices);
@@ -917,7 +921,6 @@ public class TaskExecutorTest extends TestLogger {
         rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);
 
         final AllocationID allocationId = new AllocationID();
-        final SlotID slotId = new SlotID(unresolvedTaskManagerLocation.getResourceID(), 0);
 
         final TaskExecutorLocalStateStoresManager localStateStoresManager =
                 createTaskExecutorLocalStateStoresManager();
@@ -943,21 +946,16 @@ public class TaskExecutorTest extends TestLogger {
                     resourceManagerGateway.getAddress(),
                     resourceManagerGateway.getFencingToken().toUUID());
 
-            // wait for the initial slot report
             initialSlotReportFuture.get();
 
-            // request slots from the task manager under the given allocation id
-            CompletableFuture<Acknowledge> slotRequestAck =
-                    tmGateway.requestSlot(
-                            slotId,
-                            jobId,
-                            allocationId,
-                            ResourceProfile.ZERO,
-                            jobMasterGateway.getAddress(),
-                            resourceManagerGateway.getFencingToken(),
-                            timeout);
-
-            slotRequestAck.get();
+            requestSlot(
+                    tmGateway,
+                    jobId,
+                    allocationId,
+                    buildSlotID(0),
+                    ResourceProfile.ZERO,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // now inform the task manager about the new job leader
             jobManagerLeaderRetriever.notifyListener(
@@ -1020,11 +1018,17 @@ public class TaskExecutorTest extends TestLogger {
             taskExecutorIsRegistered.await();
 
             // request 2 slots for the given allocation ids
-            requestSlots(
-                    tmGateway,
-                    Arrays.asList(allocationId1, allocationId2),
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway.getAddress());
+            AllocationID[] allocationIds = new AllocationID[] {allocationId1, allocationId2};
+            for (int i = 0; i < allocationIds.length; i++) {
+                requestSlot(
+                        tmGateway,
+                        jobId,
+                        allocationIds[i],
+                        buildSlotID(i),
+                        ResourceProfile.UNKNOWN,
+                        jobMasterGateway.getAddress(),
+                        resourceManagerGateway.getFencingToken());
+            }
 
             // notify job leader to start slot offering
             jobManagerLeaderRetriever.notifyListener(
@@ -1038,35 +1042,30 @@ public class TaskExecutorTest extends TestLogger {
                     availableSlotFuture.get();
 
             final Tuple3<InstanceID, SlotID, AllocationID> expectedResult =
-                    Tuple3.of(
-                            registrationId,
-                            new SlotID(unresolvedTaskManagerLocation.getResourceID(), 1),
-                            allocationId2);
+                    Tuple3.of(registrationId, buildSlotID(1), allocationId2);
 
             assertThat(instanceIDSlotIDAllocationIDTuple3, equalTo(expectedResult));
             // the slot 1 can be activate for task submission
-            submitNoOpInvokableTask(allocationId1, jobMasterGateway, tmGateway);
+            submit(allocationId1, jobMasterGateway, tmGateway, NoOpInvokable.class);
             // wait for the task completion
             taskInTerminalState.await();
             // the slot 2 can NOT be activate for task submission
             try {
-                submitNoOpInvokableTask(allocationId2, jobMasterGateway, tmGateway);
+                submit(allocationId2, jobMasterGateway, tmGateway, NoOpInvokable.class);
                 fail(
                         "It should not be possible to submit task to acquired by JM slot with index 1 (allocationId2)");
             } catch (CompletionException e) {
                 assertThat(e.getCause(), instanceOf(TaskSubmissionException.class));
             }
             // the slot 2 is free to request
-            tmGateway
-                    .requestSlot(
-                            new SlotID(unresolvedTaskManagerLocation.getResourceID(), 1),
-                            jobId,
-                            allocationId2,
-                            ResourceProfile.UNKNOWN,
-                            jobMasterGateway.getAddress(),
-                            resourceManagerGateway.getFencingToken(),
-                            timeout)
-                    .join();
+            requestSlot(
+                    tmGateway,
+                    jobId,
+                    allocationId2,
+                    buildSlotID(1),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
         } finally {
             RpcUtils.terminateRpcEndpoint(taskManager, timeout);
         }
@@ -1157,21 +1156,26 @@ public class TaskExecutorTest extends TestLogger {
             // request the first slot
             requestSlot(
                     tmGateway,
+                    jobId,
                     slotOffer1.getAllocationId(),
-                    slotOffer1.getSlotIndex(),
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway.getAddress());
+                    buildSlotID(slotOffer1.getSlotIndex()),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // wait until first slot offer as arrived
             offerSlotsLatch.await();
 
             // request second slot, triggering another offer containing both slots
+            int slotIndex = slotOffer2.getSlotIndex();
             requestSlot(
                     tmGateway,
+                    jobId,
                     slotOffer2.getAllocationId(),
-                    slotOffer2.getSlotIndex(),
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway.getAddress());
+                    buildSlotID(slotIndex),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // wait until second slot offer as arrived
             offerSlotsLatch.await();
@@ -1274,11 +1278,12 @@ public class TaskExecutorTest extends TestLogger {
             // request the first slot
             requestSlot(
                     tmGateway,
-                    slotOffer1.getAllocationId(),
-                    slotOffer1.getSlotIndex(),
                     jobId,
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway1.getAddress());
+                    slotOffer1.getAllocationId(),
+                    buildSlotID(slotOffer1.getSlotIndex()),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway1.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // wait until first slot offer as arrived
             offerSlotsLatch.await();
@@ -1286,11 +1291,12 @@ public class TaskExecutorTest extends TestLogger {
             // request second slot, triggering another offer containing both slots
             requestSlot(
                     tmGateway,
-                    slotOffer2.getAllocationId(),
-                    slotOffer2.getSlotIndex(),
                     jobId2,
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway2.getAddress());
+                    slotOffer2.getAllocationId(),
+                    buildSlotID(slotOffer2.getSlotIndex()),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway2.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // wait until second slot offer as arrived
             offerSlotsLatch.await();
@@ -1367,11 +1373,12 @@ public class TaskExecutorTest extends TestLogger {
 
             requestSlot(
                     tmGateway,
-                    allocationId,
-                    0,
                     jobId,
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway.getAddress());
+                    allocationId,
+                    buildSlotID(0),
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             offerSlotsLatch.await();
 
@@ -1427,11 +1434,17 @@ public class TaskExecutorTest extends TestLogger {
             taskExecutorIsRegistered.await();
 
             // request 2 slots for the given allocation ids
-            requestSlots(
-                    tmGateway,
-                    Arrays.asList(allocationId1, allocationId2),
-                    resourceManagerGateway.getFencingToken(),
-                    jobMasterGateway.getAddress());
+            AllocationID[] allocationIds = new AllocationID[] {allocationId1, allocationId2};
+            for (int i = 0; i < allocationIds.length; i++) {
+                requestSlot(
+                        tmGateway,
+                        jobId,
+                        allocationIds[i],
+                        buildSlotID(i),
+                        ResourceProfile.UNKNOWN,
+                        jobMasterGateway.getAddress(),
+                        resourceManagerGateway.getFencingToken());
+            }
 
             // notify job leader to start slot offering
             jobManagerLeaderRetriever.notifyListener(
@@ -1440,7 +1453,7 @@ public class TaskExecutorTest extends TestLogger {
             // wait until slots have been offered
             offerSlotsLatch.await();
 
-            submitNoOpInvokableTask(allocationId1, jobMasterGateway, tmGateway);
+            submit(allocationId1, jobMasterGateway, tmGateway, NoOpInvokable.class);
 
             // acknowledge the offered slots
             offerResultFuture.complete(Collections.singleton(offer1));
@@ -1519,66 +1532,18 @@ public class TaskExecutorTest extends TestLogger {
                 .build();
     }
 
-    private void requestSlots(
-            TaskExecutorGateway tmGateway,
-            Iterable<? extends AllocationID> allocationIds,
-            ResourceManagerId resourceManagerId,
-            String jobMasterGatewayAddress) {
-        int slotIndex = 0;
-        for (AllocationID allocationId : allocationIds) {
-            requestSlot(
-                    tmGateway, allocationId, slotIndex, resourceManagerId, jobMasterGatewayAddress);
-            slotIndex++;
-        }
-    }
-
-    private void requestSlot(
-            TaskExecutorGateway tmGateway,
-            AllocationID allocationId,
-            int slotIndex,
-            ResourceManagerId resourceManagerId,
-            String jobMasterGatewayAddress) {
-        requestSlot(
-                tmGateway,
-                allocationId,
-                slotIndex,
-                jobId,
-                resourceManagerId,
-                jobMasterGatewayAddress);
-    }
-
-    private void requestSlot(
-            TaskExecutorGateway tmGateway,
-            AllocationID allocationId,
-            int slotIndex,
-            JobID jobId,
-            ResourceManagerId resourceManagerId,
-            String jobMasterGatewayAddress) {
-
-        final SlotID slotId = new SlotID(unresolvedTaskManagerLocation.getResourceID(), slotIndex);
-
-        tmGateway
-                .requestSlot(
-                        slotId,
-                        jobId,
-                        allocationId,
-                        ResourceProfile.UNKNOWN,
-                        jobMasterGatewayAddress,
-                        resourceManagerId,
-                        Time.seconds(10L))
-                .join();
-    }
-
-    private void submitNoOpInvokableTask(
+    private <T extends TaskInvokable> ExecutionAttemptID submit(
             AllocationID allocationId,
             TestingJobMasterGateway jobMasterGateway,
-            TaskExecutorGateway tmGateway)
+            TaskExecutorGateway tmGateway,
+            Class<T> invokableClass)
             throws IOException {
         final TaskDeploymentDescriptor tdd =
-                TaskDeploymentDescriptorBuilder.newBuilder(jobId, NoOpInvokable.class)
+                TaskDeploymentDescriptorBuilder.newBuilder(jobId, invokableClass)
                         .setAllocationId(allocationId)
                         .build();
         tmGateway.submitTask(tdd, jobMasterGateway.getFencingToken(), timeout).join();
+        return tdd.getExecutionAttemptId();
     }
 
     /**
@@ -1697,7 +1662,7 @@ public class TaskExecutorTest extends TestLogger {
             final TaskExecutorGateway taskExecutorGateway =
                     taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
-            final SlotID slotId = new SlotID(unresolvedTaskManagerLocation.getResourceID(), 0);
+            final SlotID slotId = buildSlotID(0);
             final AllocationID allocationId = new AllocationID();
 
             assertThat(startFuture.isDone(), is(false));
@@ -1707,16 +1672,14 @@ public class TaskExecutorTest extends TestLogger {
             // wait for the initial slot report
             initialSlotReport.get();
 
-            taskExecutorGateway
-                    .requestSlot(
-                            slotId,
-                            jobId,
-                            allocationId,
-                            ResourceProfile.ZERO,
-                            "foobar",
-                            resourceManagerId,
-                            timeout)
-                    .get();
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    allocationId,
+                    slotId,
+                    ResourceProfile.ZERO,
+                    "foobar",
+                    resourceManagerId);
 
             // wait until the job leader retrieval service for jobId is started
             startFuture.get();
@@ -1854,10 +1817,9 @@ public class TaskExecutorTest extends TestLogger {
 
             final ResourceID resourceId = taskExecutorResourceIdFuture.get();
 
-            final SlotID slotId = new SlotID(resourceId, 0);
             final CompletableFuture<Acknowledge> slotRequestResponse =
                     taskExecutorGateway.requestSlot(
-                            slotId,
+                            new SlotID(resourceId, 0),
                             jobId,
                             new AllocationID(),
                             ResourceProfile.ZERO,
@@ -2137,16 +2099,14 @@ public class TaskExecutorTest extends TestLogger {
             // wait for the connection to the ResourceManager
             initialSlotReportFuture.get();
 
-            taskExecutorGateway
-                    .requestSlot(
-                            new SlotID(taskExecutor.getResourceID(), 0),
-                            jobId,
-                            allocationId,
-                            ResourceProfile.ZERO,
-                            jobManagerAddress,
-                            testingResourceManagerGateway.getFencingToken(),
-                            timeout)
-                    .get();
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    allocationId,
+                    new SlotID(taskExecutor.getResourceID(), 0),
+                    ResourceProfile.ZERO,
+                    jobManagerAddress,
+                    testingResourceManagerGateway.getFencingToken());
 
             slotOfferings.await();
 
@@ -2208,16 +2168,14 @@ public class TaskExecutorTest extends TestLogger {
 
             ResourceID resourceID =
                     taskManagerServices.getUnresolvedTaskManagerLocation().getResourceID();
-            taskExecutorGateway
-                    .requestSlot(
-                            new SlotID(resourceID, 0),
-                            jobId,
-                            new AllocationID(),
-                            ResourceProfile.ZERO,
-                            "foobar",
-                            resourceManagerGateway.getFencingToken(),
-                            timeout)
-                    .get();
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    new AllocationID(),
+                    new SlotID(resourceID, 0),
+                    ResourceProfile.ZERO,
+                    "foobar",
+                    resourceManagerGateway.getFencingToken());
 
             jobManagerLeaderRetriever.notifyListener(
                     jobMasterGateway.getAddress(), UUID.randomUUID());
@@ -2340,14 +2298,12 @@ public class TaskExecutorTest extends TestLogger {
 
             initialSlotReporting.await();
 
-            final SlotID slotId1 = new SlotID(taskExecutor.getResourceID(), 0);
-            final SlotID slotId2 = new SlotID(taskExecutor.getResourceID(), 1);
             final AllocationID allocationIdInBoth = new AllocationID();
             final AllocationID allocationIdOnlyInJM = new AllocationID();
             final AllocationID allocationIdOnlyInTM = new AllocationID();
 
             taskExecutorGateway.requestSlot(
-                    slotId1,
+                    new SlotID(taskExecutor.getResourceID(), 0),
                     jobId,
                     allocationIdInBoth,
                     ResourceProfile.ZERO,
@@ -2355,7 +2311,7 @@ public class TaskExecutorTest extends TestLogger {
                     testingResourceManagerGateway.getFencingToken(),
                     timeout);
             taskExecutorGateway.requestSlot(
-                    slotId2,
+                    new SlotID(taskExecutor.getResourceID(), 1),
                     jobId,
                     allocationIdOnlyInTM,
                     ResourceProfile.ZERO,
@@ -2471,18 +2427,16 @@ public class TaskExecutorTest extends TestLogger {
 
             scheduleFirstHeartbeat.await();
 
-            SlotID slotId = new SlotID(taskExecutorResourceId, 0);
-            final CompletableFuture<Acknowledge> requestSlotFuture =
-                    taskExecutorGateway.requestSlot(
-                            slotId,
+            taskExecutorGateway
+                    .requestSlot(
+                            new SlotID(taskExecutorResourceId, 0),
                             jobId,
                             new AllocationID(),
                             ResourceProfile.ZERO,
                             "foobar",
                             testingResourceManagerGateway.getFencingToken(),
-                            timeout);
-
-            requestSlotFuture.get();
+                            timeout)
+                    .get();
 
             terminateSlotReportVerification.trigger();
 
@@ -2508,18 +2462,16 @@ public class TaskExecutorTest extends TestLogger {
                     DEFAULT_RESOURCE_PROFILE.merge(
                             ResourceProfile.newBuilder().setCpuCores(0.1).build());
 
-            submissionContext
-                    .taskExecutor
-                    .getSelfGateway(TaskExecutorGateway.class)
-                    .requestSlot(
-                            SlotID.getDynamicSlotID(ResourceID.generate()),
-                            jobId,
-                            allocationId,
-                            resourceProfile,
-                            submissionContext.jobMasterGateway.getAddress(),
-                            resourceManagerId,
-                            timeout)
-                    .get();
+            TaskExecutorGateway selfGateway =
+                    submissionContext.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+            requestSlot(
+                    selfGateway,
+                    jobId,
+                    allocationId,
+                    SlotID.getDynamicSlotID(ResourceID.generate()),
+                    resourceProfile,
+                    submissionContext.jobMasterGateway.getAddress(),
+                    resourceManagerId);
 
             ResourceID resourceId = ResourceID.generate();
             SlotReport slotReport = submissionContext.taskSlotTable.createSlotReport(resourceId);
@@ -2537,6 +2489,64 @@ public class TaskExecutorTest extends TestLogger {
     }
 
     @Test
+    public void testReleasingJobResources() throws Exception {
+        AllocationID[] slots =
+                range(0, 5).mapToObj(i -> new AllocationID()).toArray(AllocationID[]::new);
+        try (TaskExecutorTestingContext ctx = createTaskExecutorTestingContext(slots.length)) {
+            ctx.start();
+            ResourceManagerId rmId;
+            {
+                CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>>
+                        initialSlotReportFuture = new CompletableFuture<>();
+                rmId = createAndRegisterResourceManager(initialSlotReportFuture);
+                initialSlotReportFuture.get();
+            }
+
+            TaskExecutorGateway tm = ctx.taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+            for (int i = 0; i < slots.length; i++) {
+                requestSlot(
+                        tm,
+                        jobId,
+                        slots[i],
+                        buildSlotID(i),
+                        ResourceProfile.UNKNOWN,
+                        ctx.jobMasterGateway.getAddress(),
+                        rmId);
+            }
+            ctx.offerSlotsLatch.await();
+            ExecutionAttemptID exec =
+                    submit(slots[0], ctx.jobMasterGateway, tm, BlockingNoOpInvokable.class);
+            assertNotNull(ctx.changelogStoragesManager.getChangelogStoragesByJobId(jobId));
+            assertNotNull(ctx.metricGroup.getJobMetricsGroup(jobId));
+
+            // cancel tasks before releasing the slots - so that TM will release job resources on
+            // the last slot release
+            tm.cancelTask(exec, timeout).get();
+            // wait for task thread to notify TM about its final state
+            // (taskSlotTable isn't thread safe - using MainThread)
+            while (callInMain(ctx, () -> ctx.taskSlotTable.getTasks(jobId).hasNext())) {
+                Thread.sleep(50);
+            }
+
+            for (int i = 0; i < slots.length; i++) {
+                tm.freeSlot(slots[i], new RuntimeException("test exception"), timeout).get();
+                boolean isLastSlot = i == slots.length - 1;
+                assertEquals(
+                        isLastSlot,
+                        null == callInMain(ctx, () -> ctx.metricGroup.getJobMetricsGroup(jobId)));
+                assertEquals(
+                        isLastSlot,
+                        null
+                                == callInMain(
+                                        ctx,
+                                        () ->
+                                                ctx.changelogStoragesManager
+                                                        .getChangelogStoragesByJobId(jobId)));
+            }
+        }
+    }
+
+    @Test
     public void taskExecutorJobServicesCloseClassLoaderLeaseUponClosing()
             throws InterruptedException {
         final OneShotLatch leaseReleaseLatch = new OneShotLatch();
@@ -2612,17 +2622,15 @@ public class TaskExecutorTest extends TestLogger {
 
             final AllocationID allocationId = new AllocationID();
             final SlotID slotId = new SlotID(taskExecutor.getResourceID(), 0);
-            final CompletableFuture<Acknowledge> requestSlotFuture =
-                    taskExecutorGateway.requestSlot(
-                            slotId,
-                            jobId,
-                            allocationId,
-                            ResourceProfile.UNKNOWN,
-                            jobMasterGateway.getAddress(),
-                            resourceManagerGateway.getFencingToken(),
-                            timeout);
 
-            assertThat(requestSlotFuture.get(), is(Acknowledge.get()));
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    allocationId,
+                    slotId,
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             // The JobManager should reject the registration which should release all job resources
             // on the TaskExecutor
@@ -2683,17 +2691,15 @@ public class TaskExecutorTest extends TestLogger {
 
             final AllocationID allocationId = new AllocationID();
             final SlotID slotId = new SlotID(taskExecutor.getResourceID(), 0);
-            final CompletableFuture<Acknowledge> requestSlotFuture =
-                    taskExecutorGateway.requestSlot(
-                            slotId,
-                            jobId,
-                            allocationId,
-                            ResourceProfile.UNKNOWN,
-                            jobMasterGateway.getAddress(),
-                            resourceManagerGateway.getFencingToken(),
-                            timeout);
 
-            assertThat(requestSlotFuture.get(), is(Acknowledge.get()));
+            requestSlot(
+                    taskExecutorGateway,
+                    jobId,
+                    allocationId,
+                    slotId,
+                    ResourceProfile.UNKNOWN,
+                    jobMasterGateway.getAddress(),
+                    resourceManagerGateway.getFencingToken());
 
             taskExecutor.freeInactiveSlots(jobId, timeout);
 
@@ -2764,6 +2770,14 @@ public class TaskExecutorTest extends TestLogger {
 
     private TestingTaskExecutor createTestingTaskExecutor(
             TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices) {
+        return createTestingTaskExecutor(
+                taskManagerServices, heartbeatServices, createUnregisteredTaskManagerMetricGroup());
+    }
+
+    private TestingTaskExecutor createTestingTaskExecutor(
+            TaskManagerServices taskManagerServices,
+            HeartbeatServices heartbeatServices,
+            TaskManagerMetricGroup metricGroup) {
         return new TestingTaskExecutor(
                 rpc,
                 TaskManagerConfiguration.fromConfiguration(
@@ -2774,7 +2788,7 @@ public class TaskExecutorTest extends TestLogger {
                 taskManagerServices,
                 ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                 heartbeatServices,
-                UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                metricGroup,
                 null,
                 dummyBlobCacheService,
                 testingFatalErrorHandler,
@@ -2806,31 +2820,55 @@ public class TaskExecutorTest extends TestLogger {
 
         TaskExecutorLocalStateStoresManager stateStoresManager =
                 createTaskExecutorLocalStateStoresManager();
+        TaskExecutorStateChangelogStoragesManager changelogStoragesManager =
+                new TaskExecutorStateChangelogStoragesManager();
+        TaskManagerMetricGroup metricGroup =
+                TaskManagerMetricGroup.createTaskManagerMetricGroup(
+                        NoOpMetricRegistry.INSTANCE, "", ResourceID.generate());
+
         final TestingTaskExecutor taskExecutor =
                 createTestingTaskExecutor(
                         new TaskManagerServicesBuilder()
                                 .setTaskSlotTable(taskSlotTable)
                                 .setJobLeaderService(jobLeaderService)
                                 .setTaskStateManager(stateStoresManager)
-                                .build());
+                                .setTaskChangelogStoragesManager(changelogStoragesManager)
+                                .build(),
+                        HEARTBEAT_SERVICES,
+                        metricGroup);
 
         jobManagerLeaderRetriever.notifyListener(
                 jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
-        return new TaskExecutorTestingContext(jobMasterGateway, taskSlotTable, taskExecutor);
+        return new TaskExecutorTestingContext(
+                jobMasterGateway,
+                taskSlotTable,
+                taskExecutor,
+                changelogStoragesManager,
+                metricGroup,
+                offerSlotsLatch);
     }
 
     private class TaskExecutorTestingContext implements AutoCloseable {
         private final TestingJobMasterGateway jobMasterGateway;
         private final TaskSlotTable taskSlotTable;
         private final TestingTaskExecutor taskExecutor;
+        private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;
+        private final TaskManagerMetricGroup metricGroup;
+        private final OneShotLatch offerSlotsLatch;
 
         private TaskExecutorTestingContext(
                 TestingJobMasterGateway jobMasterGateway,
                 TaskSlotTable taskSlotTable,
-                TestingTaskExecutor taskExecutor) {
+                TestingTaskExecutor taskExecutor,
+                TaskExecutorStateChangelogStoragesManager changelogStoragesManager,
+                TaskManagerMetricGroup metricGroup,
+                OneShotLatch offerSlotsLatch) {
             this.jobMasterGateway = jobMasterGateway;
             this.taskSlotTable = taskSlotTable;
             this.taskExecutor = taskExecutor;
+            this.changelogStoragesManager = changelogStoragesManager;
+            this.metricGroup = metricGroup;
+            this.offerSlotsLatch = offerSlotsLatch;
         }
 
         private void start() {
@@ -2934,4 +2972,28 @@ public class TaskExecutorTest extends TestLogger {
             return result;
         }
     }
+
+    private void requestSlot(
+            TaskExecutorGateway gateway,
+            JobID jobId,
+            AllocationID allocationId,
+            SlotID slotId,
+            ResourceProfile profile,
+            String address,
+            ResourceManagerId token)
+            throws InterruptedException, ExecutionException {
+        gateway.requestSlot(slotId, jobId, allocationId, profile, address, token, timeout).get();
+    }
+
+    private SlotID buildSlotID(int slotIndex) {
+        return new SlotID(unresolvedTaskManagerLocation.getResourceID(), slotIndex);
+    }
+
+    private static <T> T callInMain(TaskExecutorTestingContext ctx, Callable<T> booleanCallable)
+            throws InterruptedException, ExecutionException {
+        return ctx.taskExecutor
+                .getMainThreadExecutableForTesting()
+                .callAsync(booleanCallable, Time.seconds(5))
+                .get();
+    }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index ed3d250..c092547 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -765,14 +765,8 @@ public class OneInputStreamTaskTest extends TestLogger {
         final TaskMetricGroup taskMetricGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                                 NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate())
-                        .addTaskForJob(
-                                new JobID(),
-                                "jobname",
-                                new JobVertexID(),
-                                new ExecutionAttemptID(),
-                                "task",
-                                0,
-                                0);
+                        .addJob(new JobID(), "jobname")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
 
         final StreamMockEnvironment env =
                 new StreamMockEnvironment(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 0b0390f..2bae6ed 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -555,14 +555,8 @@ public class StreamTaskTestHarness<OUT> {
     static TaskMetricGroup createTaskMetricGroup(Map<String, Metric> metrics) {
         return TaskManagerMetricGroup.createTaskManagerMetricGroup(
                         new TestMetricRegistry(metrics), "localhost", ResourceID.generate())
-                .addTaskForJob(
-                        new JobID(),
-                        "jobName",
-                        new JobVertexID(0, 0),
-                        new ExecutionAttemptID(),
-                        "test",
-                        0,
-                        0);
+                .addJob(new JobID(), "jobName")
+                .addTask(new JobVertexID(0, 0), new ExecutionAttemptID(), "test", 0, 0);
     }
 
     /** The metric registry for storing the registered metrics to verify in tests. */
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 21daedf..2686ad1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -477,14 +477,8 @@ public class TwoInputStreamTaskTest {
         final TaskMetricGroup taskMetricGroup =
                 TaskManagerMetricGroup.createTaskManagerMetricGroup(
                                 NoOpMetricRegistry.INSTANCE, "host", ResourceID.generate())
-                        .addTaskForJob(
-                                new JobID(),
-                                "jobname",
-                                new JobVertexID(),
-                                new ExecutionAttemptID(),
-                                "task",
-                                0,
-                                0);
+                        .addJob(new JobID(), "jobname")
+                        .addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
 
         final StreamMockEnvironment env =
                 new StreamMockEnvironment(