You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/06/14 17:15:38 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cd0b505  [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
cd0b505 is described below

commit cd0b505445a5f1b49620c7700fb4ffc954c9f82a
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Fri Jun 14 10:15:32 2019 -0700

    [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
    
    Closes #2667 from yukuai518/aaa
---
 .../org/apache/gobblin/cluster/SingleTask.java     |  2 +
 .../gobblin/cluster/ClusterIntegrationTest.java    | 28 ++++++-
 .../cluster/suite/IntegrationBasicSuite.java       | 14 +++-
 ...IntegrationDedicatedTaskDriverClusterSuite.java |  2 +-
 .../cluster/suite/IntegrationJobCancelSuite.java   |  2 +-
 .../cluster/suite/IntegrationJobFactorySuite.java  |  2 +-
 .../org/apache/gobblin/metrics/GobblinMetrics.java |  1 +
 .../gobblin/metrics/GobblinMetricsRegistry.java    | 19 +++++
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 28 ++++++-
 .../java/org/apache/gobblin/runtime/TaskState.java |  1 +
 .../java/org/apache/gobblin/runtime/fork/Fork.java | 32 +-------
 .../apache/gobblin/runtime/util/ForkMetrics.java   | 80 +++++++++++++++++++
 .../apache/gobblin/runtime/util/JobMetrics.java    | 90 +++++++++++++++++++---
 .../apache/gobblin/runtime/util/TaskMetrics.java   | 29 ++++++-
 14 files changed, 276 insertions(+), 54 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 94b13d3..6d5d8e4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -87,6 +87,8 @@ public class SingleTask {
 
       _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker);
       _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+    } finally {
+      _taskattempt.cleanMetrics();
     }
   }
 
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 55a2c0f..95a6cb8 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.cluster;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Map;
 
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
@@ -54,6 +55,7 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 @Slf4j
+@Test
 public class ClusterIntegrationTest {
 
   private IntegrationBasicSuite suite;
@@ -76,7 +78,8 @@ public class ClusterIntegrationTest {
     return helixManager;
   }
 
-  @Test void testJobShouldGetCancelled() throws Exception {
+  @Test
+  void testJobShouldGetCancelled() throws Exception {
     this.suite =new IntegrationJobCancelSuite();
     HelixManager helixManager = getHelixManager();
     suite.startCluster();
@@ -227,9 +230,32 @@ public class ClusterIntegrationTest {
       throws Exception {
     suite.startCluster();
     suite.waitForAndVerifyOutputFiles();
+    ensureJobLauncherFinished();
+    suite.verifyMetricsCleaned();
     suite.shutdownCluster();
   }
 
+  private void ensureJobLauncherFinished() throws Exception {
+    AssertWithBackoff asserter = AssertWithBackoff.create().logger(log).timeoutMs(120_000)
+       .maxSleepMs(100).backoffFactor(1.5);
+
+    asserter.assertTrue(this::isJobLauncherFinished, "Waiting for job launcher completion");
+  }
+
+  protected boolean isJobLauncherFinished(Void input) {
+    Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
+    for (Map.Entry<Thread, StackTraceElement[]> entry: map.entrySet()) {
+      for (StackTraceElement ste: entry.getValue()) {
+        if (ste.toString().contains(HelixRetriggeringJobCallable.class.getSimpleName())) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+
   @AfterMethod
   public void tearDown() throws IOException {
     this.suite.deleteWorkDir();
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 23c18a6..7da4915 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -34,14 +34,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Scanner;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.testng.Assert;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
@@ -51,7 +50,6 @@ import com.typesafe.config.ConfigParseOptions;
 import com.typesafe.config.ConfigRenderOptions;
 import com.typesafe.config.ConfigSyntax;
 
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.cluster.ClusterIntegrationTest;
@@ -60,6 +58,8 @@ import org.apache.gobblin.cluster.GobblinClusterManager;
 import org.apache.gobblin.cluster.GobblinTaskRunner;
 import org.apache.gobblin.cluster.HelixUtils;
 import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.GobblinMetricsRegistry;
 import org.apache.gobblin.testing.AssertWithBackoff;
 
 /**
@@ -71,6 +71,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
  */
 @Slf4j
 public class IntegrationBasicSuite {
+  public static final String JOB_NAME = "HelloWorldTestJob";
   public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
   public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
   public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
@@ -162,7 +163,7 @@ public class IntegrationBasicSuite {
   }
 
   protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
-    return ImmutableMap.of("HelloWorldJob", rawJobConfig);
+    return ImmutableMap.of(JOB_NAME, rawJobConfig);
   }
 
   private void writeJobConf(String jobName, Config jobConfig) throws IOException {
@@ -302,6 +303,11 @@ public class IntegrationBasicSuite {
     }
   }
 
+  public void verifyMetricsCleaned() {
+    Collection<GobblinMetrics> all = GobblinMetricsRegistry.getInstance().getMetricsByPattern(".*" + JOB_NAME + ".*");
+    Assert.assertEquals(all.size(), 0);
+  }
+
   public void shutdownCluster() throws InterruptedException, IOException {
     this.workers.forEach(runner->runner.stop());
     this.taskDrivers.forEach(runner->runner.stop());
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
index 01597fb..e3092ee 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
@@ -72,7 +72,7 @@ public class IntegrationDedicatedTaskDriverClusterSuite extends IntegrationBasic
     Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
         GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true))
         .withFallback(rawJobConfig);
-    return ImmutableMap.of("HelloWorldJob", newConfig);
+    return ImmutableMap.of(JOB_NAME, newConfig);
   }
 
   @Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index 62b3634..d6c305e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -42,7 +42,7 @@ public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
         GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
         .withFallback(rawJobConfig);
-    return ImmutableMap.of("HelloWorldJob", newConfig);
+    return ImmutableMap.of(JOB_NAME, newConfig);
   }
 
   @Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
index 5166395..392713a 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
@@ -45,7 +45,7 @@ public class IntegrationJobFactorySuite extends IntegrationBasicSuite {
         GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true,
         GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"))
         .withFallback(rawJobConfig);
-    return ImmutableMap.of("HelloWorldJob", newConfig);
+    return ImmutableMap.of(JOB_NAME, newConfig);
   }
 
   @Override
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index b986cda..5dc6bb2 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -70,6 +70,7 @@ import org.apache.gobblin.util.PropertiesUtils;
  */
 public class GobblinMetrics {
 
+  public static final String METRICS_ID_PREFIX = "gobblin.metrics.";
   public static final String METRICS_STATE_CUSTOM_TAGS = "metrics.state.custom.tags";
 
   protected static final GobblinMetricsRegistry GOBBLIN_METRICS_REGISTRY = GobblinMetricsRegistry.getInstance();
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
index c37d56b..f64093e 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
@@ -17,11 +17,15 @@
 
 package org.apache.gobblin.metrics;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
@@ -116,6 +120,21 @@ public class GobblinMetricsRegistry {
   }
 
   /**
+   * Retrieve every {@link GobblinMetrics} whose key in the cache matches the given regex.
+   */
+  @VisibleForTesting
+  public Collection<GobblinMetrics> getMetricsByPattern(String regex) {
+    Map<String, GobblinMetrics> entries = this.metricsCache.asMap();
+    List<GobblinMetrics> rst = new ArrayList<>();
+    for (Map.Entry<String, GobblinMetrics> entry: entries.entrySet()) {
+      if (entry.getKey().matches(regex)) {
+        rst.add(entry.getValue());
+      }
+    }
+    return rst;
+  }
+
+  /**
    * <p>
    * Creates {@link org.apache.gobblin.metrics.MetricContext}. Tries to read the name of the parent context
    * from key "metrics.context.name" at state, and tries to get the parent context by name from
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index f327d8d..3fdf2d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
-import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -48,12 +47,14 @@ import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.JobEvent;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
 import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.runtime.util.TaskMetrics;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
@@ -88,6 +89,7 @@ public class GobblinMultiTaskAttempt {
   private final Logger log;
   private final Iterator<WorkUnit> workUnits;
   private final String jobId;
+  private final String attemptId;
   private final JobState jobState;
   private final TaskStateTracker taskStateTracker;
   private final TaskExecutor taskExecutor;
@@ -115,6 +117,7 @@ public class GobblinMultiTaskAttempt {
     super();
     this.workUnits = workUnits;
     this.jobId = jobId;
+    this.attemptId = this.getClass().getName() + "." + this.jobId;
     this.jobState = jobState;
     this.taskStateTracker = taskStateTracker;
     this.taskExecutor = taskExecutor;
@@ -414,7 +417,8 @@ public class GobblinMultiTaskAttempt {
       }
     }
 
-    new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build()
+    new EventSubmitter.Builder(JobMetrics.get(this.jobId, new JobMetrics.CreatorTag(this.attemptId)).getMetricContext(), "gobblin.runtime")
+        .build()
         .submit(JobEvent.TASKS_SUBMITTED, "tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
 
     return tasks;
@@ -462,6 +466,26 @@ public class GobblinMultiTaskAttempt {
   }
 
   /**
+   * <p> During the task execution, the fork/task instances will create metric contexts (fork, task, job, container)
+   * along the hierarchy up to the root metric context. Although the root metric context only holds weak references
+   * to those metric contexts, they are also cached by GobblinMetricsRegistry. Here we remove all those strong
+   * references from the cache so that they can be reclaimed by the Java GC when the JVM runs short of memory.
+   *
+   * <p> Task metrics are cleaned by iterating over all tasks. Cleaning the job level metrics has a caveat: the
+   * removal only succeeds if the creator of the job level metrics initiates it. This means that if a task
+   * tries to remove a {@link JobMetrics} which was created by someone else, the removal won't take effect. This is
+   * handled by {@link JobMetrics#attemptRemove(String, Tag)}.
+   */
+  public void cleanMetrics() {
+    tasks.forEach(task-> {
+      TaskMetrics.remove(task);
+      JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(task.getTaskId()));
+    });
+
+    JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(this.attemptId));
+  }
+
+  /**
    * FIXME this method is provided for backwards compatibility in the LocalJobLauncher since it does
    * not access the task state store. This should be addressed as all task executions should be
    * updating the task state.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index 50bba37..18709fb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -311,6 +311,7 @@ public class TaskState extends WorkUnitState implements TaskProgress {
     this.jobId = text.toString().intern();
     text.readFields(in);
     this.taskId = text.toString().intern();
+    this.taskAttemptId = Optional.absent();
     this.setId(this.taskId);
     this.startTime = in.readLong();
     this.endTime = in.readLong();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index c165825..e80e7d3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.publisher.TaskPublisher;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
 import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
@@ -59,7 +58,7 @@ import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskContext;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.TaskMetrics;
+import org.apache.gobblin.runtime.util.ForkMetrics;
 import org.apache.gobblin.state.ConstructState;
 import org.apache.gobblin.stream.ControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
@@ -135,7 +134,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
   // An AtomicReference is still used here for the compareAntSet operation.
   private final AtomicReference<ForkState> forkState;
 
-  private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
   protected static final Object SHUTDOWN_RECORD = new Object();
   private SharedResourcesBroker<GobblinScopeTypes> broker;
 
@@ -176,9 +174,7 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
      * {@link Instrumented#setMetricContextName(State, String)} will be children of the forkMetrics.
      */
     if (GobblinMetrics.isEnabled(this.taskState)) {
-      GobblinMetrics forkMetrics = GobblinMetrics
-          .get(getForkMetricsName(taskContext.getTaskMetrics(), this.taskState, index),
-              taskContext.getTaskMetrics().getMetricContext(), getForkMetricsTags(this.taskState, index));
+      ForkMetrics forkMetrics = ForkMetrics.get(this.taskState, index);
       this.closer.register(forkMetrics.getMetricContext());
       Instrumented.setMetricContextName(this.taskState, forkMetrics.getMetricContext().getName());
     }
@@ -666,30 +662,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
     }
   }
 
-  /**
-   * Creates a {@link List} of {@link Tag}s for a {@link Fork} instance. The {@link Tag}s are purely based on the
-   * index and the branch name.
-   */
-  private static List<Tag<?>> getForkMetricsTags(State state, int index) {
-    return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, getForkMetricsId(state, index)));
-  }
-
-  /**
-   * Creates a {@link String} that is a concatenation of the {@link TaskMetrics#getName()} and
-   * {@link #getForkMetricsId(State, int)}.
-   */
-  private static String getForkMetricsName(TaskMetrics taskMetrics, State state, int index) {
-    return taskMetrics.getName() + "." + getForkMetricsId(state, index);
-  }
-
-  /**
-   * Creates a unique {@link String} representing this branch.
-   */
-  private static String getForkMetricsId(State state, int index) {
-    return state.getProp(ConfigurationKeys.FORK_BRANCH_NAME_KEY + "." + index,
-        ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + index);
-  }
-
   public boolean isSpeculativeExecutionSafe() {
     if (!this.writer.isPresent()) {
       return true;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
new file mode 100644
index 0000000..e38c567
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.fork.Fork;
+
+/**
+ * An extension to {@link GobblinMetrics} specifically for {@link Fork}.
+ */
+public class ForkMetrics extends GobblinMetrics {
+  private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
+
+  protected ForkMetrics(TaskState taskState, int index) {
+    super(name(taskState, index), parentContextForFork(taskState), getForkMetricsTags(taskState, index));
+  }
+
+  private static MetricContext parentContextForFork(TaskState taskState) {
+    return TaskMetrics.get(METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId()).getMetricContext();
+  }
+
+  public static ForkMetrics get(final TaskState taskState, int index) {
+    return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState, index), new Callable<GobblinMetrics>() {
+      @Override
+      public GobblinMetrics call() throws Exception {
+        return new ForkMetrics(taskState, index);
+      }
+    });
+  }
+
+  /**
+   * Creates a unique {@link String} representing this branch.
+   */
+  private static String getForkMetricsId(State state, int index) {
+    return state.getProp(ConfigurationKeys.FORK_BRANCH_NAME_KEY + "." + index,
+        ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + index);
+  }
+
+  /**
+   * Creates a {@link List} of {@link Tag}s for a {@link Fork} instance. The {@link Tag}s are purely based on the
+   * index and the branch name.
+   */
+  private static List<Tag<?>> getForkMetricsTags(State state, int index) {
+    return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, getForkMetricsId(state, index)));
+  }
+
+  /**
+   * Creates a {@link String} that is a concatenation of the {@link TaskMetrics#getName()} and
+   * {@link #getForkMetricsId(State, int)}.
+   */
+  protected static String name(TaskState taskState, int index) {
+    return METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId() + "." + getForkMetricsId(taskState, index);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 102885c..82dccdb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -20,8 +20,12 @@ package org.apache.gobblin.runtime.util;
 import java.util.List;
 import java.util.concurrent.Callable;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -36,67 +40,112 @@ import org.apache.gobblin.util.ClustersNames;
  *
  * @author Yinan Li
  */
+@Slf4j
 public class JobMetrics extends GobblinMetrics {
 
+  public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag( "driver");
   protected final String jobName;
-
-  protected JobMetrics(JobState job) {
-    this(job, null);
+  @Getter
+  protected final CreatorTag creatorTag;
+  protected JobMetrics(JobState job, CreatorTag tag) {
+    this(job, null, tag);
   }
 
-  protected JobMetrics(JobState job, MetricContext parentContext) {
+  protected JobMetrics(JobState job, MetricContext parentContext, CreatorTag creatorTag) {
     super(name(job), parentContext, tagsForJob(job));
     this.jobName = job.getJobName();
+    this.creatorTag = creatorTag;
+  }
+
+  public static class CreatorTag extends Tag<String> {
+    public CreatorTag(String value) {
+      super("creator", value);
+    }
   }
 
   /**
    * Get a new {@link GobblinMetrics} instance for a given job.
+   * @deprecated  use {@link JobMetrics#get(String, String, CreatorTag)} instead.
    *
    * @param jobName job name
    * @param jobId job ID
    * @return a new {@link GobblinMetrics} instance for the given job
    */
+  @Deprecated
   public static JobMetrics get(String jobName, String jobId) {
-    return get(new JobState(jobName, jobId));
+    return get(new JobState(jobName, jobId), DEFAULT_CREATOR_TAG);
   }
 
   /**
    * Get a new {@link GobblinMetrics} instance for a given job.
    *
+   * @param creatorTag the unique id which can tell who initiates this get operation
+   * @param jobName job name
    * @param jobId job ID
+   * @param creatorTag who creates this job metrics
    * @return a new {@link GobblinMetrics} instance for the given job
    */
-  public static JobMetrics get(String jobId) {
-    return get(null, jobId);
+  public static JobMetrics get(String jobName, String jobId, CreatorTag creatorTag) {
+    return get(new JobState(jobName, jobId), creatorTag);
   }
 
   /**
    * Get a new {@link GobblinMetrics} instance for a given job.
    *
+   * @param creatorTag the unique id which can tell who initiates this get operation
+   * @param jobId job ID
+   * @return a new {@link GobblinMetrics} instance for the given job
+   */
+  public static JobMetrics get(String jobId, CreatorTag creatorTag) {
+    return get(null, jobId, creatorTag);
+  }
+
+  /**
+   * Get a new {@link GobblinMetrics} instance for a given job.
+   *
+   * @param creatorTag the unique id which can tell who initiates this get operation
    * @param jobState the given {@link JobState} instance
    * @param parentContext is the parent {@link MetricContext}
    * @return a {@link JobMetrics} instance
    */
-  public static JobMetrics get(final JobState jobState, final MetricContext parentContext) {
+  public static JobMetrics get(final JobState jobState, final MetricContext parentContext, CreatorTag creatorTag) {
     return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
-        return new JobMetrics(jobState, parentContext);
+        return new JobMetrics(jobState, parentContext, creatorTag);
       }
     });
   }
 
   /**
    * Get a {@link JobMetrics} instance for the job with the given {@link JobState} instance.
+   * @deprecated  use {@link JobMetrics#get(JobState, CreatorTag)} instead.
    *
    * @param jobState the given {@link JobState} instance
    * @return a {@link JobMetrics} instance
    */
+  @Deprecated
   public static JobMetrics get(final JobState jobState) {
     return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
       @Override
       public GobblinMetrics call() throws Exception {
-        return new JobMetrics(jobState);
+        return new JobMetrics(jobState, DEFAULT_CREATOR_TAG);
+      }
+    });
+  }
+
+  /**
+   * Get a {@link JobMetrics} instance for the job with the given {@link JobState} instance and a creator tag.
+   *
+   * @param creatorTag the unique id which can tell who initiates this get operation
+   * @param jobState the given {@link JobState} instance
+   * @return a {@link JobMetrics} instance
+   */
+  public static JobMetrics get(final JobState jobState, CreatorTag creatorTag) {
+    return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
+      @Override
+      public GobblinMetrics call() throws Exception {
+        return new JobMetrics(jobState, creatorTag);
       }
     });
   }
