You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/02 16:52:07 UTC

[beam] branch master updated: Dataflow Worker: Fix to report millis for couple processing times metrics.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9de7281  Dataflow Worker: Fix to report millis for couple processing times metrics.
9de7281 is described below

commit 9de728151c855ecdec17174a5ce27f090b008a50
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Fri Nov 2 09:51:58 2018 -0700

    Dataflow Worker: Fix to report millis for couple processing times metrics.
---
 .../beam/runners/dataflow/worker/BatchModeExecutionContext.java   | 2 +-
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java     | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 9979ec0..2f10a13 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -508,7 +508,7 @@ public class BatchModeExecutionContext
       CounterCell throttlingMsecs =
           container.tryGetCounter(DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME);
       if (throttlingMsecs != null) {
-        totalThrottleTime += TimeUnit.MICROSECONDS.toSeconds(throttlingMsecs.getCumulative());
+        totalThrottleTime += TimeUnit.MILLISECONDS.toSeconds(throttlingMsecs.getCumulative());
       }
     }
     return totalThrottleTime;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c0f4b44..fbbc069 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1311,9 +1311,9 @@ public class StreamingDataflowWorker {
     } finally {
       // Update total processing time counters. Updating in finally clause ensures that
       // work items causing exceptions are also accounted in time spent.
-      long processingTimeMicros =
-          TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - processingStartTimeNanos);
-      stageInfo.totalProcessingMsecs.addValue(processingTimeMicros);
+      long processingTimeMsecs =
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - processingStartTimeNanos);
+      stageInfo.totalProcessingMsecs.addValue(processingTimeMsecs);
 
       // Attribute all the processing to timers if the work item contains any timers.
       // Tests show that work items rarely contain both timers and message bundles. It should
@@ -1321,7 +1321,7 @@ public class StreamingDataflowWorker {
       // Another option: Derive time split between messages and timers based on recent totals.
       // either here or in DFE.
       if (work.getWorkItem().hasTimers()) {
-        stageInfo.timerProcessingMsecs.addValue(processingTimeMicros);
+        stageInfo.timerProcessingMsecs.addValue(processingTimeMsecs);
       }
 
       DataflowWorkerLoggingMDC.setWorkId(null);