You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by ap...@apache.org on 2021/09/16 16:44:46 UTC
[gobblin] branch master updated: [GOBBLIN-1543] Set default for work unit size (#3394)
This is an automated email from the ASF dual-hosted git repository.
aplex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 982a2dc [GOBBLIN-1543] Set default for work unit size (#3394)
982a2dc is described below
commit 982a2dcf418f948ce1ee2ec7691c5b4fe40f8e33
Author: umustafi <um...@gmail.com>
AuthorDate: Thu Sep 16 09:44:40 2021 -0700
[GOBBLIN-1543] Set default for work unit size (#3394)
A user encountered an exception when summing work unit sizes for retention jobs, whose work units have no size associated with them. This change defaults a missing work unit size to 0. When the total size is 0, progress is reported in terms of completed work units instead of bytes copied.
---
.../gobblin/runtime/AbstractJobLauncher.java | 2 +-
.../gobblin/runtime/TaskStateCollectorService.java | 26 ++++++++++++++++++----
2 files changed, 23 insertions(+), 5 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 562d688..684a00a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -663,7 +663,7 @@ public abstract class AbstractJobLauncher implements JobLauncher {
@VisibleForTesting
public static long sumWorkUnitsSizes (WorkUnitStream workUnitStream) {
Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
- long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE)).sum();
+ long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0)).sum();
return totalSizeInBytes;
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
index 9377727..4316463 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
@@ -90,6 +90,10 @@ public class TaskStateCollectorService extends AbstractScheduledService {
private double bytesCopiedSoFar;
+ private double totalNumWorkUnits;
+
+ private double workUnitsCompletedSoFar;
+
private double lastPercentageReported;
/**
@@ -269,17 +273,31 @@ public class TaskStateCollectorService extends AbstractScheduledService {
Long taskByteSize = Long.parseLong(stringSize);
- // if progress reporting is enabled, value should be present
+ // If progress reporting is enabled, value should be present
if (!this.jobState.contains(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE)) {
LOGGER.warn("Expected to report job progress but total bytes to copy property null");
return;
}
this.totalSizeToCopy = this.jobState.getPropAsLong(ServiceConfigKeys.TOTAL_WORK_UNIT_SIZE);
- // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
- this.bytesCopiedSoFar += taskByteSize;
- Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+ // If total size in bytes cannot be calculated, then default to progress reporting in terms of workunits
+ Double newPercentageCopied;
+ if (this.totalSizeToCopy == 0) {
+ this.totalNumWorkUnits = this.jobState.getPropAsLong(AbstractJobLauncher.NUM_WORKUNITS);
+ this.workUnitsCompletedSoFar += 1;
+
+ if (this.totalNumWorkUnits == 0) {
+ LOGGER.warn("Expected to report job progress but work units are not countable");
+ return;
+ }
+ newPercentageCopied = this.workUnitsCompletedSoFar / this.totalNumWorkUnits;
+ } else {
+ this.bytesCopiedSoFar += taskByteSize;
+ newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+ }
+
+ // Avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
// Report progress when it reaches 100% regardless of difference from lastPercentageReported
if (newPercentageCopied - this.lastPercentageReported >= ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD ||
(Math.abs(newPercentageCopied - 1.0) < 0.001)) {