@@ -106,7 +155,7 @@ public class JobMetrics extends GobblinMetrics {
    *
    * <p>
    *   Removing a {@link JobMetrics} instance for a job will also remove the {@link TaskMetrics}s
-   *   of every tasks of the job.
+   *   of every task of the job. This is only used by the job driver, where there is no {@link ForkMetrics}.
    * </p>
    * @param jobState the given {@link JobState} instance
    */
@@ -117,8 +166,25 @@ public class JobMetrics extends GobblinMetrics {
     }
   }
 
+  /**
+   * Attempt to remove the {@link JobMetrics} instance for the job with the given jobId.
+   * It also checks the creator tag of this {@link JobMetrics}. If the given tag doesn't
+   * match the creator tag, this {@link JobMetrics} won't be removed.
+   */
+  public static void attemptRemove(String jobId, Tag matchTag) {
+    Optional<GobblinMetrics> gobblinMetricsOptional = GOBBLIN_METRICS_REGISTRY.get(
+        GobblinMetrics.METRICS_ID_PREFIX + jobId);
+    JobMetrics jobMetrics = gobblinMetricsOptional.isPresent()? (JobMetrics) (gobblinMetricsOptional.get()): null;
+
+    // only remove if the tag matches
+    if (jobMetrics != null && jobMetrics.getCreatorTag().equals(matchTag)) {
+      log.info("Removing job metrics because creator matches : " + matchTag.getValue());
+      GOBBLIN_METRICS_REGISTRY.remove(GobblinMetrics.METRICS_ID_PREFIX + jobId);
+    }
+  }
+
   private static String name(JobState jobState) {
-    return "gobblin.metrics." + jobState.getJobId();
+    return METRICS_ID_PREFIX + jobState.getJobId();
   }
 
   private static List<Tag<?>> tagsForJob(JobState jobState) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
