You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/17 18:24:26 UTC

incubator-gobblin git commit: [GOBBLIN-537] Dump workunits to logs for debugging

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 46c604067 -> 56be9b230


[GOBBLIN-537] Dump workunits to logs for debugging

Closes #2400 from htran1/workunit_logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56be9b23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56be9b23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56be9b23

Branch: refs/heads/master
Commit: 56be9b230ae9324661ae1af514b8063ca961b88d
Parents: 46c6040
Author: Hung Tran <hu...@linkedin.com>
Authored: Tue Jul 17 11:24:20 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jul 17 11:24:20 2018 -0700

----------------------------------------------------------------------
 .../gobblin/configuration/ConfigurationKeys.java       |  1 +
 .../apache/gobblin/source/workunit/MultiWorkUnit.java  |  3 +++
 .../org/apache/gobblin/source/workunit/WorkUnit.java   |  2 +-
 .../apache/gobblin/runtime/AbstractJobLauncher.java    | 13 +++++++++++++
 .../gobblin/runtime/GobblinMultiTaskAttempt.java       |  7 +++++++
 5 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 01fa490..47233e8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -178,6 +178,7 @@ public class ConfigurationKeys {
   public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled";
   public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis";
   public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval";
+  public static final String WORK_UNIT_ENABLE_TRACKING_LOGS = "workunit.enableTrackingLogs";
 
   public static final String JOB_RUN_ONCE_KEY = "job.runonce";
   public static final String JOB_DISABLED_KEY = "job.disabled";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index d521974..d254de0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -26,6 +26,8 @@ import java.util.List;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
+import lombok.ToString;
+
 
 /**
  * A class that wraps multiple {@link WorkUnit}s so they can be executed within a single task.
@@ -40,6 +42,7 @@ import com.google.common.collect.Lists;
  *
  * @author Yinan Li
  */
+@ToString(callSuper = true)
 public class MultiWorkUnit extends WorkUnit {
 
   private final List<WorkUnit> workUnits = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index 7d3f5d3..bf38c35 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -45,7 +45,7 @@ import lombok.ToString;
  *
  * @author kgoodhop
  */
-@ToString
+@ToString(callSuper=true)
 public class WorkUnit extends State {
 
   private Extract extract;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 0c50c32..7f359dc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -433,6 +433,19 @@ public abstract class AbstractJobLauncher implements JobLauncher {
               jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState)));
             }
           });
+
+          // dump the work unit if tracking logs are enabled
+          if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
+            workUnitStream = workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
+              @Nullable
+              @Override
+              public WorkUnit apply(@Nullable WorkUnit input) {
+                LOG.info("Work unit tracking log: {}", input);
+                return input;
+              }
+            });
+          }
+
           workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
               EventName.WORK_UNITS_PREPARATION));
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e5643c0..e06f338 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -421,6 +421,13 @@ public class GobblinMultiTaskAttempt {
       StateStore<TaskState> taskStateStore,
       CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker)
       throws IOException, InterruptedException {
+
+    // dump the work unit if tracking logs are enabled
+    if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
+      Logger log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName());
+      log.info("Work unit tracking log: {}", workUnits);
+    }
+
     GobblinMultiTaskAttempt multiTaskAttempt =
         new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor,
             Optional.of(containerId), Optional.of(taskStateStore), jobBroker);