Posted to github@beam.apache.org by "y1chi (via GitHub)" <gi...@apache.org> on 2023/05/30 17:01:36 UTC

[GitHub] [beam] y1chi commented on a diff in pull request #26085: Populate getWorkStream latencies in dataflow streaming worker harness

y1chi commented on code in PR #26085:
URL: https://github.com/apache/beam/pull/26085#discussion_r1210569301


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {

Review Comment:
   Done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record the duration of each stage while also reflecting the total work
+        // item processing time. This can be tricky because the timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. The current strategy is to record
+        // the maximum duration of each stage across chunks, which lets us identify the slowest
+        // stage; note, however, that the sum of these per-stage maxima may be larger than the
+        // duration from the first chunk's creation to the last chunk's reception by the user
+        // worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));
+                  if (duration == null) {
+                    return newDuration;
+                  }
+                  return newDuration.isLongerThan(duration) ? newDuration : duration;
+                });
+          }
+        }
+        if (getWorkStreamTimings.containsKey(Event.GET_WORK_RECEIVED_BY_DISPATCHER)) {
+          getWorkStreamLatencies.compute(
+              State.GET_WORK_IN_TRANSIT_TO_USER_WORKER,
+              (state_key, duration) -> {
+                Duration newDuration =
+                    new Duration(
+                        getWorkStreamTimings.get(Event.GET_WORK_RECEIVED_BY_DISPATCHER),
+                        Instant.now());
+                if (duration == null) {
+                  return newDuration;
+                }
+                return newDuration.isLongerThan(duration) ? newDuration : duration;
+              });
+        }
+      }
+
+      private List<LatencyAttribution> getLatencyAttributions() {
+        List<LatencyAttribution> latencyAttributions = new ArrayList<>();

Review Comment:
   Done.
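
   For readers skimming the thread, the aggregation described in the comment above can be
   illustrated with a small, self-contained sketch: keep, per stage, the longest duration
   observed across interleaved chunks, merging each new observation with a "keep the longer"
   function. The Event and Stage enums and all names below are illustrative stand-ins, not the
   PR's actual types (the real code works with the Windmill proto enums and Joda-Time); java.time
   is used here only so the sketch runs on its own.

   import java.time.Duration;
   import java.time.Instant;
   import java.util.EnumMap;
   import java.util.Map;

   public class MaxPerStageLatencyTracker {
     // Illustrative stand-ins for the Windmill timing-event and latency-state enums.
     enum Event { CHUNK_CREATED, CHUNK_RECEIVED_BY_DISPATCHER }
     enum Stage { IN_WINDMILL, IN_TRANSIT_TO_USER_WORKER }

     // Longest duration seen for each stage across all chunks of one work item.
     private final Map<Stage, Duration> maxPerStage = new EnumMap<>(Stage.class);

     /** Record one chunk's timings; a later chunk only replaces a stage if it was slower. */
     void addChunkTimings(Map<Event, Instant> chunkTimings) {
       Instant created = chunkTimings.get(Event.CHUNK_CREATED);
       Instant atDispatcher = chunkTimings.get(Event.CHUNK_RECEIVED_BY_DISPATCHER);
       if (created != null && atDispatcher != null) {
         // merge() keeps the longer of the existing and the new duration for this stage.
         maxPerStage.merge(
             Stage.IN_WINDMILL,
             Duration.between(created, atDispatcher),
             MaxPerStageLatencyTracker::longer);
       }
       if (atDispatcher != null) {
         // Transit to the user worker is measured against "now", i.e. when the chunk is consumed.
         maxPerStage.merge(
             Stage.IN_TRANSIT_TO_USER_WORKER,
             Duration.between(atDispatcher, Instant.now()),
             MaxPerStageLatencyTracker::longer);
       }
     }

     private static Duration longer(Duration a, Duration b) {
       return a.compareTo(b) >= 0 ? a : b;
     }

     /** Copy of the per-stage maxima, e.g. for building latency attributions once per work item. */
     Map<Stage, Duration> snapshot() {
       return new EnumMap<>(maxPerStage);
     }
   }

   Map.merge with a "keep the longer" function behaves the same as the compute-based pattern in
   the hunk above. As the comment warns, summing these per-stage maxima can exceed the wall-clock
   span from the first chunk's creation to the last chunk's arrival, because the maxima may come
   from different chunks. Note also that the hunk converts the proto's microsecond timestamps via
   Instant.ofEpochMilli(timestampUsec / 1000), so sub-millisecond precision is dropped.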



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java:
##########
@@ -969,6 +987,76 @@ protected void startThrottleTimer() {
       getWorkThrottleTimer.start();
     }
 
+    private class GetWorkTimingInfosTracker {
+      private final Map<State, Duration> getWorkStreamLatencies;
+
+      public GetWorkTimingInfosTracker() {
+        this.getWorkStreamLatencies = new EnumMap<>(State.class);
+      }
+
+      public void addTimingInfo(Collection<GetWorkStreamTimingInfo> infos) {
+        // We want to record the duration of each stage while also reflecting the total work
+        // item processing time. This can be tricky because the timings of different
+        // StreamingGetWorkResponseChunks can be interleaved. The current strategy is to record
+        // the maximum duration of each stage across chunks, which lets us identify the slowest
+        // stage; note, however, that the sum of these per-stage maxima may be larger than the
+        // duration from the first chunk's creation to the last chunk's reception by the user
+        // worker.
+        Map<Event, Instant> getWorkStreamTimings = new HashMap<>();
+        for (GetWorkStreamTimingInfo info : infos) {
+          getWorkStreamTimings.putIfAbsent(
+              info.getEvent(), Instant.ofEpochMilli(info.getTimestampUsec() / 1000));
+        }
+
+        for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
+          Event start = cell.getRowKey();
+          Event end = cell.getColumnKey();
+          State state = cell.getValue();
+          if (getWorkStreamTimings.containsKey(start) && getWorkStreamTimings.containsKey(end)) {
+            getWorkStreamLatencies.compute(
+                state,
+                (state_key, duration) -> {
+                  Duration newDuration =
+                      new Duration(getWorkStreamTimings.get(start), getWorkStreamTimings.get(end));

Review Comment:
   Done.
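
   The hunks iterate EVENT_STATE_TABLE, whose definition falls outside the quoted range. As a
   rough sketch of the shape such a (start event, end event) -> state mapping could take using
   Guava's ImmutableTable — every identifier below other than Event.GET_WORK_RECEIVED_BY_DISPATCHER
   (which is visible in the diff) is a placeholder, not necessarily the PR's real enum value:

   import com.google.common.collect.ImmutableTable;
   import com.google.common.collect.Table;
   import com.google.common.collect.Table.Cell;

   public class EventStateTableSketch {
     // Stand-in enums; the real code uses the Windmill proto's timing Event and latency State.
     enum Event { GET_WORK_CREATION_START, GET_WORK_CREATION_END, GET_WORK_RECEIVED_BY_DISPATCHER }
     enum State { GET_WORK_IN_WINDMILL_WORKER, GET_WORK_IN_TRANSIT_TO_DISPATCHER }

     // Each cell maps a (start event, end event) pair to the latency State attributed to the
     // interval between those two events.
     static final Table<Event, Event, State> EVENT_STATE_TABLE =
         ImmutableTable.<Event, Event, State>builder()
             .put(
                 Event.GET_WORK_CREATION_START,
                 Event.GET_WORK_CREATION_END,
                 State.GET_WORK_IN_WINDMILL_WORKER)
             .put(
                 Event.GET_WORK_CREATION_END,
                 Event.GET_WORK_RECEIVED_BY_DISPATCHER,
                 State.GET_WORK_IN_TRANSIT_TO_DISPATCHER)
             .build();

     public static void main(String[] args) {
       // Iterating cellSet() as the diff does: row = start event, column = end event, value = state.
       for (Cell<Event, Event, State> cell : EVENT_STATE_TABLE.cellSet()) {
         System.out.println(cell.getRowKey() + " -> " + cell.getColumnKey() + " : " + cell.getValue());
       }
     }
   }

   A table keyed by event pairs keeps the stage boundaries declarative: adding a new measured
   interval is a one-line builder entry rather than another branch in addTimingInfo.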



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org