index 982b381..0ef2934 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.TaskEvent;
+import org.apache.gobblin.runtime.Task;
 import org.apache.gobblin.runtime.TaskState;
 
 
@@ -61,6 +62,8 @@ public class TaskMetrics extends GobblinMetrics {
 
   /**
    * Remove the {@link TaskMetrics} instance for the task with the given {@link TaskMetrics} instance.
+   * Please note this method is invoked by job driver so it won't delete any underlying {@link ForkMetrics}
+   * because the {@link org.apache.gobblin.runtime.fork.Fork} can be created on different nodes.
    *
    * @param taskState the given {@link TaskState} instance
    */
@@ -68,8 +71,26 @@ public class TaskMetrics extends GobblinMetrics {
     remove(name(taskState));
   }
 
+  /**
+   * Remove the {@link TaskMetrics} of the given task and the {@link ForkMetrics} of each present fork.
+   *
+   * @param task the given task instance
+   */
+  public static void remove(Task task) {
+    task.getForks().forEach(forkOpt -> {
+      if (forkOpt.isPresent()) { // an absent fork has nothing to remove, and get() would abort the cleanup
+        remove(ForkMetrics.name(task.getTaskState(), forkOpt.get().getIndex()));
+      }
+    });
+    remove(name(task));
+  }
+
   private static String name(TaskState taskState) {
-    return "gobblin.metrics." + taskState.getJobId() + "." + taskState.getTaskId();
+    return METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId();
+  }
+
+  private static String name(Task task) {
+    return name(task.getTaskState());
   }
 
   protected static List<Tag<?>> tagsForTask(TaskState taskState) {
@@ -83,7 +104,11 @@ public class TaskMetrics extends GobblinMetrics {
   }
 
   private static MetricContext parentContextForTask(TaskState taskState) {
-    return JobMetrics.get(taskState.getProp(ConfigurationKeys.JOB_NAME_KEY), taskState.getJobId()).getMetricContext();
+    return JobMetrics.get(
+        taskState.getProp(ConfigurationKeys.JOB_NAME_KEY),
+        taskState.getJobId(),
+        new JobMetrics.CreatorTag(taskState.getTaskId()))
+        .getMetricContext();
   }
 
   public static String taskInstanceRemoved(String metricName) {