You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/17 23:06:36 UTC
[gobblin] branch master updated: [GOBBLIN-1516] Only schedule
TaskMetricsUpdateder when metric-reporting is enabled by config (#3365)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 10f1dbd [GOBBLIN-1516] Only schedule TaskMetricsUpdateder when metric-reporting is enabled by config (#3365)
10f1dbd is described below
commit 10f1dbd8943a271fc3fb605cd4e5a7553339890a
Author: Lei <au...@users.noreply.github.com>
AuthorDate: Tue Aug 17 16:06:31 2021 -0700
[GOBBLIN-1516] Only schedule TaskMetricsUpdateder when metric-reporting is enabled by config (#3365)
* Only schedule TaskMetricsUpdater if metrics reporting is enabled: the removal of the TaskMetricsUpdater happens only after the task's commit, where all task objects are held until all other tasks have completed
* Propagate the change to all implementations of registerNewTask
---
.../gobblin/cluster/GobblinHelixTaskStateTracker.java | 4 +++-
.../instrumented/writer/InstrumentedDataWriterBase.java | 14 +++++++-------
.../gobblin/runtime/local/LocalTaskStateTracker.java | 5 ++++-
.../gobblin/runtime/mapreduce/MRTaskStateTracker.java | 4 +++-
4 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
index c1f4c63..070184e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
@@ -61,7 +61,9 @@ public class GobblinHelixTaskStateTracker extends AbstractTaskStateTracker {
@Override
public void registerNewTask(Task task) {
try {
- this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+ if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+ this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+ }
} catch (RejectedExecutionException ree) {
// Propagate the exception to caller that has full control of the life-cycle of a helix task.
log.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
index d5b83cb..2d3f72f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
@@ -54,7 +54,7 @@ import org.apache.gobblin.writer.DataWriter;
*/
abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumentable, Closeable, FinalState {
- private final Optional<ScheduledThreadPoolExecutor> writerMetricsUpdater;
+ private final Optional<ScheduledThreadPoolExecutor> writerMetricsUpdateExecutor;
private final boolean instrumentationEnabled;
private MetricContext metricContext;
@@ -80,10 +80,10 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
this.metricContext = this.closer.register(Instrumented.getMetricContext(state, classTag.or(this.getClass())));
if (this.instrumentationEnabled) {
- this.writerMetricsUpdater = Optional.of(buildWriterMetricsUpdater());
- scheduleWriterMetricsUpdater(this.writerMetricsUpdater.get(), getWriterMetricsUpdaterInterval(state));
+ this.writerMetricsUpdateExecutor = Optional.of(buildWriterMetricsUpdateExecutor());
+ scheduleWriterMetricsUpdater(this.writerMetricsUpdateExecutor.get(), getWriterMetricsUpdaterInterval(state));
} else {
- this.writerMetricsUpdater = Optional.absent();
+ this.writerMetricsUpdateExecutor = Optional.absent();
}
regenerateMetrics();
@@ -214,8 +214,8 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
try {
this.closer.close();
} finally {
- if (this.writerMetricsUpdater.isPresent()) {
- ExecutorsUtils.shutdownExecutorService(this.writerMetricsUpdater.get(), Optional.of(log));
+ if (this.writerMetricsUpdateExecutor.isPresent()) {
+ ExecutorsUtils.shutdownExecutorService(this.writerMetricsUpdateExecutor.get(), Optional.of(log));
}
}
}
@@ -262,7 +262,7 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
/**
* Build a {@link ScheduledThreadPoolExecutor} that updates record-level and byte-level metrics.
*/
- private static ScheduledThreadPoolExecutor buildWriterMetricsUpdater() {
+ private static ScheduledThreadPoolExecutor buildWriterMetricsUpdateExecutor() {
return new ScheduledThreadPoolExecutor(1,
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("WriterMetricsUpdater-%d")));
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
index 27b3477..b2e3e8b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
@@ -51,6 +51,7 @@ public class LocalTaskStateTracker extends AbstractTaskStateTracker {
private final JobState jobState;
// This is used to retry failed tasks
+ // Life cycle of this object is managed by the same ServiceManager managing the life cycle of LocalTaskStateTracker
private final TaskExecutor taskExecutor;
// Mapping between tasks and the task state reporters associated with them
@@ -75,7 +76,9 @@ public class LocalTaskStateTracker extends AbstractTaskStateTracker {
@Override
public void registerNewTask(Task task) {
try {
- this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+ if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+ this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+ }
} catch (RejectedExecutionException ree) {
LOG.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
index 95d701c..e812d8d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
@@ -60,7 +60,9 @@ public class MRTaskStateTracker extends AbstractTaskStateTracker {
@Override
public void registerNewTask(Task task) {
try {
- scheduleTaskMetricsUpdater(new MRTaskMetricsUpdater(task, this.context), task);
+ if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+ scheduleTaskMetricsUpdater(new MRTaskMetricsUpdater(task, this.context), task);
+ }
} catch (RejectedExecutionException ree) {
LOG.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
}