You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "y1chi (via GitHub)" <gi...@apache.org> on 2023/04/03 23:17:02 UTC

[GitHub] [beam] y1chi opened a new pull request, #26085: Populate getWorkStream latencies in dataflow streaming worker harness

y1chi opened a new pull request, #26085:
URL: https://github.com/apache/beam/pull/26085

   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/get-started-contributing/#make-the-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1526087677

   gentle ping


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] github-actions[bot] commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1496296864

   Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1518064491

   Run Java_Examples_Dataflow_Java11 PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1247346196


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3612,6 +3620,46 @@ public void testLatencyAttributionToCommittingState() throws Exception {
             .equals(Duration.millis(1000)));
   }
 
+  @Test
+  public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
+    final int workToken = 7272; // A unique id makes it easier to search logs.
+
+    FakeClock clock = new FakeClock();
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(
+                new FakeSlowDoFn(clock, Duration.millis(1000)), 0, StringUtf8Coder.of()),

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -273,7 +274,11 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  Collections.emptyList());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1223629182


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {

Review Comment:
   workItemCreationEndTime is always initialized as Instant.EPOCH.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }
 
     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
       }
-      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
         LOG.warn(
-            String.format(
-                "Work item creation time %s is after the work received time %s, "
-                    + "one or more GetWorkStream timing infos are missing.",
-                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
-        return new ArrayList<>();
-      }
-      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
-      long totalDurationWallTimeMills =
-          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)
-              .getMillis();
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
       long totalSumDurationTimeMills = 0;
-      for (Duration duration : getWorkStreamLatencies.values()) {
+      for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
         totalSumDurationTimeMills += duration.getMillis();
       }
 
