You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/07/17 18:24:26 UTC
incubator-gobblin git commit: [GOBBLIN-537] Dump workunits to logs for debugging
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 46c604067 -> 56be9b230
[GOBBLIN-537] Dump workunits to logs for debugging
Closes #2400 from htran1/workunit_logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/56be9b23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/56be9b23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/56be9b23
Branch: refs/heads/master
Commit: 56be9b230ae9324661ae1af514b8063ca961b88d
Parents: 46c6040
Author: Hung Tran <hu...@linkedin.com>
Authored: Tue Jul 17 11:24:20 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jul 17 11:24:20 2018 -0700
----------------------------------------------------------------------
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/source/workunit/MultiWorkUnit.java | 3 +++
.../org/apache/gobblin/source/workunit/WorkUnit.java | 2 +-
.../apache/gobblin/runtime/AbstractJobLauncher.java | 13 +++++++++++++
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 7 +++++++
5 files changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 01fa490..47233e8 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -178,6 +178,7 @@ public class ConfigurationKeys {
public static final String WORK_UNIT_RETRY_ENABLED_KEY = "workunit.retry.enabled";
public static final String WORK_UNIT_CREATION_TIME_IN_MILLIS = "workunit.creation.time.in.millis";
public static final String WORK_UNIT_CREATION_AND_RUN_INTERVAL = "workunit.creation.and.run.interval";
+ public static final String WORK_UNIT_ENABLE_TRACKING_LOGS = "workunit.enableTrackingLogs";
public static final String JOB_RUN_ONCE_KEY = "job.runonce";
public static final String JOB_DISABLED_KEY = "job.disabled";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
index d521974..d254de0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java
@@ -26,6 +26,8 @@ import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import lombok.ToString;
+
/**
* A class that wraps multiple {@link WorkUnit}s so they can be executed within a single task.
@@ -40,6 +42,7 @@ import com.google.common.collect.Lists;
*
* @author Yinan Li
*/
+@ToString(callSuper = true)
public class MultiWorkUnit extends WorkUnit {
private final List<WorkUnit> workUnits = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
index 7d3f5d3..bf38c35 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java
@@ -45,7 +45,7 @@ import lombok.ToString;
*
* @author kgoodhop
*/
-@ToString
+@ToString(callSuper=true)
public class WorkUnit extends State {
private Extract extract;
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 0c50c32..7f359dc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -433,6 +433,19 @@ public abstract class AbstractJobLauncher implements JobLauncher {
jobState.addTaskState(new TaskState(new WorkUnitState(workUnit, jobState)));
}
});
+
+ // dump the work unit if tracking logs are enabled
+ if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
+ workUnitStream = workUnitStream.transform(new Function<WorkUnit, WorkUnit>() {
+ @Nullable
+ @Override
+ public WorkUnit apply(@Nullable WorkUnit input) {
+ LOG.info("Work unit tracking log: {}", input);
+ return input;
+ }
+ });
+ }
+
workUnitsPreparationTimer.stop(this.eventMetadataGenerator.getMetadata(this.jobContext,
EventName.WORK_UNITS_PREPARATION));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/56be9b23/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index e5643c0..e06f338 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -421,6 +421,13 @@ public class GobblinMultiTaskAttempt {
StateStore<TaskState> taskStateStore,
CommitPolicy multiTaskAttemptCommitPolicy, SharedResourcesBroker<GobblinScopeTypes> jobBroker)
throws IOException, InterruptedException {
+
+ // dump the work unit if tracking logs are enabled
+ if (jobState.getPropAsBoolean(ConfigurationKeys.WORK_UNIT_ENABLE_TRACKING_LOGS)) {
+ Logger log = LoggerFactory.getLogger(GobblinMultiTaskAttempt.class.getName());
+ log.info("Work unit tracking log: {}", workUnits);
+ }
+
GobblinMultiTaskAttempt multiTaskAttempt =
new GobblinMultiTaskAttempt(workUnits.iterator(), jobId, jobState, taskStateTracker, taskExecutor,
Optional.of(containerId), Optional.of(taskStateStore), jobBroker);