You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/09/16 16:44:46 UTC

[gobblin] branch master updated: [GOBBLIN-1543] Set default for work unit size (#3394)

This is an automated email from the ASF dual-hosted git repository.

aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 982a2dc  [GOBBLIN-1543] Set default for work unit size (#3394)
982a2dc is described below

commit 982a2dcf418f948ce1ee2ec7691c5b4fe40f8e33
Author: umustafi <um...@gmail.com>
AuthorDate: Thu Sep 16 09:44:40 2021 -0700

    [GOBBLIN-1543] Set default for work unit size (#3394)
    
    A user encountered an exception when summing work unit sizes for retention jobs that do not have a size associated with them, so I am defaulting to progress reporting in terms of work units for cases that do not have a size associated with them.
---
 .../gobblin/runtime/AbstractJobLauncher.java       |  2 +-
 .../gobblin/runtime/TaskStateCollectorService.java | 26 ++++++++++++++++++----
 2 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 562d688..684a00a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -663,7 +663,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
   @VisibleForTesting
   public static long sumWorkUnitsSizes (WorkUnitStream workUnitStream) {
     Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
-    long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE)).sum();
+    long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).sum();
     return totalSizeInBytes;
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 9377727..4316463 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -90,6 +90,10 @@ public class TaskStateCollectorService extends AbstractScheduledService {
 
   private double bytesCopiedSoFar;
 
+  private double totalNumWorkUnits;
+
+  private double workUnitsCompletedSoFar;
+
   private double lastPercentageReported;
 
   /**
@@ -269,17 +273,31 @@ public class TaskStateCollectorService extends AbstractScheduledService {
 
     Long taskByteSize = Long.parseLong(stringSize);
 
-    // if progress reporting is enabled, value should be present
+    // If progress reporting is enabled, value should be present
     if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
       LOGGER.warn("Expected to report job progress but total bytes to copy property null");
       return;
     }
     this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
 
-    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
-    this.bytesCopiedSoFar += taskByteSize;
-    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+    // If total size in bytes cannot be calculated, then default to progress reporting in terms of workunits
+    Double newPercentageCopied;
+    if (this.totalSizeToCopy == 0) {
+      this.totalNumWorkUnits = this.jobState.getPropAsLong(AbstractJobLauncher.NUM_WORKUNITS);
+      this.workUnitsCompletedSoFar += 1;
+
+      if (this.totalNumWorkUnits == 0) {
+        LOGGER.warn("Expected to report job progress but work units are not countable");
+        return;
+      }
+      newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
+    } else {
+      this.bytesCopiedSoFar += taskByteSize;
+      newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+    }
+
 
+    // Avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
     // Report progress when it reaches 100% regardless of difference from lastPercentageReported
     if (newPercentageCopied - this.lastPercentageReported >= ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD ||
         (Math.abs(newPercentageCopied - 1.0) < 0.001)) {