You are viewing a plain text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/06/14 17:15:38 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cd0b505 [GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
cd0b505 is described below
commit cd0b505445a5f1b49620c7700fb4ffc954c9f82a
Author: Kuai Yu <ku...@linkedin.com>
AuthorDate: Fri Jun 14 10:15:32 2019 -0700
[GOBBLIN-800] Remove the metric context cache from GobblinMetricsRegistry
Closes #2667 from yukuai518/aaa
---
.../org/apache/gobblin/cluster/SingleTask.java | 2 +
.../gobblin/cluster/ClusterIntegrationTest.java | 28 ++++++-
.../cluster/suite/IntegrationBasicSuite.java | 14 +++-
...IntegrationDedicatedTaskDriverClusterSuite.java | 2 +-
.../cluster/suite/IntegrationJobCancelSuite.java | 2 +-
.../cluster/suite/IntegrationJobFactorySuite.java | 2 +-
.../org/apache/gobblin/metrics/GobblinMetrics.java | 1 +
.../gobblin/metrics/GobblinMetricsRegistry.java | 19 +++++
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 28 ++++++-
.../java/org/apache/gobblin/runtime/TaskState.java | 1 +
.../java/org/apache/gobblin/runtime/fork/Fork.java | 32 +-------
.../apache/gobblin/runtime/util/ForkMetrics.java | 80 +++++++++++++++++++
.../apache/gobblin/runtime/util/JobMetrics.java | 90 +++++++++++++++++++---
.../apache/gobblin/runtime/util/TaskMetrics.java | 29 ++++++-
14 files changed, 276 insertions(+), 54 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index 94b13d3..6d5d8e4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -87,6 +87,8 @@ public class SingleTask {
_taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker);
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+ } finally {
+ _taskattempt.cleanMetrics();
}
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 55a2c0f..95a6cb8 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.cluster;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
+import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
@@ -54,6 +55,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
+@Test
public class ClusterIntegrationTest {
private IntegrationBasicSuite suite;
@@ -76,7 +78,8 @@ public class ClusterIntegrationTest {
return helixManager;
}
- @Test void testJobShouldGetCancelled() throws Exception {
+ @Test
+ void testJobShouldGetCancelled() throws Exception {
this.suite =new IntegrationJobCancelSuite();
HelixManager helixManager = getHelixManager();
suite.startCluster();
@@ -227,9 +230,32 @@ public class ClusterIntegrationTest {
throws Exception {
suite.startCluster();
suite.waitForAndVerifyOutputFiles();
+ ensureJobLauncherFinished();
+ suite.verifyMetricsCleaned();
suite.shutdownCluster();
}
+ private void ensureJobLauncherFinished() throws Exception {
+ AssertWithBackoff asserter = AssertWithBackoff.create().logger(log).timeoutMs(120_000)
+ .maxSleepMs(100).backoffFactor(1.5);
+
+ asserter.assertTrue(this::isJobLauncherFinished, "Waiting for job launcher completion");
+ }
+
+ protected boolean isJobLauncherFinished(Void input) {
+ Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> entry: map.entrySet()) {
+ for (StackTraceElement ste: entry.getValue()) {
+ if (ste.toString().contains(HelixRetriggeringJobCallable.class.getSimpleName())) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+
@AfterMethod
public void tearDown() throws IOException {
this.suite.deleteWorkDir();
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 23c18a6..7da4915 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -34,14 +34,13 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Scanner;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.testng.Assert;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
@@ -51,7 +50,6 @@ import com.typesafe.config.ConfigParseOptions;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigSyntax;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.cluster.ClusterIntegrationTest;
@@ -60,6 +58,8 @@ import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinTaskRunner;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.TestHelper;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.testing.AssertWithBackoff;
/**
@@ -71,6 +71,7 @@ import org.apache.gobblin.testing.AssertWithBackoff;
*/
@Slf4j
public class IntegrationBasicSuite {
+ public static final String JOB_NAME = "HelloWorldTestJob";
public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
@@ -162,7 +163,7 @@ public class IntegrationBasicSuite {
}
protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
- return ImmutableMap.of("HelloWorldJob", rawJobConfig);
+ return ImmutableMap.of(JOB_NAME, rawJobConfig);
}
private void writeJobConf(String jobName, Config jobConfig) throws IOException {
@@ -302,6 +303,11 @@ public class IntegrationBasicSuite {
}
}
+ public void verifyMetricsCleaned() {
+ Collection<GobblinMetrics> all = GobblinMetricsRegistry.getInstance().getMetricsByPattern(".*" + JOB_NAME + ".*");
+ Assert.assertEquals(all.size(), 0);
+ }
+
public void shutdownCluster() throws InterruptedException, IOException {
this.workers.forEach(runner->runner.stop());
this.taskDrivers.forEach(runner->runner.stop());
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
index 01597fb..e3092ee 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java
@@ -72,7 +72,7 @@ public class IntegrationDedicatedTaskDriverClusterSuite extends IntegrationBasic
Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true))
.withFallback(rawJobConfig);
- return ImmutableMap.of("HelloWorldJob", newConfig);
+ return ImmutableMap.of(JOB_NAME, newConfig);
}
@Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
index 62b3634..d6c305e 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java
@@ -42,7 +42,7 @@ public class IntegrationJobCancelSuite extends IntegrationBasicSuite {
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
.withFallback(rawJobConfig);
- return ImmutableMap.of("HelloWorldJob", newConfig);
+ return ImmutableMap.of(JOB_NAME, newConfig);
}
@Override
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
index 5166395..392713a 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java
@@ -45,7 +45,7 @@ public class IntegrationJobFactorySuite extends IntegrationBasicSuite {
GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true,
GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder"))
.withFallback(rawJobConfig);
- return ImmutableMap.of("HelloWorldJob", newConfig);
+ return ImmutableMap.of(JOB_NAME, newConfig);
}
@Override
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
index b986cda..5dc6bb2 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetrics.java
@@ -70,6 +70,7 @@ import org.apache.gobblin.util.PropertiesUtils;
*/
public class GobblinMetrics {
+ public static final String METRICS_ID_PREFIX = "gobblin.metrics.";
public static final String METRICS_STATE_CUSTOM_TAGS = "metrics.state.custom.tags";
protected static final GobblinMetricsRegistry GOBBLIN_METRICS_REGISTRY = GobblinMetricsRegistry.getInstance();
diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
index c37d56b..f64093e 100644
--- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
+++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/org/apache/gobblin/metrics/GobblinMetricsRegistry.java
@@ -17,11 +17,15 @@
package org.apache.gobblin.metrics;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
@@ -116,6 +120,21 @@ public class GobblinMetricsRegistry {
}
/**
+ * Retrieve all {@link GobblinMetrics} whose key in the cache matches the given regex.
+ */
+ @VisibleForTesting
+ public Collection<GobblinMetrics> getMetricsByPattern(String regex) {
+ Map<String, GobblinMetrics> entries = this.metricsCache.asMap();
+ List<GobblinMetrics> rst = new ArrayList<>();
+ for (Map.Entry<String, GobblinMetrics> entry: entries.entrySet()) {
+ if (entry.getKey().matches(regex)) {
+ rst.add(entry.getValue());
+ }
+ }
+ return rst;
+ }
+
+ /**
* <p>
* Creates {@link org.apache.gobblin.metrics.MetricContext}. Tries to read the name of the parent context
* from key "metrics.context.name" at state, and tries to get the parent context by name from
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index f327d8d..3fdf2d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
-import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -48,12 +47,14 @@ import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.JobEvent;
import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskIFaceWrapper;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.runtime.util.TaskMetrics;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
@@ -88,6 +89,7 @@ public class GobblinMultiTaskAttempt {
private final Logger log;
private final Iterator<WorkUnit> workUnits;
private final String jobId;
+ private final String attemptId;
private final JobState jobState;
private final TaskStateTracker taskStateTracker;
private final TaskExecutor taskExecutor;
@@ -115,6 +117,7 @@ public class GobblinMultiTaskAttempt {
super();
this.workUnits = workUnits;
this.jobId = jobId;
+ this.attemptId = this.getClass().getName() + "." + this.jobId;
this.jobState = jobState;
this.taskStateTracker = taskStateTracker;
this.taskExecutor = taskExecutor;
@@ -414,7 +417,8 @@ public class GobblinMultiTaskAttempt {
}
}
- new EventSubmitter.Builder(JobMetrics.get(this.jobId).getMetricContext(), "gobblin.runtime").build()
+ new EventSubmitter.Builder(JobMetrics.get(this.jobId, new JobMetrics.CreatorTag(this.attemptId)).getMetricContext(), "gobblin.runtime")
+ .build()
.submit(JobEvent.TASKS_SUBMITTED, "tasksCount", Long.toString(countDownLatch.getRegisteredParties()));
return tasks;
@@ -462,6 +466,26 @@ public class GobblinMultiTaskAttempt {
}
/**
+ * <p> During task execution, the fork/task instances create metric contexts (fork, task, job, container)
+ * along the hierarchy up to the root metric context. Although the root metric context holds weak references to
+ * those metric contexts, they are also cached by GobblinMetricsRegistry. Here we remove all those
+ * strong references from the cache so that the contexts can be reclaimed by the Java garbage collector.
+ *
+ * <p> Task metrics are cleaned by iterating over all tasks. Cleaning job-level metrics comes with a caveat: the
+ * removal only succeeds if it is initiated by the creator of the job-level metrics. This means that if a task
+ * tries to remove a {@link JobMetrics} created by someone else, the removal has no effect. This is handled by
+ * {@link JobMetrics#attemptRemove(String, Tag)}.
+ */
+ public void cleanMetrics() {
+ tasks.forEach(task-> {
+ TaskMetrics.remove(task);
+ JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(task.getTaskId()));
+ });
+
+ JobMetrics.attemptRemove(this.jobId, new JobMetrics.CreatorTag(this.attemptId));
+ }
+
+ /**
* FIXME this method is provided for backwards compatibility in the LocalJobLauncher since it does
* not access the task state store. This should be addressed as all task executions should be
* updating the task state.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index 50bba37..18709fb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -311,6 +311,7 @@ public class TaskState extends WorkUnitState implements TaskProgress {
this.jobId = text.toString().intern();
text.readFields(in);
this.taskId = text.toString().intern();
+ this.taskAttemptId = Optional.absent();
this.setId(this.taskId);
this.startTime = in.readLong();
this.endTime = in.readLong();
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index c165825..e80e7d3 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.GobblinMetrics;
-import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.publisher.TaskPublisher;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyCheckResults;
import org.apache.gobblin.qualitychecker.row.RowLevelPolicyChecker;
@@ -59,7 +58,7 @@ import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskContext;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.util.TaskMetrics;
+import org.apache.gobblin.runtime.util.ForkMetrics;
import org.apache.gobblin.state.ConstructState;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
@@ -135,7 +134,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
// An AtomicReference is still used here for the compareAntSet operation.
private final AtomicReference<ForkState> forkState;
- private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
protected static final Object SHUTDOWN_RECORD = new Object();
private SharedResourcesBroker<GobblinScopeTypes> broker;
@@ -176,9 +174,7 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
* {@link Instrumented#setMetricContextName(State, String)} will be children of the forkMetrics.
*/
if (GobblinMetrics.isEnabled(this.taskState)) {
- GobblinMetrics forkMetrics = GobblinMetrics
- .get(getForkMetricsName(taskContext.getTaskMetrics(), this.taskState, index),
- taskContext.getTaskMetrics().getMetricContext(), getForkMetricsTags(this.taskState, index));
+ ForkMetrics forkMetrics = ForkMetrics.get(this.taskState, index);
this.closer.register(forkMetrics.getMetricContext());
Instrumented.setMetricContextName(this.taskState, forkMetrics.getMetricContext().getName());
}
@@ -666,30 +662,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
}
}
- /**
- * Creates a {@link List} of {@link Tag}s for a {@link Fork} instance. The {@link Tag}s are purely based on the
- * index and the branch name.
- */
- private static List<Tag<?>> getForkMetricsTags(State state, int index) {
- return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, getForkMetricsId(state, index)));
- }
-
- /**
- * Creates a {@link String} that is a concatenation of the {@link TaskMetrics#getName()} and
- * {@link #getForkMetricsId(State, int)}.
- */
- private static String getForkMetricsName(TaskMetrics taskMetrics, State state, int index) {
- return taskMetrics.getName() + "." + getForkMetricsId(state, index);
- }
-
- /**
- * Creates a unique {@link String} representing this branch.
- */
- private static String getForkMetricsId(State state, int index) {
- return state.getProp(ConfigurationKeys.FORK_BRANCH_NAME_KEY + "." + index,
- ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + index);
- }
-
public boolean isSpeculativeExecutionSafe() {
if (!this.writer.isPresent()) {
return true;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
new file mode 100644
index 0000000..e38c567
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/ForkMetrics.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metrics.GobblinMetrics;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.fork.Fork;
+
+/**
+ * An extension to {@link GobblinMetrics} specifically for {@link Fork}.
+ */
+public class ForkMetrics extends GobblinMetrics {
+ private static final String FORK_METRICS_BRANCH_NAME_KEY = "forkBranchName";
+
+ protected ForkMetrics(TaskState taskState, int index) {
+ super(name(taskState, index), parentContextForFork(taskState), getForkMetricsTags(taskState, index));
+ }
+
+ private static MetricContext parentContextForFork(TaskState taskState) {
+ return TaskMetrics.get(METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId()).getMetricContext();
+ }
+
+ public static ForkMetrics get(final TaskState taskState, int index) {
+ return (ForkMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(taskState, index), new Callable<GobblinMetrics>() {
+ @Override
+ public GobblinMetrics call() throws Exception {
+ return new ForkMetrics(taskState, index);
+ }
+ });
+ }
+
+ /**
+ * Creates a unique {@link String} representing this branch.
+ */
+ private static String getForkMetricsId(State state, int index) {
+ return state.getProp(ConfigurationKeys.FORK_BRANCH_NAME_KEY + "." + index,
+ ConfigurationKeys.DEFAULT_FORK_BRANCH_NAME + index);
+ }
+
+ /**
+ * Creates a {@link List} of {@link Tag}s for a {@link Fork} instance. The {@link Tag}s are purely based on the
+ * index and the branch name.
+ */
+ private static List<Tag<?>> getForkMetricsTags(State state, int index) {
+ return ImmutableList.<Tag<?>>of(new Tag<>(FORK_METRICS_BRANCH_NAME_KEY, getForkMetricsId(state, index)));
+ }
+
+ /**
+ * Creates a {@link String} that is a concatenation of the {@link TaskMetrics#getName()} and
+ * {@link #getForkMetricsId(State, int)}.
+ */
+ protected static String name(TaskState taskState, int index) {
+ return METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId() + "." + getForkMetricsId(taskState, index);
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
index 102885c..82dccdb 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/JobMetrics.java
@@ -20,8 +20,12 @@ package org.apache.gobblin.runtime.util;
import java.util.List;
import java.util.concurrent.Callable;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -36,67 +40,112 @@ import org.apache.gobblin.util.ClustersNames;
*
* @author Yinan Li
*/
+@Slf4j
public class JobMetrics extends GobblinMetrics {
+ public static final CreatorTag DEFAULT_CREATOR_TAG = new CreatorTag( "driver");
protected final String jobName;
-
- protected JobMetrics(JobState job) {
- this(job, null);
+ @Getter
+ protected final CreatorTag creatorTag;
+ protected JobMetrics(JobState job, CreatorTag tag) {
+ this(job, null, tag);
}
- protected JobMetrics(JobState job, MetricContext parentContext) {
+ protected JobMetrics(JobState job, MetricContext parentContext, CreatorTag creatorTag) {
super(name(job), parentContext, tagsForJob(job));
this.jobName = job.getJobName();
+ this.creatorTag = creatorTag;
+ }
+
+ public static class CreatorTag extends Tag<String> {
+ public CreatorTag(String value) {
+ super("creator", value);
+ }
}
/**
* Get a new {@link GobblinMetrics} instance for a given job.
+ * @deprecated use {@link JobMetrics#get(String, String, CreatorTag)} instead.
*
* @param jobName job name
* @param jobId job ID
* @return a new {@link GobblinMetrics} instance for the given job
*/
+ @Deprecated
public static JobMetrics get(String jobName, String jobId) {
- return get(new JobState(jobName, jobId));
+ return get(new JobState(jobName, jobId), DEFAULT_CREATOR_TAG);
}
/**
* Get a new {@link GobblinMetrics} instance for a given job.
*
+ * <p>The creator tag is recorded only if this call creates the {@link JobMetrics}; an existing instance is returned as is.</p>
+ * @param jobName job name
 * @param jobId job ID
+ * @param creatorTag identifies who initiates this get operation
* @return a new {@link GobblinMetrics} instance for the given job
*/
- public static JobMetrics get(String jobId) {
- return get(null, jobId);
+ public static JobMetrics get(String jobName, String jobId, CreatorTag creatorTag) {
+ return get(new JobState(jobName, jobId), creatorTag);
}
/**
* Get a new {@link GobblinMetrics} instance for a given job.
*
+ * @param creatorTag the unique id which can tell who initiates this get operation
+ * @param jobId job ID
+ * @return a new {@link GobblinMetrics} instance for the given job
+ */
+ public static JobMetrics get(String jobId, CreatorTag creatorTag) {
+ return get(null, jobId, creatorTag);
+ }
+
+ /**
+ * Get a new {@link GobblinMetrics} instance for a given job.
+ *
+ * @param creatorTag the unique id which can tell who initiates this get operation
* @param jobState the given {@link JobState} instance
* @param parentContext is the parent {@link MetricContext}
* @return a {@link JobMetrics} instance
*/
- public static JobMetrics get(final JobState jobState, final MetricContext parentContext) {
+ public static JobMetrics get(final JobState jobState, final MetricContext parentContext, CreatorTag creatorTag) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
- return new JobMetrics(jobState, parentContext);
+ return new JobMetrics(jobState, parentContext, creatorTag);
}
});
}
/**
* Get a {@link JobMetrics} instance for the job with the given {@link JobState} instance.
+ * @deprecated use {@link JobMetrics#get(JobState, CreatorTag)} instead.
*
* @param jobState the given {@link JobState} instance
* @return a {@link JobMetrics} instance
*/
+ @Deprecated
public static JobMetrics get(final JobState jobState) {
return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
@Override
public GobblinMetrics call() throws Exception {
- return new JobMetrics(jobState);
+ return new JobMetrics(jobState, DEFAULT_CREATOR_TAG);
+ }
+ });
+ }
+
+ /**
+ * Get a {@link JobMetrics} instance for the job with the given {@link JobState} instance and a creator tag.
+ *
+ * @param creatorTag the unique id which can tell who initiates this get operation
+ * @param jobState the given {@link JobState} instance
+ * @return a {@link JobMetrics} instance
+ */
+ public static JobMetrics get(final JobState jobState, CreatorTag creatorTag) {
+ return (JobMetrics) GOBBLIN_METRICS_REGISTRY.getOrDefault(name(jobState), new Callable<GobblinMetrics>() {
+ @Override
+ public GobblinMetrics call() throws Exception {
+ return new JobMetrics(jobState, creatorTag);
}
});
}
@@ -106,7 +155,7 @@ public class JobMetrics extends GobblinMetrics {
*
* <p>
* Removing a {@link JobMetrics} instance for a job will also remove the {@link TaskMetrics}s
- * of every tasks of the job.
+ * of every task of the job. This is only used by the job driver, where there is no {@link ForkMetrics}.
* </p>
* @param jobState the given {@link JobState} instance
*/
@@ -117,8 +166,25 @@ public class JobMetrics extends GobblinMetrics {
}
}
+ /**
+ * Attempt to remove the {@link JobMetrics} instance for the job with the given jobId.
+ * It also checks the creator tag of this {@link JobMetrics}. If the given tag doesn't
+ * match the creator tag, this {@link JobMetrics} won't be removed.
+ */
+ public static void attemptRemove(String jobId, Tag matchTag) {
+ Optional<GobblinMetrics> gobblinMetricsOptional = GOBBLIN_METRICS_REGISTRY.get(
+ GobblinMetrics.METRICS_ID_PREFIX + jobId);
+ JobMetrics jobMetrics = gobblinMetricsOptional.isPresent()? (JobMetrics) (gobblinMetricsOptional.get()): null;
+
+ // only remove if the tag matches
+ if (jobMetrics != null && jobMetrics.getCreatorTag().equals(matchTag)) {
+ log.info("Removing job metrics because creator matches : " + matchTag.getValue());
+ GOBBLIN_METRICS_REGISTRY.remove(GobblinMetrics.METRICS_ID_PREFIX + jobId);
+ }
+ }
+
private static String name(JobState jobState) {
- return "gobblin.metrics." + jobState.getJobId();
+ return METRICS_ID_PREFIX + jobState.getJobId();
}
private static List<Tag<?>> tagsForJob(JobState jobState) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
index 982b381..0ef2934 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/TaskMetrics.java
@@ -27,6 +27,7 @@ import org.apache.gobblin.metrics.GobblinMetrics;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TaskEvent;
+import org.apache.gobblin.runtime.Task;
import org.apache.gobblin.runtime.TaskState;
@@ -61,6 +62,8 @@ public class TaskMetrics extends GobblinMetrics {
/**
* Remove the {@link TaskMetrics} instance for the task with the given {@link TaskMetrics} instance.
+ * Note that this method is invoked by the job driver, so it won't delete any underlying {@link ForkMetrics},
+ * because the {@link org.apache.gobblin.runtime.fork.Fork}s may be created on different nodes.
*
* @param taskState the given {@link TaskState} instance
*/
@@ -68,8 +71,26 @@ public class TaskMetrics extends GobblinMetrics {
remove(name(taskState));
}
+ /**
+ * Remove the {@link TaskMetrics} instance for the given {@link Task}.
+ * This also removes the {@link ForkMetrics} of each of this task's forks.
+ *
+ * @param task the given task instance
+ */
+ public static void remove(Task task) {
+ task.getForks().forEach(forkOpt -> {
+ remove(ForkMetrics.name(task.getTaskState(), forkOpt.get().getIndex()));
+ });
+
+ remove(name(task));
+ }
+
private static String name(TaskState taskState) {
- return "gobblin.metrics." + taskState.getJobId() + "." + taskState.getTaskId();
+ return METRICS_ID_PREFIX + taskState.getJobId() + "." + taskState.getTaskId();
+ }
+
+ private static String name(Task task) {
+ return name(task.getTaskState());
}
protected static List<Tag<?>> tagsForTask(TaskState taskState) {
@@ -83,7 +104,11 @@ public class TaskMetrics extends GobblinMetrics {
}
private static MetricContext parentContextForTask(TaskState taskState) {
- return JobMetrics.get(taskState.getProp(ConfigurationKeys.JOB_NAME_KEY), taskState.getJobId()).getMetricContext();
+ return JobMetrics.get(
+ taskState.getProp(ConfigurationKeys.JOB_NAME_KEY),
+ taskState.getJobId(),
+ new JobMetrics.CreatorTag(taskState.getTaskId()))
+ .getMetricContext();
}
public static String taskInstanceRemoved(String metricName) {