You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/17 23:06:36 UTC

[gobblin] branch master updated: [GOBBLIN-1516] Only schedule TaskMetricsUpdater when metric-reporting is enabled by config (#3365)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 10f1dbd  [GOBBLIN-1516] Only schedule TaskMetricsUpdater when metric-reporting is enabled by config (#3365)
10f1dbd is described below

commit 10f1dbd8943a271fc3fb605cd4e5a7553339890a
Author: Lei <au...@users.noreply.github.com>
AuthorDate: Tue Aug 17 16:06:31 2021 -0700

    [GOBBLIN-1516] Only schedule TaskMetricsUpdater when metric-reporting is enabled by config (#3365)
    
    * Only schedule TaskMetricsUpdater if metrics reporting is enabled: the MetricsUpdater is removed only after the task commits, so every task object is held in memory until all other tasks have completed
    
    * Apply the change to all implementations of registerNewTask
---
 .../gobblin/cluster/GobblinHelixTaskStateTracker.java      |  4 +++-
 .../instrumented/writer/InstrumentedDataWriterBase.java    | 14 +++++++-------
 .../gobblin/runtime/local/LocalTaskStateTracker.java       |  5 ++++-
 .../gobblin/runtime/mapreduce/MRTaskStateTracker.java      |  4 +++-
 4 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
index c1f4c63..070184e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskStateTracker.java
@@ -61,7 +61,9 @@ public class GobblinHelixTaskStateTracker extends AbstractTaskStateTracker {
   @Override
   public void registerNewTask(Task task) {
     try {
-      this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+      if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+        this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+      }
     } catch (RejectedExecutionException ree) {
       // Propagate the exception to caller that has full control of the life-cycle of a helix task.
       log.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
index d5b83cb..2d3f72f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterBase.java
@@ -54,7 +54,7 @@ import org.apache.gobblin.writer.DataWriter;
  */
 abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumentable, Closeable, FinalState {
 
-  private final Optional<ScheduledThreadPoolExecutor> writerMetricsUpdater;
+  private final Optional<ScheduledThreadPoolExecutor> writerMetricsUpdateExecutor;
   private final boolean instrumentationEnabled;
 
   private MetricContext metricContext;
@@ -80,10 +80,10 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
     this.metricContext = this.closer.register(Instrumented.getMetricContext(state, classTag.or(this.getClass())));
 
     if (this.instrumentationEnabled) {
-      this.writerMetricsUpdater = Optional.of(buildWriterMetricsUpdater());
-      scheduleWriterMetricsUpdater(this.writerMetricsUpdater.get(), getWriterMetricsUpdaterInterval(state));
+      this.writerMetricsUpdateExecutor = Optional.of(buildWriterMetricsUpdateExecutor());
+      scheduleWriterMetricsUpdater(this.writerMetricsUpdateExecutor.get(), getWriterMetricsUpdaterInterval(state));
     } else {
-      this.writerMetricsUpdater = Optional.absent();
+      this.writerMetricsUpdateExecutor = Optional.absent();
     }
 
     regenerateMetrics();
@@ -214,8 +214,8 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
     try {
       this.closer.close();
     } finally {
-      if (this.writerMetricsUpdater.isPresent()) {
-        ExecutorsUtils.shutdownExecutorService(this.writerMetricsUpdater.get(), Optional.of(log));
+      if (this.writerMetricsUpdateExecutor.isPresent()) {
+        ExecutorsUtils.shutdownExecutorService(this.writerMetricsUpdateExecutor.get(), Optional.of(log));
       }
     }
   }
@@ -262,7 +262,7 @@ abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumen
   /**
    * Build a {@link ScheduledThreadPoolExecutor} that updates record-level and byte-level metrics.
    */
-  private static ScheduledThreadPoolExecutor buildWriterMetricsUpdater() {
+  private static ScheduledThreadPoolExecutor buildWriterMetricsUpdateExecutor() {
     return new ScheduledThreadPoolExecutor(1,
         ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("WriterMetricsUpdater-%d")));
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
index 27b3477..b2e3e8b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/local/LocalTaskStateTracker.java
@@ -51,6 +51,7 @@ public class LocalTaskStateTracker extends AbstractTaskStateTracker {
   private final JobState jobState;
 
   // This is used to retry failed tasks
+  // Life cycle of this object is managed by the same ServiceManager managing the life cycle of LocalStateTracker
   private final TaskExecutor taskExecutor;
 
   // Mapping between tasks and the task state reporters associated with them
@@ -75,7 +76,9 @@ public class LocalTaskStateTracker extends AbstractTaskStateTracker {
   @Override
   public void registerNewTask(Task task) {
     try {
-      this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+      if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+        this.scheduledReporters.put(task.getTaskId(), scheduleTaskMetricsUpdater(new TaskMetricsUpdater(task), task));
+      }
     } catch (RejectedExecutionException ree) {
       LOG.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
index 95d701c..e812d8d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/mapreduce/MRTaskStateTracker.java
@@ -60,7 +60,9 @@ public class MRTaskStateTracker extends AbstractTaskStateTracker {
   @Override
   public void registerNewTask(Task task) {
     try {
-      scheduleTaskMetricsUpdater(new MRTaskMetricsUpdater(task, this.context), task);
+      if (GobblinMetrics.isEnabled(task.getTaskState().getWorkunit())) {
+        scheduleTaskMetricsUpdater(new MRTaskMetricsUpdater(task, this.context), task);
+      }
     } catch (RejectedExecutionException ree) {
       LOG.error(String.format("Scheduling of task state reporter for task %s was rejected", task.getTaskId()));
     }