Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/07/30 21:55:32 UTC

[GitHub] [gobblin] umustafi opened a new pull request #3336: [GOBBLIN-1493] Report data copy's progress

umustafi opened a new pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [X] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1493
   
   
   ### Description
   - [X] Here are some details about my PR, including screenshots (if applicable):
   Progress reporting for a data copy gives users quantitative feedback on a copy job: its progress as a percentage and an estimate of the time remaining until completion. This PR extends the existing job status endpoint to include both the progress percentage and the estimated time remaining. 
   
   This PR decorates each work unit with its size in bytes and uses those sizes to track the progress of a copy as individual tasks complete. It periodically emits GobblinTrackingEvents containing the progress percentage (only when progress has advanced past a threshold) and calculates the remaining time for the copy, storing both in the job status. 
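   A minimal Java sketch of the size-based progress tracking described above, assuming each completed task reports its work unit's byte size and the job knows the total byte count up front. `CopyProgressTracker` and its methods are hypothetical, not Gobblin classes, and the threshold is simply a fraction passed in by the caller (the real value of Gobblin's reporting threshold is not shown in this thread).

```java
import java.util.OptionalInt;

/**
 * Hypothetical, simplified progress tracker: accumulates the byte size of each
 * completed task and reports a new percentage only when progress has advanced
 * by more than a threshold since the last report.
 */
class CopyProgressTracker {
  private final long totalBytesToCopy;
  private final double reportingThreshold; // fraction, e.g. 0.05 for 5 percentage points
  private long bytesCopiedSoFar = 0L;
  private double lastFractionReported = 0.0;

  CopyProgressTracker(long totalBytesToCopy, double reportingThreshold) {
    if (totalBytesToCopy <= 0) {
      throw new IllegalArgumentException("totalBytesToCopy must be positive");
    }
    this.totalBytesToCopy = totalBytesToCopy;
    this.reportingThreshold = reportingThreshold;
  }

  /** Records a completed task; returns a percentage (0-100) only if one should be reported. */
  OptionalInt onTaskCompleted(long taskByteSize) {
    bytesCopiedSoFar += taskByteSize;
    // Cast before dividing: long / long would truncate to 0 until the copy finishes.
    double fraction = (double) bytesCopiedSoFar / totalBytesToCopy;
    if (fraction - lastFractionReported > reportingThreshold) {
      lastFractionReported = fraction;
      return OptionalInt.of((int) Math.round(fraction * 100));
    }
    return OptionalInt.empty();
  }

  public static void main(String[] args) {
    CopyProgressTracker tracker = new CopyProgressTracker(1_000L, 0.05);
    long[] taskSizes = {10L, 30L, 200L, 760L};
    for (long size : taskSizes) {
      tracker.onTaskCompleted(size).ifPresent(p -> System.out.println("progress: " + p + "%"));
    }
  }
}
```

   With the sample sizes in `main`, this prints `progress: 24%` followed by `progress: 100%`. Because the check is a strict threshold, a small final step (for example from 97% to 100%) would not be reported, so an implementation may want to always emit an event on completion.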
   
   ### Tests
   - [X] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   
   
   ### Commits
   - [X] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678706597



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
##########
@@ -180,6 +180,14 @@
                 "name" : "processedCount",
                 "type" : "long",
                 "doc" : "number of records processed in the last job execution"
+              }, {
+                "name" : "jobProgress",
+                "type" : "int",
+                "doc" : "job progress as a percentage (0-100)"
+              }, {
+                "name" : "estimatedMillisecondsToCompletion",

Review comment:
       Sorry for the late comment. Do you think we should use `estimatedSecondsToCompletion` instead of milliseconds?







[GitHub] [gobblin] jack-moseley commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679356124



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       If we do need to add these, let's check that they exist in the jobState before adding them. This feature could be turned on for a non-GaaS distcp job, in which case we would be setting those properties to null.
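   A minimal sketch of the existence check suggested above, using `java.util.Properties` as a stand-in for Gobblin's job state. `EventMetadata.putIfPresent` is a hypothetical helper, and the literal key strings are placeholders, not the actual values of `ConfigurationKeys.FLOW_GROUP_KEY` or the `TimingEvent.FlowEventConstants` fields.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Hypothetical helper: copies a property into the event metadata only when it is
 * present, so non-GaaS jobs do not end up with null flow fields.
 */
final class EventMetadata {
  private EventMetadata() {}

  static void putIfPresent(Map<String, String> metadata, Properties jobProps,
      String propKey, String metadataField) {
    String value = jobProps.getProperty(propKey);
    if (value != null) {
      metadata.put(metadataField, value);
    }
  }

  public static void main(String[] args) {
    Properties jobProps = new Properties();
    jobProps.setProperty("flow.group", "myGroup"); // placeholder key; flow name intentionally absent
    Map<String, String> progress = new HashMap<>();
    putIfPresent(progress, jobProps, "flow.group", "flowGroup");
    putIfPresent(progress, jobProps, "flow.name", "flowName");
    System.out.println(progress); // {flowGroup=myGroup}
  }
}
```

   A later comment in this thread points out that the `eventSubmitter` may already attach these fields, in which case no explicit copy is needed at all.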







[GitHub] [gobblin] jack-moseley commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
jack-moseley commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677791888



##########
File path: gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -681,6 +681,8 @@
   public static final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = "task.state.collector.interval.secs";
   public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60;
   public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
+  public static final String REPORT_JOB_PROGRESS = "job.report.progress";

Review comment:
       I would add a constant `public static final boolean DEFAULT_REPORT_JOB_PROGRESS = false;` as well. It makes it a bit easier to change the default later.
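   A minimal sketch of the suggested key/default pairing, with `java.util.Properties` standing in for the job state. `ProgressConfig` and `isProgressReportingEnabled` are hypothetical names; in the actual diff the call site would presumably become `jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, ConfigurationKeys.DEFAULT_REPORT_JOB_PROGRESS)` instead of passing a literal `false`.

```java
import java.util.Properties;

/** Sketch of pairing a config key with an explicit, named default constant. */
final class ProgressConfig {
  static final String REPORT_JOB_PROGRESS = "job.report.progress";
  static final boolean DEFAULT_REPORT_JOB_PROGRESS = false;

  private ProgressConfig() {}

  /** Reads the flag, falling back to the named default instead of a hard-coded literal. */
  static boolean isProgressReportingEnabled(Properties jobProps) {
    String raw = jobProps.getProperty(REPORT_JOB_PROGRESS);
    return raw == null ? DEFAULT_REPORT_JOB_PROGRESS : Boolean.parseBoolean(raw);
  }
}
```

   Changing the default later then means editing one constant rather than every call site that passes `false`.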

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be reported for infinite work unit streams. Turn off property "
+                + ConfigurationKeys.REPORT_JOB_PROGRESS + " and rerun job.");

Review comment:
       IMO this could just log an error (`log.error`) instead of terminating the job. I think most other metrics-related failures in Gobblin just log the issue rather than throw an exception.

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed

Review comment:
       Is this formula correct? If a job completed 50% in 10 minutes, the formula would return `(100/50)*(10 minutes) = 20 minutes`, which is the total expected time rather than the expected remaining time, right?
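   The reviewer's point can be made concrete: if `p` percent of the job took `timeElapsed`, linear extrapolation puts the total at `timeElapsed * 100 / p`, so the remaining time is `timeElapsed * (100 - p) / p`, i.e. `(100/p - 1) * timeElapsed`. A minimal Java sketch, where `RemainingTimeEstimator` is a hypothetical helper and returning `-1` when nothing has completed yet is an assumed convention:

```java
/** Hypothetical helper showing the remaining-time (not total-time) estimate. */
final class RemainingTimeEstimator {
  private RemainingTimeEstimator() {}

  /**
   * Linear extrapolation: if completionPercentage percent took timeElapsedMillis,
   * the remaining time is timeElapsedMillis * (100 - p) / p.
   * Returns -1 when no estimate is possible (p <= 0).
   */
  static long estimateMillisRemaining(int completionPercentage, long timeElapsedMillis) {
    if (completionPercentage <= 0) {
      return -1L; // nothing completed yet, so there is no rate to extrapolate from
    }
    if (completionPercentage >= 100) {
      return 0L;
    }
    return timeElapsedMillis * (100 - completionPercentage) / completionPercentage;
  }

  public static void main(String[] args) {
    long tenMinutes = 10 * 60 * 1000L;
    // 50% done after 10 minutes: total is about 20 minutes, so about 10 minutes remain.
    System.out.println(estimateMillisRemaining(50, tenMinutes)); // 600000
  }
}
```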

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       I think adding these properties is not actually required. You are using the `eventSubmitter` passed from `MRJobLauncher`, which already attaches additional metadata by default, including all three of these properties.
   
   If you look at other job-level events sent by GaaS, such as `WorkUnitsCreated`, they already contain these fields without adding them explicitly.







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677883082



##########
File path: gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
##########
@@ -681,6 +681,8 @@
   public static final String TASK_STATE_COLLECTOR_INTERVAL_SECONDS = "task.state.collector.interval.secs";
   public static final int DEFAULT_TASK_STATE_COLLECTOR_INTERVAL_SECONDS = 60;
   public static final String TASK_STATE_COLLECTOR_HANDLER_CLASS = "task.state.collector.handler.class";
+  public static final String REPORT_JOB_PROGRESS = "job.report.progress";

Review comment:
       Let's keep the word order consistent between the constant name and its value (`REPORT_JOB_PROGRESS` vs. `job.report.progress`).







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677886489



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -17,11 +17,13 @@
 
 package org.apache.gobblin.runtime;
 
+import avro.shaded.com.google.common.annotations.VisibleForTesting;

Review comment:
       Just import `com.google.common.annotations.VisibleForTesting` rather than the shaded Avro copy.







[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-884587476


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3336](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fbb1f6) into [master](https://codecov.io/gh/apache/gobblin/commit/08db23e15ab73654142998ecabb01bbb51f42a61?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08db23e) will **decrease** coverage by `4.59%`.
   > The diff coverage is `25.00%`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3336      +/-   ##
   ============================================
   - Coverage     46.55%   41.96%   -4.60%     
   + Complexity    10138     2601    -7537     
   ============================================
     Files          2051      630    -1421     
     Lines         79547    23748   -55799     
     Branches       8880     2455    -6425     
   ============================================
   - Hits          37036     9966   -27070     
   + Misses        39086    12874   -26212     
   + Partials       3425      908    -2517     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../org/apache/gobblin/service/ServiceConfigKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9TZXJ2aWNlQ29uZmlnS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lin/service/FlowExecutionResourceLocalHandler.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93RXhlY3V0aW9uUmVzb3VyY2VMb2NhbEhhbmRsZXIuamF2YQ==) | `4.59% <30.00%> (+4.59%)` | :arrow_up: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `70.00% <0.00%> (-2.23%)` | :arrow_down: |
   | [...blin/source/extractor/watermark/DateWatermark.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3Ivd2F0ZXJtYXJrL0RhdGVXYXRlcm1hcmsuamF2YQ==) | | |
   | [...ompaction/mapreduce/CompactionJobConfigurator.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL0NvbXBhY3Rpb25Kb2JDb25maWd1cmF0b3IuamF2YQ==) | | |
   | [...main/java/org/apache/gobblin/metrics/Counters.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tZXRyaWNzLWxpYnMvZ29iYmxpbi1tZXRyaWNzLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vbWV0cmljcy9Db3VudGVycy5qYXZh) | | |
   | [...ent/retention/dataset/ModificationTimeDataset.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L3JldGVudGlvbi9kYXRhc2V0L01vZGlmaWNhdGlvblRpbWVEYXRhc2V0LmphdmE=) | | |
   | [...eplication/CopyRouteGeneratorOptimizedLatency.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcmVwbGljYXRpb24vQ29weVJvdXRlR2VuZXJhdG9yT3B0aW1pemVkTGF0ZW5jeS5qYXZh) | | |
   | ... and [1409 more](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [08db23e...6fbb1f6](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677888970



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());

Review comment:
       I think the flattening into `workUnits` can move inside `sumWorkUnitsSizes`, since the variable is not used anywhere else here.







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677888650



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "

Review comment:
       IMO, `UnsupportedOperationException` is more appropriate here.







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675884393



##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
##########
@@ -163,6 +163,9 @@ public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.JOB_COMPLETION_PERCENTAGE:
+        properties.put(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));

Review comment:
       I believe `METADATA_END_TIME` is set when `event.stop()` is called, which is how I obtain the timestamp at which the progress event was sent.







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679507183



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       @umustafi these params are already added in IvyJobLauncher. Let's remove them from here; if that does not work, we will debug.







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679563392



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;

Review comment:
       Yes, it will be 0 until the first mapper completes, but the progress percentage will also be 0 until then. One is never set without the other, and we hope this is clear enough to the user. In the usual case, we expect the first mapper to complete within seconds or 1-2 minutes.
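   Editor's sketch: the thresholded accumulation in `reportJobProgress` can be modeled as below, assuming the total byte count is known up front. `ProgressTracker`, the threshold used in the test, and returning an `OptionalInt` instead of emitting a `GobblinTrackingEvent` are illustrative assumptions, not Gobblin code. Nothing is reported until the first task completes, which is consistent with the 0% behavior described in the reply above. The division is kept in floating point on purpose, because `long / long` in Java truncates to 0 until the copy finishes.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of threshold-based progress reporting: accumulate the byte size of each
 * completed work unit and only surface a new percentage once it has advanced past a threshold.
 */
class ProgressTracker {
  private final long totalBytes;
  private final double threshold;  // fraction of the total, e.g. 0.05 for 5%
  private long bytesCopiedSoFar = 0;
  private double lastFractionReported = 0.0;

  ProgressTracker(long totalBytes, double threshold) {
    if (totalBytes <= 0) {
      throw new IllegalArgumentException("totalBytes must be positive");
    }
    this.totalBytes = totalBytes;
    this.threshold = threshold;
  }

  /** Records a completed work unit; returns a percentage (0-100) only when one should be reported. */
  OptionalInt onTaskCompleted(long taskBytes) {
    bytesCopiedSoFar += taskBytes;
    // Cast before dividing so the fraction is not truncated by integer division.
    double fraction = (double) bytesCopiedSoFar / totalBytes;
    if (fraction - lastFractionReported > threshold) {
      lastFractionReported = fraction;
      return OptionalInt.of((int) Math.round(fraction * 100));
    }
    return OptionalInt.empty();
  }
}
```

   With a strict `>` comparison, a small final increment may never push the reported value to 100%; always reporting on the last task is one way to avoid that.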







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678535868



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,19 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be reported for infinite work unit streams. Turn off property "
+                + ConfigurationKeys.REPORT_JOB_PROGRESS + " and rerun job.");

Review comment:
       Good point: failing the job may not make sense if we just expect the user to turn off the property and retry. Changing it to log an error message instead.
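   Editor's sketch of the behavior described in this reply: when the work unit stream cannot be materialized, log an error and skip progress reporting instead of failing the job. Plain booleans and a `List<Long>` stand in for Gobblin's `WorkUnitStream` and `sumWorkUnitsSizes`, and `java.util.logging` stands in for the project's logger; `TotalBytesCalculator` and its method are hypothetical names.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.logging.Logger;

/** Hypothetical sketch: compute the total bytes to copy, or log an error and skip progress reporting. */
class TotalBytesCalculator {
  private static final Logger LOG = Logger.getLogger(TotalBytesCalculator.class.getName());

  static OptionalLong totalBytesToCopy(boolean reportProgress, boolean safeToMaterialize,
      List<Long> workUnitSizes) {
    if (!reportProgress) {
      return OptionalLong.empty();
    }
    if (!safeToMaterialize) {
      // Log instead of throwing, so the copy itself still runs, just without progress reports.
      LOG.severe("Progress reporting is enabled, but progress cannot be reported for infinite "
          + "work unit streams; skipping progress reporting.");
      return OptionalLong.empty();
    }
    long total = 0;
    for (long size : workUnitSizes) {
      total += size;
    }
    return OptionalLong.of(total);
  }
}
```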







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677898442



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -105,6 +105,8 @@ protected JobStatus getJobStatus(State jobState) {
     int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
     int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
+    int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
+    long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, 0);

Review comment:
       It took me a little while to figure out what this is. IMO it could be named `JOB_LAST_PROGRESS_EVENT_TIME`.







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677683129



##########
File path: gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/TestCopyableDataset.java
##########
@@ -72,6 +73,7 @@ public TestCopyableDataset() {
     return files;
   }
 
+

Review comment:
       removing







[GitHub] [gobblin] umustafi closed pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi closed pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336


   





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r680090848



##########
File path: gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
##########
@@ -58,6 +58,10 @@
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
   public static final boolean DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = true;
 
+  // Job Level Keys
+  public static final String WORK_UNIT_BYTE_SIZE = GOBBLIN_SERVICE_PREFIX + ".work.unit.byte.size";

Review comment:
       This isn't meant only for file-based sources. It will also be used for record-based sources, so it makes more sense to call it `work_unit_size`, as you suggested.
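   Editor's sketch on key naming: the hunk builds the key as `GOBBLIN_SERVICE_PREFIX + ".work.unit.byte.size"`, while the neighboring `flowCatalog.localCommit` key is appended to the prefix without a leading dot. If the prefix already ends in a dot (assumed here to be `gobblin.service.`), the new key would contain `..`. The sketch below shows a definition consistent with the existing key under that assumption; `work.unit.size` is a hypothetical dotted form of the `work_unit_size` name, not the merged value.

```java
/**
 * Hypothetical sketch of the key definitions. The prefix value "gobblin.service." is an
 * assumption; keys are appended to it the same way as the existing flowCatalog.localCommit key.
 */
final class KeyNamingSketch {
  static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";

  // Mirrors the existing key: no leading dot after the prefix.
  static final String FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";

  // A leading dot here would produce "gobblin.service..work.unit.size".
  static final String WORK_UNIT_SIZE = GOBBLIN_SERVICE_PREFIX + "work.unit.size";

  private KeyNamingSketch() {}
}
```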







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678683993



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed

Review comment:
       also added a test with this case
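   Editor's note on the Javadoc formula: if p% of the copy is done after `timeElapsed`, then `(100/p) * timeElapsed` projects the total duration, not the time left. The remaining time is that projection minus what has already elapsed, i.e. `timeElapsed * (100 - p) / p`. A minimal sketch under that reading, with a guard for 0% (the case `estimateCopyTimeLeft` short-circuits) and for 100%; `TimeLeftEstimator` is a hypothetical name.

```java
/** Hypothetical sketch of a linear time-left estimate from elapsed time and percent complete. */
final class TimeLeftEstimator {
  private TimeLeftEstimator() {}

  /**
   * remaining = elapsed * (100 - percent) / percent, i.e. the projected total
   * (elapsed * 100 / percent) minus the time already elapsed. Returns 0 when no
   * progress has been reported yet or the copy is complete.
   */
  static long estimateMillisLeft(long currentTimeMillis, long startTimeMillis, int completionPercentage) {
    if (completionPercentage <= 0 || completionPercentage >= 100) {
      return 0;
    }
    long elapsed = currentTimeMillis - startTimeMillis;
    return elapsed * (100 - completionPercentage) / completionPercentage;
  }
}
```

   For example, 25% done after 60 s gives 180 s remaining; the Javadoc's formula would give 240 s, which is the projected total.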







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675755726



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long
+
+  /**
+   * estimate of time left until copy completion
+   */
+  estimatedTimeLeft: long

Review comment:
       I have it in milliseconds since that's how the executionStart/EndTimes are set. 







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678709912



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
##########
@@ -180,6 +180,14 @@
                 "name" : "processedCount",
                 "type" : "long",
                 "doc" : "number of records processed in the last job execution"
+              }, {
+                "name" : "jobProgress",
+                "type" : "int",
+                "doc" : "job progress as a percentage (0-100)"
+              }, {
+                "name" : "estimatedMillisecondsToCompletion",

Review comment:
       nvm, good point. Changing it to seconds, as that will be more useful.
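   Editor's sketch: since the execution start and end times stay in epoch milliseconds, the estimate can be computed in milliseconds and converted to seconds once, when it is exposed. This uses `java.util.concurrent.TimeUnit`; the method name and the choice to truncate rather than round are assumptions.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: expose the estimate in seconds while timestamps stay in epoch milliseconds. */
final class EstimateUnits {
  private EstimateUnits() {}

  static long estimatedSecondsToCompletion(long estimatedMillisToCompletion) {
    // TimeUnit.toSeconds truncates toward zero, so 1,999 ms becomes 1 s.
    return TimeUnit.MILLISECONDS.toSeconds(estimatedMillisToCompletion);
  }
}
```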







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679561366



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Removed them, since they should already be present.
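   Editor's sketch of the resulting shape: the progress event carries only the completion percentage, and the flow group, name, and execution ID come from metadata the launcher already supplies (per the review, IvyJobLauncher adds them). That launcher metadata is modeled here as a plain map; `ProgressEventMetadata` and the key string are hypothetical and do not reflect the actual `TimingEvent` constant values.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch: the progress event adds only the percentage on top of flow-level
 * fields that are assumed to be supplied by the launcher.
 */
final class ProgressEventMetadata {
  // Illustrative key name; not the actual TimingEvent constant value.
  static final String JOB_COMPLETION_PERCENTAGE = "jobCompletionPercentage";

  private ProgressEventMetadata() {}

  static Map<String, String> build(Map<String, String> launcherMetadata, int percentage) {
    // Copy so the launcher's map is not mutated.
    Map<String, String> metadata = new HashMap<>(launcherMetadata);
    metadata.put(JOB_COMPLETION_PERCENTAGE, String.valueOf(percentage));
    return metadata;
  }
}
```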







[GitHub] [gobblin] arjun4084346 commented on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-889546235


   +1 LGTM





[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679507183



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       @umustafi these params are added in IvyJobLauncher. Let's remove them from here and if it does not work, we will debug.







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678703598



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, int completionPercentage) {
+    if (completionPercentage == 0) {
+      return 0;
+    }
+
+    Instant current = Instant.ofEpochMilli(currentTime);
+    Instant start = Instant.ofEpochMilli(startTime);
+    Long timeBetween = Duration.between(start, current).toMillis();

Review comment:
       Isn't it simply `current - start`? Or did I miss something?
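   Editor's sketch: for epoch-millisecond inputs the two computations agree, sign included, so wrapping the values in `Instant` and `Duration` adds no precision here. The check below compares them; `ElapsedMillis` is a hypothetical helper.

```java
import java.time.Duration;
import java.time.Instant;

/** Compares the two ways of computing elapsed milliseconds discussed in the review. */
final class ElapsedMillis {
  private ElapsedMillis() {}

  static long viaDuration(long startMillis, long currentMillis) {
    return Duration.between(Instant.ofEpochMilli(startMillis), Instant.ofEpochMilli(currentMillis)).toMillis();
  }

  static long viaSubtraction(long startMillis, long currentMillis) {
    return currentMillis - startMillis;
  }
}
```

   `Duration` becomes worthwhile mainly when the value is passed on as a `Duration` or converted between units.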







[GitHub] [gobblin] sv2000 merged pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
sv2000 merged pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336


   





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675865553



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +468,17 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams");

Review comment:
       Great point, the error message you suggested will be more helpful!







[GitHub] [gobblin] codecov-commenter commented on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-884587476


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3336](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fbb1f6) into [master](https://codecov.io/gh/apache/gobblin/commit/08db23e15ab73654142998ecabb01bbb51f42a61?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08db23e) will **increase** coverage by `6.73%`.
   > The diff coverage is `25.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3336/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3336      +/-   ##
   ============================================
   + Coverage     46.55%   53.29%   +6.73%     
   + Complexity    10138     1124    -9014     
   ============================================
     Files          2051      249    -1802     
     Lines         79547     8192   -71355     
     Branches       8880      918    -7962     
   ============================================
   - Hits          37036     4366   -32670     
   + Misses        39086     3375   -35711     
   + Partials       3425      451    -2974     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../org/apache/gobblin/service/ServiceConfigKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9TZXJ2aWNlQ29uZmlnS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lin/service/FlowExecutionResourceLocalHandler.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93RXhlY3V0aW9uUmVzb3VyY2VMb2NhbEhhbmRsZXIuamF2YQ==) | `4.59% <30.00%> (+4.59%)` | :arrow_up: |
   | [...lin/restli/throttling/ZookeeperLeaderElection.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2UvZ29iYmxpbi10aHJvdHRsaW5nLXNlcnZpY2Utc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3Jlc3RsaS90aHJvdHRsaW5nL1pvb2tlZXBlckxlYWRlckVsZWN0aW9uLmphdmE=) | `70.00% <0.00%> (-2.23%)` | :arrow_down: |
   | [...org/apache/gobblin/writer/http/RestJsonWriter.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3dyaXRlci9odHRwL1Jlc3RKc29uV3JpdGVyLmphdmE=) | | |
   | [...blin/converter/filter/AvroProjectionConverter.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlLWJhc2Uvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29udmVydGVyL2ZpbHRlci9BdnJvUHJvamVjdGlvbkNvbnZlcnRlci5qYXZh) | | |
   | [...on/mapreduce/CompactionCombineFileInputFormat.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vbWFwcmVkdWNlL0NvbXBhY3Rpb25Db21iaW5lRmlsZUlucHV0Rm9ybWF0LmphdmE=) | | |
   | [.../source/extractor/extract/QueryBasedExtractor.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3IvZXh0cmFjdC9RdWVyeUJhc2VkRXh0cmFjdG9yLmphdmE=) | | |
   | [...ta/management/dataset/TimePartitionGlobFinder.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2RhdGFzZXQvVGltZVBhcnRpdGlvbkdsb2JGaW5kZXIuamF2YQ==) | | |
   | ... and [1790 more](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [08db23e...6fbb1f6](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [gobblin] Will-Lo commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677031613



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+

Review comment:
       I think a separate method is good here for now, until we identify more use cases or extensions for progress reporting.







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679387608



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;

Review comment:
       So bytesCopiedSoFar is only updated once the first mapper completes? Is that true?
   If so, is it okay to show timeLeft = 0 before any mapper has finished? Should we return "-1" or something similar instead? How long does it usually take for the first mapper to complete?
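The reviewer's reading matches the diff: `bytesCopiedSoFar` only grows inside `reportJobProgress`, which runs once per completed task, so the reported percentage stays at 0 until the first task finishes. The following is a minimal sketch of that accumulation in plain Java, assuming a fractional threshold like `DEFAULT_PROGRESS_REPORTING_THRESHOLD` (its actual value is not shown here). `ProgressTracker` and its methods are hypothetical, not Gobblin classes. It divides in floating point on purpose: if `bytesCopiedSoFar` and `totalSizeToCopy` are both integral fields, the later revision's `this.bytesCopiedSoFar / this.totalSizeToCopy` would be integer division.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch of byte-based progress tracking (not Gobblin's implementation).
 * Progress only advances when a task reports completion, so it stays at 0% until
 * the first task finishes.
 */
final class ProgressTracker {
  private final long totalBytesToCopy;
  private final double reportingThreshold; // fraction of the total, e.g. 0.05 for 5 points
  private long bytesCopiedSoFar = 0L;
  private double lastFractionReported = 0.0;

  ProgressTracker(long totalBytesToCopy, double reportingThreshold) {
    if (totalBytesToCopy <= 0) {
      throw new IllegalArgumentException("totalBytesToCopy must be positive");
    }
    this.totalBytesToCopy = totalBytesToCopy;
    this.reportingThreshold = reportingThreshold;
  }

  /** Records one completed task and returns a percentage to emit if the threshold was crossed. */
  OptionalInt onTaskCompleted(long taskByteSize) {
    bytesCopiedSoFar += taskByteSize;
    // Cast before dividing: long / long truncates to 0 until every byte is copied.
    double fraction = (double) bytesCopiedSoFar / totalBytesToCopy;
    if (fraction - lastFractionReported > reportingThreshold) {
      lastFractionReported = fraction;
      return OptionalInt.of((int) Math.round(fraction * 100));
    }
    return OptionalInt.empty();
  }

  /** True while no bytes have been counted, i.e. there is no basis for a time-left estimate yet. */
  boolean hasNoProgressYet() {
    return bytesCopiedSoFar == 0L;
  }
}
```

Like the diff, this emits only when progress has advanced by more than the threshold since the last report. As a result, a final step smaller than the threshold would not emit 100%.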







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677023743



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,22 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, int completionPercentage) {
+    if (completionPercentage == 0) {
+      return 0;
+    }
+
+    Instant current = Instant.ofEpochMilli(currentTime);
+    Instant start = Instant.ofEpochMilli(startTime);
+    Long timeBetween = Duration.between(current, start).toMillis();
+    Long timeElapsed = currentTime - startTime;
+    log.info("mylogs: timeBetween is " + timeBetween + " and elapsed is " + timeElapsed);

Review comment:
       will remove this log statement
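Two details in this hunk are worth flagging. First, the Javadoc formula `(100/completionPercentage) * timeElapsed` estimates the total duration of the copy, not the time left. Second, `100/completionPercentage` on an `int` is integer division in Java (for example, 100/30 is 3). Returning 0 at 0% also reads as "done", which is the concern raised elsewhere in this thread. Below is a minimal corrected sketch, assuming epoch-millisecond inputs as in the diff. It uses -1 as an "unknown" sentinel, an assumption borrowed from the reviewer's suggestion rather than settled behavior. `CopyTimeEstimator` is hypothetical.

```java
/**
 * Hypothetical corrected sketch of estimateCopyTimeLeft. If p percent took `elapsed` ms,
 * the whole copy is projected to take elapsed * 100 / p ms; subtracting what has already
 * elapsed leaves elapsed * (100 - p) / p ms.
 */
final class CopyTimeEstimator {
  /** Assumed sentinel for "no estimate yet". */
  static final long UNKNOWN = -1L;

  static long estimateCopyTimeLeftMillis(long currentTimeMillis, long startTimeMillis, int completionPercentage) {
    if (completionPercentage <= 0) {
      return UNKNOWN; // no task has finished, so there is no rate to extrapolate from
    }
    if (completionPercentage >= 100) {
      return 0L;
    }
    long elapsedMillis = currentTimeMillis - startTimeMillis;
    // Multiply before dividing to avoid the integer truncation of 100 / completionPercentage.
    return elapsedMillis * (100 - completionPercentage) / completionPercentage;
  }
}
```

For example, a copy that is 25% done after 60 s projects 240 s in total and therefore 180 s left. The Javadoc's formula would report 240 s.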







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678706597



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
##########
@@ -180,6 +180,14 @@
                 "name" : "processedCount",
                 "type" : "long",
                 "doc" : "number of records processed in the last job execution"
+              }, {
+                "name" : "jobProgress",
+                "type" : "int",
+                "doc" : "job progress as a percentage (0-100)"
+              }, {
+                "name" : "estimatedMillisecondsToCompletion",

Review comment:
       Sorry for the late comment. Do you think we should have `estimatedSecondsToCompletion` instead of milliseconds? Nobody will ever be interested in the milliseconds left, IMO.
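If the field were switched to seconds as suggested, the conversion could happen once at the API boundary. The following is a minimal sketch, assuming the internal estimate stays in milliseconds and that negative values are an "unknown" sentinel to pass through. `EtaUnits` is hypothetical. It rounds up so that a small nonzero remainder is not displayed as 0 seconds, whereas `TimeUnit.MILLISECONDS.toSeconds` alone truncates.

```java
import java.util.concurrent.TimeUnit;

final class EtaUnits {
  /**
   * Converts a millisecond estimate to whole seconds, rounding up so that a small
   * nonzero remainder is not shown as 0 s. Negative values are treated as an
   * "unknown" sentinel and passed through unchanged (an assumption).
   */
  static long toSecondsRoundedUp(long estimatedMillis) {
    if (estimatedMillis < 0) {
      return estimatedMillis;
    }
    long wholeSeconds = TimeUnit.MILLISECONDS.toSeconds(estimatedMillis); // truncates
    return (estimatedMillis % 1000L == 0L) ? wholeSeconds : wholeSeconds + 1L;
  }
}
```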







[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678703598



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, int completionPercentage) {
+    if (completionPercentage == 0) {
+      return 0;
+    }
+
+    Instant current = Instant.ofEpochMilli(currentTime);
+    Instant start = Instant.ofEpochMilli(startTime);
+    Long timeBetween = Duration.between(start, current).toMillis();

Review comment:
       Isn't it simply `currentTime - startTime`? Or did I miss something?
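For epoch-millisecond inputs the reviewer is right. `Duration.between(start, end)` is positive when `end` is later, so `Duration.between(start, current).toMillis()` equals `currentTime - startTime`. The earlier revision of this method passed the arguments as `(current, start)`, which produces the negated value. Here is a quick check; the class and method names are illustrative only.

```java
import java.time.Duration;
import java.time.Instant;

final class ElapsedTime {
  /** Elapsed time via java.time; argument order is (start, end). */
  static long viaDuration(long startMillis, long currentMillis) {
    return Duration.between(Instant.ofEpochMilli(startMillis), Instant.ofEpochMilli(currentMillis)).toMillis();
  }

  /** Plain subtraction of epoch milliseconds. */
  static long viaSubtraction(long startMillis, long currentMillis) {
    return currentMillis - startMillis;
  }
}
```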







[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-884587476


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3336](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (88208cc) into [master](https://codecov.io/gh/apache/gobblin/commit/08db23e15ab73654142998ecabb01bbb51f42a61?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08db23e) will **decrease** coverage by `4.11%`.
   > The diff coverage is `25.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3336/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3336      +/-   ##
   ============================================
   - Coverage     46.55%   42.44%   -4.12%     
   + Complexity    10138     4542    -5596     
   ============================================
     Files          2051     1024    -1027     
     Lines         79547    40624   -38923     
     Branches       8880     4530    -4350     
   ============================================
   - Hits          37036    17242   -19794     
   + Misses        39086    21674   -17412     
   + Partials       3425     1708    -1717     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../org/apache/gobblin/service/ServiceConfigKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9TZXJ2aWNlQ29uZmlnS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...pache/gobblin/cluster/GobblinHelixJobLauncher.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iTGF1bmNoZXIuamF2YQ==) | `81.37% <ø> (ø)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lin/service/FlowExecutionResourceLocalHandler.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93RXhlY3V0aW9uUmVzb3VyY2VMb2NhbEhhbmRsZXIuamF2YQ==) | `4.59% <30.00%> (+4.59%)` | :arrow_up: |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `63.46% <0.00%> (-0.33%)` | :arrow_down: |
   | [...blin/source/extractor/schema/ColumnAttributes.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NvdXJjZS9leHRyYWN0b3Ivc2NoZW1hL0NvbHVtbkF0dHJpYnV0ZXMuamF2YQ==) | | |
   | [...hive/filter/DateRangePartitionFilterGenerator.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvaGl2ZS9maWx0ZXIvRGF0ZVJhbmdlUGFydGl0aW9uRmlsdGVyR2VuZXJhdG9yLmphdmE=) | | |
   | [...g/apache/gobblin/writer/MetadataWriterWrapper.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb3JlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3dyaXRlci9NZXRhZGF0YVdyaXRlcldyYXBwZXIuamF2YQ==) | | |
   | [...in/data/management/copy/replication/CopyRoute.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvcHkvcmVwbGljYXRpb24vQ29weVJvdXRlLmphdmE=) | | |
   | ... and [1018 more](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [08db23e...88208cc](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [gobblin] Will-Lo commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
Will-Lo commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677025828



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+                + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+                + " and rerun job.");

Review comment:
       I don't think it's necessarily because `DestinationDatasetHandler` doesn't support work unit streams; rather, this feature itself does not support work unit streams (the total size is determined only once, at the beginning).

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           addLineageInfo(copyEntity, workUnit);
-          if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
-            workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+          if (copyEntity instanceof CopyableFile) {
+            CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+            fileSize = castedCopyEntity.getFileStatus().getLen();
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+            if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+              workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+            } else {
+              workUnitsForPartition.add(workUnit);
+            }
           } else {
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);

Review comment:
       Can you leave a comment explaining when this condition applies? If I recall correctly, it's for some state files/post-publish states.

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
##########
@@ -382,9 +384,17 @@ public Void call() {
           setWorkUnitWatermark(workUnit, watermarkGenerator, copyEntity);
           computeAndSetWorkUnitGuid(workUnit);
           addLineageInfo(copyEntity, workUnit);
-          if (copyEntity instanceof CopyableFile && DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
-            workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+          if (copyEntity instanceof CopyableFile) {
+            CopyableFile castedCopyEntity = (CopyableFile) copyEntity;
+            fileSize = castedCopyEntity.getFileStatus().getLen();
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, fileSize);
+            if (DistcpFileSplitter.allowSplit(this.state, this.targetFs)) {
+              workUnitsForPartition.addAll(DistcpFileSplitter.splitFile((CopyableFile) copyEntity, workUnit, this.targetFs));
+            } else {
+              workUnitsForPartition.add(workUnit);
+            }
           } else {
+            workUnit.setProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE, 0);

Review comment:
       Can we leave a comment explaining when this case is true? From what I remember, it had to do with local state files/post-publishing steps also existing in the directory.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       If you add the annotation, the function should be protected or private; in this case I think protected is best.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,20 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException("Property " + ConfigurationKeys.REPORT_JOB_PROGRESS + " is turned on, but "
+                + "progress cannot be collected because " + DestinationDatasetHandlerService.class.getName()
+                + " does not support work unit streams. Turn off property " + ConfigurationKeys.REPORT_JOB_PROGRESS
+                + " and rerun job.");

Review comment:
       We could use a better exception message here to let the user know that copy progress reporting doesn't work for infinite streams (which makes sense: how would we determine that the work is complete?).
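Both comments on this hunk point at the same root cause. The total byte count is computed once, before any task runs, so it needs the full work unit list up front. Below is a minimal sketch of a guard whose message says so. The helper name and the property key passed in are hypothetical, and `IllegalStateException` is substituted for the diff's plain `RuntimeException`.

```java
final class ProgressReportingChecks {
  /**
   * Hypothetical guard: byte-based progress needs the complete work unit list up front,
   * because the total number of bytes to copy is computed once, before any task runs.
   */
  static void requireMaterializableWorkUnits(boolean safeToMaterialize, String reportJobProgressKey) {
    if (!safeToMaterialize) {
      throw new IllegalStateException("Property " + reportJobProgressKey + " is enabled, but job progress "
          + "cannot be reported for work unit streams that cannot be materialized (for example, unbounded "
          + "streams), because the total number of bytes to copy must be known when the job starts. "
          + "Disable " + reportJobProgressKey + " and rerun the job.");
    }
  }
}
```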

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+    // if progress reporting is enabled, value should be present
+    String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);

Review comment:
       You can use `PropertiesUtils.getPropAsLong` here (and for the task byte size) instead of pulling the String. Instead of checking for null, check that the property exists in the state.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/JobStatusRetriever.java
##########
@@ -105,6 +105,8 @@ protected JobStatus getJobStatus(State jobState) {
     int maxAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.MAX_ATTEMPTS_FIELD, "1"));
     int currentAttempts = Integer.parseInt(jobState.getProp(TimingEvent.FlowEventConstants.CURRENT_ATTEMPTS_FIELD, "1"));
     boolean shouldRetry = Boolean.parseBoolean(jobState.getProp(TimingEvent.FlowEventConstants.SHOULD_RETRY_FIELD, "false"));
+    int progressPercentage = Integer.parseInt(jobState.getProp(TimingEvent.JOB_COMPLETION_PERCENTAGE, "0"));
+    long lastProgressEventTime = Long.parseLong(jobState.getProp(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, "0"));

Review comment:
       Similar comment here: you can use `PropertiesUtils` to parse ints/longs.
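The following minimal sketch shows the typed-getter pattern being suggested, written against `java.util.Properties` so it is self-contained. These helpers are stand-ins, not Gobblin's `PropertiesUtils` API, whose exact signatures are not shown in this thread. The default value replaces the separate null check, mirroring the `getPropAsBoolean(key, false)` call already used in `AbstractJobLauncher`.

```java
import java.util.Properties;

/** Stand-ins for typed property getters; not Gobblin's PropertiesUtils API. */
final class TypedProps {
  /** Returns the default when the key is absent, otherwise parses the stored value. */
  static long getPropAsLong(Properties props, String key, long defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Long.parseLong(value.trim());
  }

  static int getPropAsInt(Properties props, String key, int defaultValue) {
    String value = props.getProperty(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }
}
```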

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       If you mark it @VisibleForTesting, you should make the function private or protected (probably protected in this scenario).
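For reference, here is a minimal sketch of what `sumWorkUnitsSizes` computes. Each work unit is modeled as a `java.util.Properties`, and a hypothetical key name stands in for `ServiceConfigKeys.WORK_UNIT_BYTE_SIZE`. Missing sizes count as 0, consistent with the `CopySource` change that tags non-`CopyableFile` entities with 0 bytes. The method is package-private here. Per the review, it would be `protected` in the PR; the `@VisibleForTesting` annotation is omitted to keep the sketch free of a Guava dependency.

```java
import java.util.Collection;
import java.util.Properties;

final class WorkUnitSizes {
  // Hypothetical key standing in for ServiceConfigKeys.WORK_UNIT_BYTE_SIZE.
  static final String WORK_UNIT_BYTE_SIZE = "example.workUnit.byteSize";

  /** Sums the byte sizes recorded on each work unit; a missing size counts as 0. */
  static long sumWorkUnitsSizes(Collection<Properties> workUnits) {
    long total = 0L;
    for (Properties workUnit : workUnits) {
      total += Long.parseLong(workUnit.getProperty(WORK_UNIT_BYTE_SIZE, "0"));
    }
    return total;
  }
}
```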







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678647641



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Yes, I noticed that other events do not need these three properties, either because they are not processed by the `KafkaAvroStatusMonitor` (as in the case of `WorkUnitsCreated`) or because they add them explicitly. Printing the metadata shows that it contains only the following keys, yet the progress event needs the flow-level props:
   `{jobName, jobId, azkabanJobId, azkabanFlowURL, azkabanJobExecURL, azkabanJobURL, azkabanProjectName, azkabanFlowId, azkabanExecId, azkabanURL, clusterIdentifier, metricContextID, metricContextName}`
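Because the job-level metadata lacks the flow identifiers, the progress event has to copy them from the job state itself, as the diff does. Below is a minimal sketch of that step. All key names are hypothetical placeholders for the `ConfigurationKeys` and `TimingEvent.FlowEventConstants` constants. Skipping absent values, rather than inserting nulls, is a choice made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ProgressEventMetadata {
  /** Builds progress-event metadata; every key name here is a hypothetical placeholder. */
  static Map<String, String> build(Properties jobProps, int percentage) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("jobCompletionPercentage", String.valueOf(percentage));
    // The job-level metadata lacks flow identifiers, so copy them from the job state.
    copyIfPresent(jobProps, "flow.group", metadata, "flowGroup");
    copyIfPresent(jobProps, "flow.name", metadata, "flowName");
    copyIfPresent(jobProps, "flow.executionId", metadata, "flowExecutionId");
    return metadata;
  }

  private static void copyIfPresent(Properties source, String sourceKey, Map<String, String> target, String targetKey) {
    String value = source.getProperty(sourceKey);
    if (value != null) {
      target.put(targetKey, value);
    }
  }
}
```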







[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-884587476


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3336](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (88208cc) into [master](https://codecov.io/gh/apache/gobblin/commit/08db23e15ab73654142998ecabb01bbb51f42a61?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08db23e) will **decrease** coverage by `0.11%`.
   > The diff coverage is `25.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/gobblin/pull/3336/graphs/tree.svg?width=650&height=150&src=pr&token=4MgURJ0bGc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3336      +/-   ##
   ============================================
   - Coverage     46.55%   46.44%   -0.12%     
   + Complexity    10138     3065    -7073     
   ============================================
     Files          2051      643    -1408     
     Lines         79547    25068   -54479     
     Branches       8880     2993    -5887     
   ============================================
   - Hits          37036    11642   -25394     
   + Misses        39086    12175   -26911     
   + Partials       3425     1251    -2174     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/configuration/ConfigurationKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vY29uZmlndXJhdGlvbi9Db25maWd1cmF0aW9uS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [.../org/apache/gobblin/service/ServiceConfigKeys.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1hcGkvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9TZXJ2aWNlQ29uZmlnS2V5cy5qYXZh) | `0.00% <ø> (ø)` | |
   | [...pache/gobblin/cluster/GobblinHelixJobLauncher.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iTGF1bmNoZXIuamF2YQ==) | `81.37% <ø> (ø)` | |
   | [.../service/monitoring/KafkaAvroJobStatusMonitor.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9uaXRvcmluZy9LYWZrYUF2cm9Kb2JTdGF0dXNNb25pdG9yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...lin/service/FlowExecutionResourceLocalHandler.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1yZXN0bGkvZ29iYmxpbi1mbG93LWNvbmZpZy1zZXJ2aWNlL2dvYmJsaW4tZmxvdy1jb25maWctc2VydmljZS1zZXJ2ZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vc2VydmljZS9GbG93RXhlY3V0aW9uUmVzb3VyY2VMb2NhbEhhbmRsZXIuamF2YQ==) | `4.59% <30.00%> (+4.59%)` | :arrow_up: |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `63.46% <0.00%> (-0.33%)` | :arrow_down: |
   | [...pache/gobblin/hive/HiveMetaStoreClientFactory.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1oaXZlLXJlZ2lzdHJhdGlvbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9oaXZlL0hpdmVNZXRhU3RvcmVDbGllbnRGYWN0b3J5LmphdmE=) | | |
   | [...in/kafka/schemareg/KafkaSchemaRegistryFactory.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4ta2Fma2EtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2thZmthL3NjaGVtYXJlZy9LYWZrYVNjaGVtYVJlZ2lzdHJ5RmFjdG9yeS5qYXZh) | | |
   | [...conversion/hive/writer/HiveQueryWriterBuilder.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L2NvbnZlcnNpb24vaGl2ZS93cml0ZXIvSGl2ZVF1ZXJ5V3JpdGVyQnVpbGRlci5qYXZh) | | |
   | [...he/gobblin/compaction/source/CompactionSource.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jb21wYWN0aW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NvbXBhY3Rpb24vc291cmNlL0NvbXBhY3Rpb25Tb3VyY2UuamF2YQ==) | | |
   | ... and [1399 more](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [08db23e...88208cc](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679561366



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Retested, and you're correct! They are present in the eventSubmitter, although not in the metricContext, so it's not necessary to add them explicitly.





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675881502



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+

Review comment:
       Moving this to a separate method for clarity, because the progress-related state is tied to one instance of the taskCollectorService. If it still makes sense to split the two into separate classes, I can attempt that.
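One way the split could look, as a minimal sketch only: a standalone tracker that owns the progress state, so the collector service would just forward each completed task's size to it. `JobProgressTracker` and its methods are hypothetical, not Gobblin APIs, and the threshold is passed in rather than read from `ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD`. The sketch also casts to `double` before dividing, in case the counters are `long`, because dividing two `long` values in Java truncates.

```java
import java.util.OptionalInt;

/**
 * Hypothetical helper (not part of Gobblin) holding the progress state that the
 * diff keeps as fields on TaskStateCollectorService.
 */
final class JobProgressTracker {
  private final long totalSize;
  // Minimum increase in the completed fraction before emitting, e.g. 0.05 = 5 points.
  private final double reportingThreshold;
  private long completedSoFar = 0;
  private double lastFractionReported = 0.0;

  JobProgressTracker(long totalSize, double reportingThreshold) {
    if (totalSize <= 0) {
      throw new IllegalArgumentException("totalSize must be positive");
    }
    this.totalSize = totalSize;
    this.reportingThreshold = reportingThreshold;
  }

  /**
   * Records one completed task. Returns the percentage (0-100) to report, or
   * empty if progress has not moved past the threshold since the last report.
   */
  OptionalInt onTaskCompleted(long taskSize) {
    completedSoFar += taskSize;
    // Cast before dividing (long / long truncates), and cap at 100%.
    double fraction = Math.min(1.0, (double) completedSoFar / totalSize);
    boolean done = completedSoFar >= totalSize;
    // Always report completion so a small final step is not swallowed by the threshold.
    if (done || fraction - lastFractionReported > reportingThreshold) {
      lastFractionReported = fraction;
      return OptionalInt.of((int) Math.round(fraction * 100));
    }
    return OptionalInt.empty();
  }
}
```

Keeping the state in its own class would also let the threshold logic be unit-tested without a running collector service.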





[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677890923



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +663,16 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  @VisibleForTesting
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
+    long totalSizeInBytes = 0;
+    for (WorkUnit workUnit : workUnits) {

Review comment:
       This is okay too. It can also be written with a lambda expression:
   `long totalSizeInBytes = workUnits.stream().mapToLong(wu -> Long.parseLong(wu.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE))).sum();`
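The explicit loop and the stream suggestion compute the same sum. A small self-contained comparison, with a `Map<String, String>` standing in for `WorkUnit` and a hypothetical key string in place of `ServiceConfigKeys.WORK_UNIT_BYTE_SIZE`:

```java
import java.util.List;
import java.util.Map;

/** Sums a size property over work units; a Map stands in for Gobblin's WorkUnit. */
final class WorkUnitSizes {
  // Hypothetical key standing in for ServiceConfigKeys.WORK_UNIT_BYTE_SIZE.
  static final String SIZE_KEY = "work.unit.byte.size";

  // Explicit loop, in the shape of the original diff.
  static long sumWithLoop(List<Map<String, String>> workUnits) {
    long total = 0;
    for (Map<String, String> wu : workUnits) {
      total += Long.parseLong(wu.get(SIZE_KEY));
    }
    return total;
  }

  // Stream form suggested in the review: same result, same failure mode.
  static long sumWithStream(List<Map<String, String>> workUnits) {
    return workUnits.stream()
        .mapToLong(wu -> Long.parseLong(wu.get(SIZE_KEY)))
        .sum();
  }
}
```

Both versions throw `NumberFormatException` if any work unit lacks the property, because `Long.parseLong(null)` throws; whichever form is chosen, a missing size still has to be handled separately.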





[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677890923



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +663,16 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  @VisibleForTesting
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
+    long totalSizeInBytes = 0;
+    for (WorkUnit workUnit : workUnits) {

Review comment:
       This is okay too. It can also be written more concisely with a lambda expression:
   `long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE)).sum();`





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678709912



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
##########
@@ -180,6 +180,14 @@
                 "name" : "processedCount",
                 "type" : "long",
                 "doc" : "number of records processed in the last job execution"
+              }, {
+                "name" : "jobProgress",
+                "type" : "int",
+                "doc" : "job progress as a percentage (0-100)"
+              }, {
+                "name" : "estimatedMillisecondsToCompletion",

Review comment:
       The `executionStart/EndTimes` are in milliseconds, which is why I kept the same unit. I agree we won't pay attention to milliseconds, but I was planning on rounding down to an appropriate unit of time before formatting the result for the user in the jobStatus endpoint.
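Keeping the stored value in milliseconds and rounding down only at display time could look like the minimal sketch below. The class name and the `h`/`m`/`s` output format are assumptions; the thread does not say how the jobStatus endpoint will render the value.

```java
import java.time.Duration;

/** Hypothetical display helper: shows a millisecond estimate in its largest whole unit. */
final class TimeLeftFormatter {
  static String format(long estimatedMillis) {
    Duration d = Duration.ofMillis(estimatedMillis);
    // Each conversion truncates, i.e. rounds down to a whole number of that unit.
    if (d.toHours() > 0) {
      return d.toHours() + "h";
    }
    if (d.toMinutes() > 0) {
      return d.toMinutes() + "m";
    }
    return d.getSeconds() + "s";
  }
}
```

For example, 5,400,000 ms (90 minutes) renders as `1h`, so the stored precision is kept while the user sees a coarse figure.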





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r679561366



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Removed them because they should be present in the metric context; from a previous check, I know they are not in the jobState.





[GitHub] [gobblin] sv2000 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r680064376



##########
File path: gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
##########
@@ -58,6 +58,10 @@
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
   public static final boolean DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = true;
 
+  // Job Level Keys
+  public static final String WORK_UNIT_BYTE_SIZE = GOBBLIN_SERVICE_PREFIX + ".work.unit.byte.size";

Review comment:
       WORK_UNIT_BYTE_SIZE is confusing to read. Can we just call it work_unit_size? Also, is this config only intended for file-based sources? What about other sources, e.g. the Kafka source, where this config could mean the number of records?

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +469,18 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_PROGRESS, ConfigurationKeys.DEFAULT_REPORT_JOB_PROGRESS)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnitStream);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);

Review comment:
       TOTAL_BYTES_TO_COPY seems very Distcp-centric. We may want to think more broadly and just call this TOTAL_WORK_UNIT_SIZE. When individual sources do not provide work unit sizes, this should sum to the total number of work units.
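A minimal sketch of the suggested fallback, assuming that "sources do not provide work unit sizes" means at least one work unit lacks the property (the comment does not address the mixed case). The key name and class are hypothetical, and a `Map` stands in for `WorkUnit`.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical source-agnostic total, following the suggested rename. */
final class TotalWorkUnitSize {
  // Hypothetical key: bytes for file copies, records for e.g. a Kafka source.
  static final String SIZE_KEY = "work.unit.size";

  /**
   * Sums the declared sizes when every work unit has one. Otherwise counts each
   * work unit as 1, so progress is tracked in work units instead of mixing units.
   */
  static long compute(List<Map<String, String>> workUnits) {
    boolean allSized = workUnits.stream().allMatch(wu -> wu.containsKey(SIZE_KEY));
    if (!allSized) {
      return workUnits.size();
    }
    return workUnits.stream()
        .mapToLong(wu -> Long.parseLong(wu.get(SIZE_KEY)))
        .sum();
  }
}
```

Whatever rule is chosen, the per-task increment in the collector would have to follow the same rule (adding 1 per completed task in count mode); otherwise the percentage is computed against a total in different units.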





[GitHub] [gobblin] codecov-commenter edited a comment on pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#issuecomment-884587476


   # [Codecov](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3336](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (88208cc) into [master](https://codecov.io/gh/apache/gobblin/commit/08db23e15ab73654142998ecabb01bbb51f42a61?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (08db23e) will **decrease** coverage by `3.52%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #3336      +/-   ##
   ============================================
   - Coverage     46.55%   43.03%   -3.53%     
   + Complexity    10138     1942    -8196     
   ============================================
     Files          2051      394    -1657     
     Lines         79547    16876   -62671     
     Branches       8880     2075    -6805     
   ============================================
   - Hits          37036     7263   -29773     
   + Misses        39086     8813   -30273     
   + Partials       3425      800    -2625     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/gobblin/pull/3336?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...pache/gobblin/cluster/GobblinHelixJobLauncher.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpbkhlbGl4Sm9iTGF1bmNoZXIuamF2YQ==) | `81.37% <ø> (ø)` | |
   | [.../org/apache/gobblin/cluster/GobblinTaskRunner.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1jbHVzdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL2NsdXN0ZXIvR29iYmxpblRhc2tSdW5uZXIuamF2YQ==) | `63.46% <0.00%> (-0.33%)` | :arrow_down: |
   | [...ersion/finder/ModDateTimeDatasetVersionFinder.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L3JldGVudGlvbi92ZXJzaW9uL2ZpbmRlci9Nb2REYXRlVGltZURhdGFzZXRWZXJzaW9uRmluZGVyLmphdmE=) | | |
   | [...estion/google/webmaster/GoogleWebMasterSource.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvb2dsZS1pbmdlc3Rpb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vaW5nZXN0aW9uL2dvb2dsZS93ZWJtYXN0ZXIvR29vZ2xlV2ViTWFzdGVyU291cmNlLmphdmE=) | | |
   | [.../gobblin/service/modules/dataset/FormatConfig.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1zZXJ2aWNlL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3NlcnZpY2UvbW9kdWxlcy9kYXRhc2V0L0Zvcm1hdENvbmZpZy5qYXZh) | | |
   | [...ent/retention/policy/TimeBasedRetentionPolicy.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1kYXRhLW1hbmFnZW1lbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2dvYmJsaW4vZGF0YS9tYW5hZ2VtZW50L3JldGVudGlvbi9wb2xpY3kvVGltZUJhc2VkUmV0ZW50aW9uUG9saWN5LmphdmE=) | | |
   | [.../org/apache/gobblin/runtime/CountBasedLimiter.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1ydW50aW1lL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3J1bnRpbWUvQ291bnRCYXNlZExpbWl0ZXIuamF2YQ==) | | |
   | [...e/gobblin/azkaban/AzkabanStateStoreCleanerJob.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tYXprYWJhbi9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9hemthYmFuL0F6a2FiYW5TdGF0ZVN0b3JlQ2xlYW5lckpvYi5qYXZh) | | |
   | [.../apache/gobblin/binary\_creation/AvroTestTools.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1iaW5hcnktbWFuYWdlbWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZ29iYmxpbi9iaW5hcnlfY3JlYXRpb24vQXZyb1Rlc3RUb29scy5qYXZh) | | |
   | [...blin/writer/commands/TeradataBufferedInserter.java](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Z29iYmxpbi1tb2R1bGVzL2dvYmJsaW4tc3FsL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9nb2JibGluL3dyaXRlci9jb21tYW5kcy9UZXJhZGF0YUJ1ZmZlcmVkSW5zZXJ0ZXIuamF2YQ==) | | |
   | ... and [1638 more](https://codecov.io/gh/apache/gobblin/pull/3336/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   



[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678705247



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, int completionPercentage) {
+    if (completionPercentage == 0) {
+      return 0;
+    }
+
+    Instant current = Instant.ofEpochMilli(currentTime);
+    Instant start = Instant.ofEpochMilli(startTime);
+    Long timeBetween = Duration.between(start, current).toMillis();

Review comment:
       It is; however, Alex had recommended using the Instant/Duration classes earlier to make it clear which units the times are in. I agree that this may not add much clarity, and perhaps a comment can do the same.
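For comparison, the two options discussed here return the same number of milliseconds; they differ only in where the unit is documented. A small sketch with illustrative class and method names:

```java
import java.time.Duration;
import java.time.Instant;

/** Two equivalent ways to compute elapsed time from epoch-millisecond timestamps. */
final class ElapsedTime {
  // java.time version, as in the diff: the unit lives in the types.
  static Duration withJavaTime(long startEpochMillis, long currentEpochMillis) {
    return Duration.between(Instant.ofEpochMilli(startEpochMillis), Instant.ofEpochMilli(currentEpochMillis));
  }

  // Plain arithmetic: only the parameter names and this comment say "milliseconds".
  static long withSubtraction(long startEpochMillis, long currentEpochMillis) {
    return currentEpochMillis - startEpochMillis;
  }
}
```

The `Duration` form pays off mainly when the value is passed around as a `Duration`; converting back with `toMillis()` right away, as the diff does, leaves the unit documented only by naming or a comment either way.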





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678520761



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +206,20 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed

Review comment:
       Good catch: it should be timeElapsed * (100/completionPercentage - 1).
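The corrected formula follows from linear extrapolation: if p% took `timeElapsed`, the whole job takes `timeElapsed * 100/p`, and the remainder is that minus `timeElapsed`. If `100/completionPercentage` were evaluated literally with an `int` percentage, it would be integer division (100/30 evaluates to 3), so this sketch rewrites it as `timeElapsed * (100 - p) / p`, which is algebraically identical and divides only once. The method name is hypothetical; returning 0 for p = 0 mirrors the guard in the diff.

```java
/** Hypothetical sketch of the corrected time-left estimate. */
final class TimeLeftEstimate {
  /**
   * remaining = elapsed * (100 / p - 1), rewritten as elapsed * (100 - p) / p so
   * the single integer division happens last. Returns 0 when p <= 0 (no basis
   * for an estimate) or p >= 100 (nothing left).
   */
  static long estimateMillisLeft(long elapsedMillis, int completionPercentage) {
    if (completionPercentage <= 0 || completionPercentage >= 100) {
      return 0;
    }
    return elapsedMillis * (100 - completionPercentage) / completionPercentage;
  }
}
```

For example, 60,000 ms elapsed at 25% gives 60,000 * 75 / 25 = 180,000 ms remaining, the same as 60,000 * (100/25 - 1).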





[GitHub] [gobblin] arjun4084346 commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
arjun4084346 commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r677890923



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +663,16 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  @VisibleForTesting
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {
+    long totalSizeInBytes = 0;
+    for (WorkUnit workUnit : workUnits) {

Review comment:
       This is okay too. It can also be written with a lambda expression:
   `long totalSizeInBytes = workUnits.stream().mapToLong(wu -> wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE)).sum();`





[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r680212135



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;

Review comment:
       Right now we have this for jobEndTime as well; I noticed in the jobStatistics that everything starts at 0.





[GitHub] [gobblin] aplex commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r675209138



##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long
+
+  /**
+   * estimate of time left until copy completion
+   */
+  estimatedTimeLeft: long

Review comment:
       What is the unit of time here? It could be worth adding the unit to the name, e.g. estimatedSecondsToCompletion, estimatedTimeLeftSeconds, or something similar.

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
##########
@@ -199,4 +204,17 @@ public static FlowExecution convertFlowStatus(org.apache.gobblin.service.monitor
   private static long getFlowStartTime(org.apache.gobblin.service.monitoring.FlowStatus flowStatus) {
     return flowStatus.getFlowExecutionId();
   }
+
+  /**
+   * Estimate the time left to complete the copy based on the following formula -
+   * timeLeft = (100/completionPercentage) * timeElapsed
+   */
+  public static long estimateCopyTimeLeft(Long currentTime, Long startTime, Long completionPercentage) {

Review comment:
       Inside the code, I suggest using Duration/Interval classes to represent time periods. That will remove any question about time units. Rest.li does not have a built-in type for time, so we had to convert it to long or string.
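   
   A minimal sketch of the Duration-based approach, assuming `java.time` is acceptable in this module; the class and method names are hypothetical, not the PR's code. One arithmetic note on the quoted formula: `(100 / completionPercentage) * timeElapsed` estimates the *total* duration of the copy (with the test's inputs, 100 / 10 * 30 = 300), so the time left is that minus the elapsed time, i.e. `elapsed * (100 - p) / p` (270 for the same inputs). Multiplying before dividing also avoids integer-division truncation such as `100 / 30 == 3`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the Duration suggestion; not part of Gobblin.
final class CopyTimeLeftEstimator {

  private CopyTimeLeftEstimator() {
  }

  /**
   * Estimates the remaining copy time by linear extrapolation:
   * timeLeft = elapsed * (100 - completionPercentage) / completionPercentage.
   *
   * @param startTime            when the copy started
   * @param currentTime          when the estimate is made
   * @param completionPercentage progress in the range (0, 100]
   */
  static Duration estimateTimeLeft(Instant startTime, Instant currentTime, int completionPercentage) {
    if (completionPercentage <= 0 || completionPercentage > 100) {
      // At 0% progress there is nothing to extrapolate from.
      throw new IllegalArgumentException(
          "completionPercentage must be in (0, 100], got " + completionPercentage);
    }
    Duration elapsed = Duration.between(startTime, currentTime);
    // Multiply before dividing so integer division does not truncate the ratio.
    return elapsed.multipliedBy(100 - completionPercentage).dividedBy(completionPercentage);
  }
}
```

   For example, `estimateTimeLeft(Instant.ofEpochSecond(20), Instant.ofEpochSecond(50), 10)` returns a 270-second `Duration`. The Rest.li layer can then call `getSeconds()` (or `toMillis()`) when filling the `long` field, so the unit is fixed in exactly one place.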

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long

Review comment:
       I think this is not just for copy; ingestion jobs can have progress too. For example, we were chatting about supporting MySQL ingestion through GaaS.

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandlerTest.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service;
+
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FlowExecutionResourceLocalHandlerTest {
+
+  @Test
+  public void testEstimateCopyTimeLeftSimple() throws Exception {
+    long currentTime = 50;
+    long startTime = 20;
+    long copyPercentage = 10;
+
+    long timeLeft = FlowExecutionResourceLocalHandler.estimateCopyTimeLeft(currentTime, startTime, copyPercentage);
+    Assert.assertEquals(timeLeft,300);

Review comment:
       There is a missing space after the comma. For new files, you can run the "Silent Code Cleanup" command in IntelliJ. It will correct all code style issues in the file, and you can bind it to a shortcut for convenience. I usually don't pay attention to those spaces and new lines and just reformat the file all the time.
   
   For existing files, you can select the code block that you've changed and reformat it the same way. As you'll find out, some old files have existing code style issues, and if the whole file is reformatted, a lot of unrelated lines will end up in the PR.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -644,6 +660,15 @@ public void apply(JobListener jobListener, JobContext jobContext)
     }
   }
 
+  public static long sumWorkUnitsSizes (Collection<WorkUnit> workUnits) {

Review comment:
       It looks like this method is public only so that it can be tested. You can add the @VisibleForTesting annotation to signal that; otherwise folks will suggest making it private.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
##########
@@ -463,6 +468,17 @@ public void apply(JobListener jobListener, JobContext jobContext)
           return;
         }
 
+        // calculation of total bytes to copy in a job used to track a job's copy progress
+        if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+          if (workUnitStream.isSafeToMaterialize()) {
+            Collection<WorkUnit> workUnits = JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+            long totalSizeInBytes = sumWorkUnitsSizes(workUnits);
+            this.jobContext.getJobState().setProp(TOTAL_BYTES_TO_COPY, totalSizeInBytes);
+          } else {
+            throw new RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not support work unit streams");

Review comment:
       You can add more context here, so that when people see it in the logs they know what to do. For example, you can say that a specific property is turned on, but progress cannot be collected because ... Then you can suggest that people either turn property X off or use a different Z to get this working.
   
   Detailed error messages will reduce support and on-call volume and create a better overall experience for users and SREs. I wrote more about this in the doc - https://docs.google.com/document/d/154girrJkI_hFNAKiHEyT3T4RtHclxqO15uzaXga74Uw/edit#heading=h.cfg1x3y98n7i
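   
   A sketch of a more actionable message along those lines. The helper class is hypothetical, and the property name is passed in rather than hard-coded because the actual value of `ConfigurationKeys.REPORT_JOB_COPY_PROGRESS` is not shown here; the remediation wording is only an example. It also avoids the current message, which names `DestinationDatasetHandlerService` rather than the progress-reporting feature. `UnsupportedOperationException` is one reasonable choice over a plain `RuntimeException`.

```java
// Hypothetical helper; the wording is only an example of an actionable error message.
final class ProgressReportingErrors {

  private ProgressReportingErrors() {
  }

  /**
   * Builds the exception thrown when progress reporting is enabled but the work unit
   * stream cannot be materialized, so the total bytes to copy cannot be computed.
   *
   * @param progressPropertyName name of the property that enables progress reporting
   */
  static UnsupportedOperationException unmaterializableWorkUnitStream(String progressPropertyName) {
    String message = String.format(
        "Job progress reporting is enabled via '%s', but the work unit stream is not safe to "
            + "materialize, so the total bytes to copy cannot be computed up front. Either set '%s' "
            + "to false for this job, or use a source whose work unit stream can be materialized.",
        progressPropertyName, progressPropertyName);
    return new UnsupportedOperationException(message);
  }
}
```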

##########
File path: gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/JobStatistics.pdl
##########
@@ -19,4 +19,14 @@ record JobStatistics {
    * number of records processed in the last job execution
    */
   processedCount: long
+
+  /**
+   * data copy progress as a percentage
+   */
+  copyProgress: long

Review comment:
       I assume this will go from 0 to 100, so we can use int here. You can also add the possible range to the comment above for clarity.

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+
+      if (jobState.getPropAsBoolean(ConfigurationKeys.REPORT_JOB_COPY_PROGRESS, false)) {
+        stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+        if (stringSize == null) {
+          LOGGER.warn("Expected to report data copy progress but work unit byte size property null");
+          break;
+        }
+
+        taskByteSize = Long.parseLong(stringSize);
+        // if progress reporting is enabled, value should be present
+        String strTotalSizeToCopy = this.jobState.getProp(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+        if (strTotalSizeToCopy == null) {
+          LOGGER.warn("Expected to report data copy progress but total bytes to copy property null");
+          break;
+        }
+
+        this.totalSizeToCopy = Long.parseLong(strTotalSizeToCopy);
+        // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+        this.bytesCopiedSoFar += taskByteSize;
+        newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+        if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+          this.lastPercentageReported = newPercentageCopied;
+          Long percentageToReport = (long) (this.lastPercentageReported * 100);

Review comment:
       Should we do a Math.round() here? I think by default the value is truncated when you convert a double to a long, so 99.999 becomes 99, not 100, and we could end up with jobs that are fully complete but still show 99% progress.
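   
   A small illustration of the difference, with clamping added so floating-point error can never report more than 100%. The helper name is hypothetical. `Math.round(double)` returns a `long`, so the cast to `int` is safe once the value is clamped to [0, 100].

```java
// Hypothetical helper contrasting truncation with rounding when converting progress to a percentage.
final class ProgressPercent {

  private ProgressPercent() {
  }

  /** Truncating cast, as in the current diff: 0.99999 * 100 (about 99.999) becomes 99. */
  static int truncated(double fractionCopied) {
    return (int) (fractionCopied * 100);
  }

  /** Rounds to the nearest whole percent and clamps the result to [0, 100]. */
  static int rounded(double fractionCopied) {
    long percent = Math.round(fractionCopied * 100);
    return (int) Math.max(0L, Math.min(100L, percent));
  }
}
```

   With `fractionCopied = 0.99999`, `truncated` yields 99 while `rounded` yields 100. Rounding has the opposite edge case: anything at or above 99.5% is shown as 100%.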

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -206,12 +222,53 @@ public Void call() throws Exception {
 
     LOGGER.info(String.format("Collected task state of %d completed tasks", taskStateQueue.size()));
 
+    Long taskByteSize;
+    String stringSize;
+    Double newPercentageCopied;
     // Add the TaskStates of completed tasks to the JobState so when the control
     // returns to the launcher, it sees the TaskStates of all completed tasks.
     for (TaskState taskState : taskStateQueue) {
       consumeTaskIssues(taskState);
       taskState.setJobState(this.jobState);
       this.jobState.addTaskState(taskState);
+

Review comment:
       This method is already quite complicated. Generally, we should try to keep each method to no more than a screen of code, to make it easier to read and understand. You can try moving the logic to a separate method or class, whichever makes more sense.
   
   Fields like "lastPercentageReported" can be more meaningful in a separate class rather than in the task state collector. That will better match the single responsibility principle - https://en.wikipedia.org/wiki/SOLID
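   
   One possible shape for such a class, as a sketch: it owns the byte counters and the last reported fraction, and tells the caller when a new percentage should be emitted. The name `JobProgressTracker`, its methods, and the threshold parameter are hypothetical; event emission and property lookups would stay in `TaskStateCollectorService`. The fraction uses floating-point division, since dividing two `long` byte counts would truncate to 0 until the copy finishes.

```java
import java.util.OptionalInt;

// Hypothetical class extracting progress bookkeeping out of TaskStateCollectorService.
final class JobProgressTracker {

  private final long totalBytesToCopy;
  private final double reportingThreshold; // minimum fraction increase between reports, e.g. 0.05
  private long bytesCopiedSoFar;
  private double lastFractionReported;

  JobProgressTracker(long totalBytesToCopy, double reportingThreshold) {
    if (totalBytesToCopy <= 0) {
      throw new IllegalArgumentException("totalBytesToCopy must be positive, got " + totalBytesToCopy);
    }
    this.totalBytesToCopy = totalBytesToCopy;
    this.reportingThreshold = reportingThreshold;
  }

  /**
   * Records a completed task and returns the percentage to report, or empty if progress
   * has not advanced by more than the threshold since the last report.
   */
  OptionalInt recordTaskCompleted(long taskBytes) {
    bytesCopiedSoFar += taskBytes;
    // Floating-point division: long / long would truncate to 0 until the copy is complete.
    double fractionCopied = (double) bytesCopiedSoFar / totalBytesToCopy;
    if (fractionCopied - lastFractionReported <= reportingThreshold) {
      return OptionalInt.empty();
    }
    lastFractionReported = fractionCopied;
    long percent = Math.round(fractionCopied * 100);
    return OptionalInt.of((int) Math.max(0L, Math.min(100L, percent)));
  }
}
```

   The loop in `call()` could then reduce to something like `tracker.recordTaskCompleted(taskByteSize).ifPresent(this::emitProgressEvent)`, where `emitProgressEvent(int)` is a hypothetical method that builds and submits the GobblinTrackingEvent.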
   

##########
File path: gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
##########
@@ -163,6 +163,9 @@ public GobblinTrackingEvent deserializeEvent(DecodeableKafkaRecord<byte[],byte[]
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, ExecutionStatus.CANCELLED.name());
         properties.put(TimingEvent.JOB_END_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));
         break;
+      case TimingEvent.JOB_COMPLETION_PERCENTAGE:
+        properties.put(TimingEvent.JOB_LAST_PROGRESS_PERCENT_TIME, properties.getProperty(TimingEvent.METADATA_END_TIME));

Review comment:
       Is METADATA_END_TIME the correct value here?







[GitHub] [gobblin] umustafi commented on a change in pull request #3336: [GOBBLIN-1493] Report data copy's progress

Posted by GitBox <gi...@apache.org>.
umustafi commented on a change in pull request #3336:
URL: https://github.com/apache/gobblin/pull/3336#discussion_r678647641



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskStateCollectorService.java
##########
@@ -235,6 +255,49 @@ public Void call() throws Exception {
     this.eventBus.post(new NewTaskCompletionEvent(ImmutableList.copyOf(taskStateQueue)));
   }
 
+  /**
+   * Uses the size of work units to determine a job's progress and reports the progress as a percentage via
+   * GobblinTrackingEvents
+   * @param taskState of job launched
+   */
+  private void reportJobProgress(TaskState taskState) {
+    String stringSize = taskState.getProp(ServiceConfigKeys.WORK_UNIT_BYTE_SIZE);
+    if (stringSize == null) {
+      LOGGER.warn("Expected to report job progress but work unit byte size property null");
+      return;
+    }
+
+    Long taskByteSize = Long.parseLong(stringSize);
+
+    // if progress reporting is enabled, value should be present
+    if (!this.jobState.contains(AbstractJobLauncher.TOTAL_BYTES_TO_COPY)) {
+      LOGGER.warn("Expected to report job progress but total bytes to copy property null");
+      return;
+    }
+    this.totalSizeToCopy = this.jobState.getPropAsLong(AbstractJobLauncher.TOTAL_BYTES_TO_COPY);
+
+    // avoid flooding Kafka message queue by sending GobblinTrackingEvents only when threshold is passed
+    this.bytesCopiedSoFar += taskByteSize;
+    Double newPercentageCopied = this.bytesCopiedSoFar / this.totalSizeToCopy;
+
+    if (newPercentageCopied - this.lastPercentageReported > ConfigurationKeys.DEFAULT_PROGRESS_REPORTING_THRESHOLD) {
+      this.lastPercentageReported = newPercentageCopied;
+      int percentageToReport = (int) Math.round(this.lastPercentageReported * 100);
+
+      Map<String, String> progress = new HashMap<>();
+      progress.put(TimingEvent.JOB_COMPLETION_PERCENTAGE, String.valueOf(percentageToReport));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_GROUP_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_NAME_KEY));
+      progress.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          this.jobState.getProp(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));

Review comment:
       Yes, I had noticed that other events do not need these three properties, either because they are not processed by the `KafkaAvroStatusMonitor` (i.e., in the case of `WorkUnitsCreated`) or because they add them explicitly. From printing the metadata, it seems the event carries only the following fields, yet it needs the flow-level props:
   `{jobName, jobId, azkabanJobId=autotest-hive-copy_hive-copy, azkabanFlowURL, azkabanJobExecURL, azkabanJobURL, azkabanProjectName, azkabanFlowId, azkabanExecId, azkabanURL, clusterIdentifier, metricContextID, metricContextName}`