-      for (Map.Entry<State, Duration> duration : getWorkStreamLatencies.entrySet()) {
+      for (Map.Entry<State, Duration> duration : aggregatedGetWorkStreamLatencies.entrySet()) {
         latencyAttributions.add(
             LatencyAttribution.newBuilder()
                 .setState(duration.getKey())
                 .setTotalDurationMillis(
-                    (duration.getValue().getMillis() / totalSumDurationTimeMills)
-                        * totalDurationWallTimeMills)
+                    (long)
+                        (((double) duration.getValue().getMillis()
+                                / (double) totalSumDurationTimeMills)

Review Comment:
   done, double cast to long is not implicit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1167036488


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1133,7 +1147,14 @@ public Instant getStateStartTime() {
       return stateStartTime;
     }
 
-    public Iterable<Windmill.LatencyAttribution> getLatencyAttributionList() {
+    public void recordGetWorkStreamLatencies(List<LatencyAttribution> getWorkStreamLatencies) {

Review Comment:
   Changed to just take a collection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1226968779


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1048,6 +1056,7 @@ public void run() {
                 this);
           }
         };
+    work.recordGetWorkStreamLatencies(getWorkStreamLatencies);

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1579085033

   Gentle ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1577069333

   Run Java PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1210569301


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));
+                  if (duration == null) {
+                    return newDuration;
+                  }
+                  return newDuration.isLongerThan(duration) ? newDuration : duration;
+                });
+          }
+        }
+        if (getWorkStreamTimings.containsKey(Event.GET_WORK_RECEIVED_BY_DISPATCHER)) {
+          getWorkStreamLatencies.compute(
+              State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+              (state_key, duration) -> {
+                Duration newDuration =
+                    new Duration(
+                        getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER),
+                        Instant.now());
+                if (duration == null) {
+                  return newDuration;
+                }
+                return newDuration.isLongerThan(duration) ? newDuration : duration;
+              });
+        }
+      }
+
+      private List<LatencyAttribution> getLatencyAttributions() {
+        List<LatencyAttribution> latencyAttributions = new ArrayList<>();

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1178393788


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {
+          getWorkStreamTimings.compute(
+              info.getEvent(),
+              (event, recordedTime) -> {
+                Instant newTimingForEvent = Instant.ofEpochMilli(info.getTimestampUsec() / 1000);

Review Comment:
   Added comments. I agree that we probably want to identifying the max elapsed time with each stage, which at least tells us where exactly the slowness is. I've changed implementation to take that. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1246939261


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -273,7 +274,11 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  Collections.emptyList());

Review Comment:
   can you have some way to configure the fake to pass a non-empty list for attributions, so that we can verify that it is plumbed through to the commit?
   
   Or alternatively always pass some latencies here, and then most of the StreamingDataflowWorkerTests ignore it by filtering dynamic fields. And the ones that don't can be updated to verify that it was kept.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3612,6 +3620,46 @@ public void testLatencyAttributionToCommittingState() throws Exception {
             .equals(Duration.millis(1000)));
   }
 
+  @Test
+  public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
+    final int workToken = 7272; // A unique id makes it easier to search logs.
+
+    FakeClock clock = new FakeClock();
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(
+                new FakeSlowDoFn(clock, Duration.millis(1000)), 0, StringUtf8Coder.of()),

Review Comment:
   nit: replace the 1000 with a constant that you use below when setting up expectation. Will help make it clearer which two things should be equal



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1520466307

   @slavachernyak gentle ping


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1167036268


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -986,7 +987,11 @@ private void dispatchLoop() {
                 computationWork.getDependentRealtimeInputWatermark());
         for (final Windmill.WorkItem workItem : computationWork.getWorkList()) {
           scheduleWorkItem(
-              computationState, inputDataWatermark, synchronizedProcessingTime, workItem);
+              computationState,
+              inputDataWatermark,
+              synchronizedProcessingTime,
+              workItem,
+              /*getWorkStreamLatencies=*/ new ArrayList<>());

Review Comment:
   Changed to use a empty list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1236023051


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -1441,6 +1623,11 @@ private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
           .setRequestId(id)
           .setShardingKey(pendingRequest.request.getShardingKey())
           .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+      if (!pendingRequest.latencyAttributions.isEmpty()) {
+        requestBuilder
+            .getCommitChunkBuilder(0)
+            .addAllPerWorkItemLatencyAttributions(pendingRequest.latencyAttributions);

Review Comment:
   Done. thanks for the suggestion.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +870,161 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1237397587


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -273,7 +277,11 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  new ArrayList<>());

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -211,7 +211,11 @@ public CommitWorkResponse commitWork(Windmill.CommitWorkRequest request) {
     validateCommitWorkRequest(request);
     for (ComputationCommitWorkRequest computationRequest : request.getRequestsList()) {
       for (WorkItemCommitRequest commit : computationRequest.getRequestsList()) {
-        commitsReceived.put(commit.getWorkToken(), commit);
+        // Throw away per work item latency attributions because they are not deterministic in tests

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1166037324


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -986,7 +987,11 @@ private void dispatchLoop() {
                 computationWork.getDependentRealtimeInputWatermark());
         for (final Windmill.WorkItem workItem : computationWork.getWorkList()) {
           scheduleWorkItem(
-              computationState, inputDataWatermark, synchronizedProcessingTime, workItem);
+              computationState,
+              inputDataWatermark,
+              synchronizedProcessingTime,
+              workItem,
+              /*getWorkStreamLatencies=*/ new ArrayList<>());

Review Comment:
   could use emptyIterable() if you change below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1210569615


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));
+                  if (duration == null) {
+                    return newDuration;
+                  }
+                  return newDuration.isLongerThan(duration) ? newDuration : duration;
+                });
+          }
+        }
+        if (getWorkStreamTimings.containsKey(Event.GET_WORK_RECEIVED_BY_DISPATCHER)) {
+          getWorkStreamLatencies.compute(
+              State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+              (state_key, duration) -> {
+                Duration newDuration =
+                    new Duration(
+                        getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER),

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1210574213


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.

Review Comment:
   Do you mind elaborate on how to scale them, do you mean apply a multiplier?
   From testing it looks like the GetWork creation and transmission are on the magnitude of  0 ~ 200 ms for a work item (unless there is significant delay). They are likely incomparable to the latencies measured on the user worker anyways which is a few seconds to a few minutes most likely. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1604538452

   Run Java_Examples_Dataflow_Java17 PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1609901676

   retest this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1237012279


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -211,7 +211,11 @@ public CommitWorkResponse commitWork(Windmill.CommitWorkRequest request) {
     validateCommitWorkRequest(request);
     for (ComputationCommitWorkRequest computationRequest : request.getRequestsList()) {
       for (WorkItemCommitRequest commit : computationRequest.getRequestsList()) {
-        commitsReceived.put(commit.getWorkToken(), commit);
+        // Throw away per work item latency attributions because they are not deterministic in tests

Review Comment:
   should we have an option or separate method to do this after pulling off of commitsReceived?
   That would let us have a test that verified that things were reasonable.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -273,7 +277,11 @@ public boolean awaitTermination(int time, TimeUnit unit) throws InterruptedExcep
                     computationWork.getInputDataWatermark());
             for (Windmill.WorkItem workItem : computationWork.getWorkList()) {
               receiver.receiveWork(
-                  computationWork.getComputationId(), inputDataWatermark, Instant.now(), workItem);
+                  computationWork.getComputationId(),
+                  inputDataWatermark,
+                  Instant.now(),
+                  workItem,
+                  new ArrayList<>());

Review Comment:
   nit: Collections.emptyList()?



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -538,6 +574,10 @@ message StreamingGetWorkResponseChunk {
   // from other stream_ids may be interleaved on the physical stream.
   optional fixed64 stream_id = 4;
 
+  // Timing infos for the work item. Windmill Dispatcher and user worker should
+  // propagate critical event timings if the list is not empty.
+  repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;

Review Comment:
   below in StreamingCommitWorkRequest should we have similar
   repeated CommitWorkStreamingTimingInfo timing_infos = 8;
   
   That would let us attribute latency on the commit path similarly (fine if we want to do this as follow up too)



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -367,6 +375,9 @@ public boolean commitWorkItem(
         errorCollector.checkThat(
             request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE)));
         errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L)));
+        // Throw away per work item latency attributions because they are not deterministic in tests
+        // for valid comparison.
+        request = request.toBuilder().clearPerWorkItemLatencyAttributions().build();

Review Comment:
   ditto (or perhaps just here in the stream case since we're not doing this for appliance)



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3067,8 +3069,14 @@ public void testExceptionInvalidatesCache() throws Exception {
 
       assertThat(
           // The commit will include a timer to clean up state - this timer is irrelevant
-          // for the current test. Also remove source_bytes_processed because it's dynamic.
-          setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
+          // for the current test. Also remove source_bytes_processed and
+          // per_work_item_latecy_attributions because they're dynamic.

Review Comment:
   ie you could extract to a helper method
   removeDynamicFields(CommitWorkRequest request) 
   
   and use that here and other places instead of dropping in the "server"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1237401960


##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -538,6 +574,10 @@ message StreamingGetWorkResponseChunk {
   // from other stream_ids may be interleaved on the physical stream.
   optional fixed64 stream_id = 4;
 
+  // Timing infos for the work item. Windmill Dispatcher and user worker should
+  // propagate critical event timings if the list is not empty.
+  repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;

Review Comment:
   I think we can do this as a follow up when we are seeing gaps from COMMITING latency attribution and internal streamz commit latency metrics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1604538359

   Run Java_Examples_Dataflow_Java11 PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1222595576


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {

Review Comment:
   do you need a null check for workItemCreationEndTime too?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+        workItemCreationEndTime = workItemCreationEnd;
       }
+
       Instant receivedByDispatcherTiming =
           getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
-      Instant now = Instant.now();
-      if (receivedByDispatcherTiming != null) {
-        getWorkStreamLatencies.compute(
+      if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {

Review Comment:
   // Record the latency of each chunk between send on worker and arrival on dispatcher.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.

Review Comment:
   as the difference between starting to get work and the first chunk being sent.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {
+        workItemCreationEndTime = workItemCreationEnd;
       }
+
       Instant receivedByDispatcherTiming =
           getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
-      Instant now = Instant.now();
-      if (receivedByDispatcherTiming != null) {
-        getWorkStreamLatencies.compute(
+      if (workItemCreationEnd != null && receivedByDispatcherTiming != null) {
+        aggregatedGetWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_DISPATCHER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(workItemCreationEnd, receivedByDispatcherTiming);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      Instant forwardedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_FORWARDED_BY_DISPATCHER);
+      Instant now = Instant.ofEpochMilli(clock.getMillis());
+      if (forwardedByDispatcherTiming != null) {

Review Comment:
   // Record the latency of each chunk between send on dispatcher and arrival on worker



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }
 
     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
       }
-      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
         LOG.warn(
-            String.format(
-                "Work item creation time %s is after the work received time %s, "
-                    + "one or more GetWorkStream timing infos are missing.",
-                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
-        return new ArrayList<>();
-      }
-      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
-      long totalDurationWallTimeMills =
-          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)
-              .getMillis();
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
       long totalSumDurationTimeMills = 0;
-      for (Duration duration : getWorkStreamLatencies.values()) {
+      for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
         totalSumDurationTimeMills += duration.getMillis();
       }
 
-      for (Map.Entry<State, Duration> duration : getWorkStreamLatencies.entrySet()) {
+      for (Map.Entry<State, Duration> duration : aggregatedGetWorkStreamLatencies.entrySet()) {

Review Comment:
   nit: prefer forEach instead of entrySet. You can give better names to than getKey/getValue and it avoids some object creations which can add up for frequently done loops (https://github.com/apache/beam/pull/25930/files)
   
   aggregatedWorkStreamLatencies.forEach(
     (state, duration) -> {
   
   });



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -886,60 +871,72 @@ public final Instant startTime() {
   }
 
   static class GetWorkTimingInfosTracker {
-    private final Map<State, Duration> getWorkStreamLatencies;
-    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
     private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
 
-    public GetWorkTimingInfosTracker() {
-      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;
+
+    private final MillisProvider clock;
+
+    public GetWorkTimingInfosTracker(MillisProvider clock) {
+      this.aggregatedGetWorkStreamLatencies = new EnumMap<>(State.class);
+      this.clock = clock;
     }
 
     public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
       // We want to record duration for each stage and also be reflective on total work item
       // processing time. It can be tricky because timings of different
       // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
-      // sum duration in each stage across different chunks, then divide the total duration (start
-      // from the first chunk creation in the windmill worker to the end of last chunk reception by
-      // the user worker) proportionally according the sum duration values across the many stages.
-      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
-      // the stage duration to the total processing elapsed wall time.
+      // sum duration in each transmission stage across different chunks, then divide the total
+      // duration (start from the chunk creation end in the windmill worker to the end of last chunk
+      // reception by the user worker) proportionally according the sum duration values across the
+      // many stages. This should allow us to identify the slow stage meanwhile avoid confusions for
+      // comparing the stage duration to the total processing elapsed wall time.
       Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
       for (GetWorkStreamTimingInfo info : infos) {
         getWorkStreamTimings.putIfAbsent(
             info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
       }
-
-      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
-        Event start = cell.getRowKey();
-        Event end = cell.getColumnKey();
-        State state = cell.getValue();
-        Instant startTiming = getWorkStreamTimings.get(start);
-        Instant endTiming = getWorkStreamTimings.get(end);
-        if (startTiming != null && endTiming != null) {
-          getWorkStreamLatencies.compute(
-              state,
-              (state_key, duration) -> {
-                Duration newDuration = new Duration(startTiming, endTiming);
-                if (duration == null) {
-                  return newDuration;
-                }
-                return duration.plus(newDuration);
-              });
-        }
+      Instant workItemCreationStart = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      Instant workItemCreationEnd = getWorkStreamTimings.get(Event.GET_WORK_CREATION_END);
+      // Record the work item creation end time.
+      if (workItemCreationStart != null
+          && workItemCreationEnd != null
+          && workItemCreationLatency == null) {
+        workItemCreationLatency =
+            LatencyAttribution.newBuilder()
+                .setState(State.GET_WORK_IN_WINDMILL_WORKER)
+                .setTotalDurationMillis(
+                    new Duration(workItemCreationStart, workItemCreationEnd).getMillis())
+                .build();
       }
-      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
-      if (getWorkCreationStartTime != null
-          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
-        workItemCreationStartTime = getWorkCreationStartTime;
+      if (workItemCreationEnd != null && workItemCreationEnd.isAfter(workItemCreationEndTime)) {

Review Comment:
   You could look into removing the nullness warning suppression at the top of the class to automate this catch if needed.  But that might require more fixes than we want to make in this cl



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(

Review Comment:
   hmm, doing the math here is
   130/140*40=37
   
   That's making me think we need to do something else than total per state and then scale.  In this case it is odd that the 37 is greater than any of the individual latencies 31,32,33,34 for that stage.
   
   Other ideas:
   - take max of each type, scale down if needed to not exceed total latency
   - take the average of each type, scale down if needed to not exceed total latency



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }
 
     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {

Review Comment:
   nit: use isEmpty() instead of size() == 0



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -950,42 +947,48 @@ public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
     }
 
     List<LatencyAttribution> getLatencyAttributions() {
-      if (getWorkStreamLatencies.size() == 0) {
-        return new ArrayList<>();
+      if (workItemCreationLatency == null && aggregatedGetWorkStreamLatencies.size() == 0) {
+        return Collections.emptyList();
+      }
+      List<LatencyAttribution> latencyAttributions =
+          new ArrayList<>(aggregatedGetWorkStreamLatencies.size() + 1);
+      if (workItemCreationLatency != null) {
+        latencyAttributions.add(workItemCreationLatency);
       }
-      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+      if (workItemCreationEndTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
         LOG.warn(
-            String.format(
-                "Work item creation time %s is after the work received time %s, "
-                    + "one or more GetWorkStream timing infos are missing.",
-                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
-        return new ArrayList<>();
-      }
-      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
-      long totalDurationWallTimeMills =
-          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)
-              .getMillis();
+            "Work item creation time {} is after the work received time {}, "
+                + "one or more GetWorkStream timing infos are missing.",
+            workItemCreationEndTime,
+            workItemLastChunkReceivedByWorkerTime);
+        return latencyAttributions;
+      }
+      long totalTransmissionDurationElapsedTime =
+          new Duration(workItemCreationEndTime, workItemLastChunkReceivedByWorkerTime).getMillis();
       long totalSumDurationTimeMills = 0;
-      for (Duration duration : getWorkStreamLatencies.values()) {
+      for (Duration duration : aggregatedGetWorkStreamLatencies.values()) {
         totalSumDurationTimeMills += duration.getMillis();
       }
 
-      for (Map.Entry<State, Duration> duration : getWorkStreamLatencies.entrySet()) {
+      for (Map.Entry<State, Duration> duration : aggregatedGetWorkStreamLatencies.entrySet()) {
         latencyAttributions.add(
             LatencyAttribution.newBuilder()
                 .setState(duration.getKey())
                 .setTotalDurationMillis(
-                    (duration.getValue().getMillis() / totalSumDurationTimeMills)
-                        * totalDurationWallTimeMills)
+                    (long)
+                        (((double) duration.getValue().getMillis()
+                                / (double) totalSumDurationTimeMills)

Review Comment:
   think you can remove one of the double casts, as with a single double double arithmetic will apply
   is the cast to (long) not implicit with java?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(
+        (long) (elapsedTime * ((double) 10 / (double) sumDurations)),

Review Comment:
   nit: 10.0 instead of (double) 10
   and think you can remove other double cast



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(
+        (long) (elapsedTime * ((double) 10 / (double) sumDurations)),
+        latencies.get(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER).getTotalDurationMillis());
+    assertEquals(
+        (long) (elapsedTime * ((double) 130 / (double) sumDurations)),

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1230751031


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +870,161 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+
+    private Instant workItemCreationEndTime = Instant.EPOCH;
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    private LatencyAttribution workItemCreationLatency = null;
+    private final Map<State, Duration> aggregatedGetWorkStreamLatencies;

Review Comment:
   nit: have a single Map from State -> sum/max
   saves lookups but also makes it clearer the invariant that key is always in both maps



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -1441,6 +1623,11 @@ private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
           .setRequestId(id)
           .setShardingKey(pendingRequest.request.getShardingKey())
           .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+      if (!pendingRequest.latencyAttributions.isEmpty()) {
+        requestBuilder
+            .getCommitChunkBuilder(0)
+            .addAllPerWorkItemLatencyAttributions(pendingRequest.latencyAttributions);

Review Comment:
   Let's consider adding the latency information for the work item itself to the WorkItemCommitRequest
   
   Then we can just have similar timing information on the StreamingCommitWorkRequest to calculate latencies on the commit path.
   
   Also note below you are not setting the latency information in issueMultiChunkRequest.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1222094321


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -957,4 +968,42 @@ public void onCompleted() {
     stream.close();
     assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
   }
+
+  @Test
+  public void testGetWorkTimingInfosTracker() throws Exception {
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+    for (int i = 0; i <= 3; i++) {
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_START)
+              .setTimestampUsec(i)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_END)
+              .setTimestampUsec(i + 1)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+              .setTimestampUsec(i + 2)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+              .setTimestampUsec(i + 3)
+              .build());
+    }
+    tracker.addTimingInfo(infos);
+    Set<State> states = new HashSet<>();
+    List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+    assertEquals(3, attributions.size());
+    for (LatencyAttribution attribution : attributions) {
+      states.add(attribution.getState());
+    }
+    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1222094878


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();
+      }
+      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            String.format(
+                "Work item creation time %s is after the work received time %s, "
+                    + "one or more GetWorkStream timing infos are missing.",
+                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
+        return new ArrayList<>();
+      }
+      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
+      long totalDurationWallTimeMills =
+          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1222094540


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1221361193


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1048,6 +1056,7 @@ public void run() {
                 this);
           }
         };
+    work.recordGetWorkStreamLatencies(getWorkStreamLatencies);

Review Comment:
   since the implementation just expects this to be called once (ie not merging if called multiple times), how about just modifying the Work constructor to take getWorkStreamLatencies



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();

Review Comment:
   Collections.emptyList() for here and below?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {

Review Comment:
   add a unit test of this class



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -957,4 +968,42 @@ public void onCompleted() {
     stream.close();
     assertTrue(stream.awaitTermination(30, TimeUnit.SECONDS));
   }
+
+  @Test
+  public void testGetWorkTimingInfosTracker() throws Exception {
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
+    for (int i = 0; i <= 3; i++) {
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_START)
+              .setTimestampUsec(i)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_CREATION_END)
+              .setTimestampUsec(i + 1)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
+              .setTimestampUsec(i + 2)
+              .build());
+      infos.add(
+          GetWorkStreamTimingInfo.newBuilder()
+              .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
+              .setTimestampUsec(i + 3)
+              .build());
+    }
+    tracker.addTimingInfo(infos);
+    Set<State> states = new HashSet<>();
+    List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
+    assertEquals(3, attributions.size());
+    for (LatencyAttribution attribution : attributions) {
+      states.add(attribution.getState());
+    }
+    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));

Review Comment:
   can you inject a clock and verify the numbers calculated?
   In particular it would be good to verify the logic about scaling things



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();

Review Comment:
   use clock



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();
+      }
+      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            String.format(

Review Comment:
   LOG itself supports lazy formatting with {}



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -862,6 +885,110 @@ public final Instant startTime() {
     }
   }
 
+  static class GetWorkTimingInfosTracker {
+    private final Map<State, Duration> getWorkStreamLatencies;
+    private Instant workItemCreationStartTime = Instant.ofEpochMilli(Long.MAX_VALUE);
+    private Instant workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
+
+    public GetWorkTimingInfosTracker() {
+      this.getWorkStreamLatencies = new EnumMap<>(State.class);
+    }
+
+    public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+      // We want to record duration for each stage and also be reflective on total work item
+      // processing time. It can be tricky because timings of different
+      // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+      // sum duration in each stage across different chunks, then divide the total duration (start
+      // from the first chunk creation in the windmill worker to the end of last chunk reception by
+      // the user worker) proportionally according the sum duration values across the many stages.
+      // This should allow us to identify the slow stage meanwhile avoid confusions for comparing
+      // the stage duration to the total processing elapsed wall time.
+      Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+      for (GetWorkStreamTimingInfo info : infos) {
+        getWorkStreamTimings.putIfAbsent(
+            info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+      }
+
+      for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+        Event start = cell.getRowKey();
+        Event end = cell.getColumnKey();
+        State state = cell.getValue();
+        Instant startTiming = getWorkStreamTimings.get(start);
+        Instant endTiming = getWorkStreamTimings.get(end);
+        if (startTiming != null && endTiming != null) {
+          getWorkStreamLatencies.compute(
+              state,
+              (state_key, duration) -> {
+                Duration newDuration = new Duration(startTiming, endTiming);
+                if (duration == null) {
+                  return newDuration;
+                }
+                return duration.plus(newDuration);
+              });
+        }
+      }
+      Instant getWorkCreationStartTime = getWorkStreamTimings.get(Event.GET_WORK_CREATION_START);
+      if (getWorkCreationStartTime != null
+          && getWorkCreationStartTime.isBefore(workItemCreationStartTime)) {
+        workItemCreationStartTime = getWorkCreationStartTime;
+      }
+      Instant receivedByDispatcherTiming =
+          getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER);
+      Instant now = Instant.now();
+      if (receivedByDispatcherTiming != null) {
+        getWorkStreamLatencies.compute(
+            State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+            (state_key, duration) -> {
+              Duration newDuration = new Duration(receivedByDispatcherTiming, now);
+              if (duration == null) {
+                return newDuration;
+              }
+              return duration.plus(newDuration);
+            });
+      }
+      workItemLastChunkReceivedByWorkerTime = now;
+    }
+
+    List<LatencyAttribution> getLatencyAttributions() {
+      if (getWorkStreamLatencies.size() == 0) {
+        return new ArrayList<>();
+      }
+      if (workItemCreationStartTime.isAfter(workItemLastChunkReceivedByWorkerTime)) {
+        LOG.warn(
+            String.format(
+                "Work item creation time %s is after the work received time %s, "
+                    + "one or more GetWorkStream timing infos are missing.",
+                workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime));
+        return new ArrayList<>();
+      }
+      List<LatencyAttribution> latencyAttributions = new ArrayList<>(getWorkStreamLatencies.size());
+      long totalDurationWallTimeMills =
+          new Duration(workItemCreationStartTime, workItemLastChunkReceivedByWorkerTime)

Review Comment:
   I think that we can just take whatever workItemCreationStartTime to workItemCreationEndTime is as the work item creation latency.  That just happens once and should be exact.  If we do send it per-chunk and not just the first chunk I think that could be changed but it should always be the same if repeatedly sent for the same work item.
   
   So I think that you should only scale the other latency sums so that their latency s workItemCreationEndTime to workItemLastChunkReceivedByWorkerTime.
   
   Then we'll have total latency of workItemCreationStartTime to workItemLastChunkReceivedByWorkerTime given by:
   workItemCreationStart to workItemCreationEnd: reported entirely as GET_WORK_IN_WINDMILL_WORKER
   workItemCreationEnd to workItemLastChunkReceivedByWorkerTime: reported as GET_WORK_IN_TRANSIT_TO_DISPATCHER and GET_WORK_IN_TRANSIT_TO_USER_WORKER, where the sums of chunk latencies are scaled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1223692743


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServerTest.java:
##########
@@ -971,39 +970,52 @@ public void onCompleted() {
 
   @Test
   public void testGetWorkTimingInfosTracker() throws Exception {
-    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker();
+    GetWorkTimingInfosTracker tracker = new GetWorkTimingInfosTracker(() -> 50);
     List<GetWorkStreamTimingInfo> infos = new ArrayList<>();
     for (int i = 0; i <= 3; i++) {
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_START)
-              .setTimestampUsec(i)
+              .setTimestampUsec(0)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_CREATION_END)
-              .setTimestampUsec(i + 1)
+              .setTimestampUsec(10000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_RECEIVED_BY_DISPATCHER)
-              .setTimestampUsec(i + 2)
+              .setTimestampUsec((i + 11) * 1000)
               .build());
       infos.add(
           GetWorkStreamTimingInfo.newBuilder()
               .setEvent(Event.GET_WORK_FORWARDED_BY_DISPATCHER)
-              .setTimestampUsec(i + 3)
+              .setTimestampUsec((i + 16) * 1000)
               .build());
+      tracker.addTimingInfo(infos);
+      infos.clear();
     }
-    tracker.addTimingInfo(infos);
-    Set<State> states = new HashSet<>();
+    // durations for each chunk:
+    // GET_WORK_IN_WINDMILL_WORKER: 10, 10, 10, 10
+    // GET_WORK_IN_TRANSIT_TO_DISPATCHER: 1, 2, 3, 4 -> sum to 10
+    // GET_WORK_IN_TRANSIT_TO_USER_WORKER: 34, 33, 32, 31 -> sum to 130
+    Map<State, LatencyAttribution> latencies = new HashMap<>();
     List<LatencyAttribution> attributions = tracker.getLatencyAttributions();
     assertEquals(3, attributions.size());
     for (LatencyAttribution attribution : attributions) {
-      states.add(attribution.getState());
+      latencies.put(attribution.getState(), attribution);
     }
-    assertTrue(states.contains(State.GET_WORK_IN_WINDMILL_WORKER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_DISPATCHER));
-    assertTrue(states.contains(State.GET_WORK_IN_TRANSIT_TO_USER_WORKER));
+    assertEquals(10L, latencies.get(State.GET_WORK_IN_WINDMILL_WORKER).getTotalDurationMillis());
+    // elapsed time from 10 -> 50;
+    long elapsedTime = 40;
+    // sumDurations: 1 + 2 + 3 + 4 + 34 + 33 + 32 + 31;
+    long sumDurations = 140;
+    assertEquals(

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1178201948


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1520440197

   @scwhittle gentle ping


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1166036784


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1133,7 +1147,14 @@ public Instant getStateStartTime() {
       return stateStartTime;
     }
 
-    public Iterable<Windmill.LatencyAttribution> getLatencyAttributionList() {
+    public void recordGetWorkStreamLatencies(List<LatencyAttribution> getWorkStreamLatencies) {

Review Comment:
   just take an iterable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1178012079


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {
+          getWorkStreamTimings.compute(
+              info.getEvent(),
+              (event, recordedTime) -> {
+                Instant newTimingForEvent = Instant.ofEpochMilli(info.getTimestampUsec() / 1000);
+                if (recordedTime == null) {
+                  return newTimingForEvent;
+                }
+                switch (event) {
+                  case GET_WORK_CREATION_START:
+                    return recordedTime.isBefore(newTimingForEvent)
+                        ? recordedTime
+                        : newTimingForEvent;
+                  case GET_WORK_CREATION_END:
+                  case GET_WORK_RECEIVED_BY_DISPATCHER:
+                  case GET_WORK_FORWARDED_BY_DISPATCHER:
+                    return recordedTime.isAfter(newTimingForEvent)
+                        ? recordedTime
+                        : newTimingForEvent;
+                  default:
+                    LOG.error("Unknown GetWorkStreamTimingInfo type: " + event.name());
+                }
+                return recordedTime;
+              });
+          if (Instant.now().isAfter(workItemReceiveTime)) {

Review Comment:
   only call now once here, and could also call it only once for iteration over all chunks.
   Nothing locked or async here so don't think we're losing anything by optimizing it.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {

Review Comment:
   it might be nice to have a builder object that you just pass these timing infos to and then build/finalize to get the latencies. Then it would be easier to document and test independently too and we could experiment with keeping track differently.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -987,6 +998,84 @@ public void append(StreamingGetWorkResponseChunk chunk) {
 
         this.data = data.concat(chunk.getSerializedWorkItem());
         this.bufferedSize += chunk.getSerializedWorkItem().size();
+        for (GetWorkStreamTimingInfo info : chunk.getPerWorkItemTimingInfosList()) {
+          getWorkStreamTimings.compute(
+              info.getEvent(),
+              (event, recordedTime) -> {
+                Instant newTimingForEvent = Instant.ofEpochMilli(info.getTimestampUsec() / 1000);

Review Comment:
   add comment on current strategy
   - take min across chunks for each event and use that to divide up transit time
   
   As mentioned offline it seems tricky because:
   - we want total time for all chunks for the work item to match real time elapsed so it aligns with other times tracked for the work item.
   - chunks may be delayed due to downstream pushback
   
   Another possible way to track would be identifying the max elapsed time within each stage and scaling. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1518133406

   Run Java_Examples_Dataflow_Java11 PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1516863865

   Run Java_Examples_Dataflow_Java11 PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1516863982

   Run Java_Examples_Dataflow PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1237397174


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -3067,8 +3069,14 @@ public void testExceptionInvalidatesCache() throws Exception {
 
       assertThat(
           // The commit will include a timer to clean up state - this timer is irrelevant
-          // for the current test. Also remove source_bytes_processed because it's dynamic.
-          setValuesTimestamps(commit.toBuilder().clearOutputTimers().clearSourceBytesProcessed())
+          // for the current test. Also remove source_bytes_processed and
+          // per_work_item_latecy_attributions because they're dynamic.

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -367,6 +375,9 @@ public boolean commitWorkItem(
         errorCollector.checkThat(
             request.getShardingKey(), allOf(greaterThan(0L), lessThan(Long.MAX_VALUE)));
         errorCollector.checkThat(request.getCacheToken(), not(equalTo(0L)));
+        // Throw away per work item latency attributions because they are not deterministic in tests
+        // for valid comparison.
+        request = request.toBuilder().clearPerWorkItemLatencyAttributions().build();

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1213527658


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.

Review Comment:
   Done. I think it would make sense to scale the work creation time as well so the total elapsed time would be [GET_WORK_CREATION_START, last chunk arrived in user worker].



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1213150951


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.

Review Comment:
   I was thinking to scale the times to transmit to user worker such that those portions of the latency attribution match the time it takes from generating work to assembling it on the user worker.
   
   So the transmit time elapsed would be something like [GET_WORK_CREATION_END time, last chunk arrived in user worker].
   Then we can scale the components of latency for transmitting (ie GET_WORK_IN_TRANSIT_TO_USER_WORKER and GET_WORK_IN_TRANSIT_TO_DISPATCHER) so that their sum equals the transmit time.
   
   I think it's worthwhile as we are looking into low-latency processing because it isn't necessarily guaranteed that user worker processing will take longer and it's confusing if the total of all the latencies could be larger than the actual total time to process.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1496295509

   R: @slavachernyak @scwhittle 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] scwhittle commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "scwhittle (via GitHub)" <gi...@apache.org>.
scwhittle commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1182388354


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {

Review Comment:
   can you add a unit test for this?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));
+                  if (duration == null) {
+                    return newDuration;
+                  }
+                  return newDuration.isLongerThan(duration) ? newDuration : duration;
+                });
+          }
+        }
+        if (getWorkStreamTimings.containsKey(Event.GET_WORK_RECEIVED_BY_DISPATCHER)) {
+          getWorkStreamLatencies.compute(
+              State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+              (state_key, duration) -> {
+                Duration newDuration =
+                    new Duration(
+                        getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER),
+                        Instant.now());
+                if (duration == null) {
+                  return newDuration;
+                }
+                return newDuration.isLongerThan(duration) ? newDuration : duration;
+              });
+        }
+      }
+
+      private List<LatencyAttribution> getLatencyAttributions() {
+        List<LatencyAttribution> latencyAttributions = new ArrayList<>();

Review Comment:
   could size based upon entries



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.

Review Comment:
   I think that we might want to scale them perhaps?
   I think that it will be confusing to compare these latencies to latencies measured on the user worker which match real-time elapsing otherwise.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));

Review Comment:
   nit: just call get above and do nullcheck instead of containsKey, avoids double lookups



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record duration for each stage and also be reflective on total work item
+        // processing time. It can be tricky because timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. Current strategy is to record the
+        // maximum duration in each stage across different chunks, this will allow us to identify
+        // the slow stage, but note the sum duration of each slowest stages may be larger than the
+        // duration from first chunk creation to last chunk reception by user worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));
+                  if (duration == null) {
+                    return newDuration;
+                  }
+                  return newDuration.isLongerThan(duration) ? newDuration : duration;
+                });
+          }
+        }
+        if (getWorkStreamTimings.containsKey(Event.GET_WORK_RECEIVED_BY_DISPATCHER)) {
+          getWorkStreamLatencies.compute(
+              State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+              (state_key, duration) -> {
+                Duration newDuration =
+                    new Duration(
+                        getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER),

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1611824186

   gentle ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi merged pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi merged PR #26085:
URL: https://github.com/apache/beam/pull/26085


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on PR #26085:
URL: https://github.com/apache/beam/pull/26085#issuecomment-1518064603

   Run Java_Examples_Dataflow PreCommit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

Posted by "y1chi (via GitHub)" <gi...@apache.org>.
y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1167036268


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -986,7 +987,11 @@ private void dispatchLoop() {
                 computationWork.getDependentRealtimeInputWatermark());
         for (final Windmill.WorkItem workItem : computationWork.getWorkList()) {
           scheduleWorkItem(
-              computationState, inputDataWatermark, synchronizedProcessingTime, workItem);
+              computationState,
+              inputDataWatermark,
+              synchronizedProcessingTime,
+              workItem,
+              /*getWorkStreamLatencies=*/ new ArrayList<>());

Review Comment:
   Changed to use a empty collection.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org