You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/08 18:13:36 UTC

[beam] branch master updated: Tolerate duplicate counters from SDK in StreamingDataflowWorker. (#6951)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a21209  Tolerate duplicate counters from SDK in StreamingDataflowWorker. (#6951)
9a21209 is described below

commit 9a212095cb61589d90bfed871bb71a5810bc7e6c
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Thu Nov 8 10:13:27 2018 -0800

    Tolerate duplicate counters from SDK in StreamingDataflowWorker. (#6951)
---
 .../dataflow/worker/StreamingDataflowWorker.java   | 74 ++++++++++++++++++++--
 1 file changed, 69 insertions(+), 5 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 9329bb3..a72fbcc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -36,6 +36,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.collect.EvictingQueue;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.MultimapBuilder;
 import com.google.common.graph.MutableNetwork;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -47,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -1703,20 +1706,75 @@ public class StreamingDataflowWorker {
     }
   }
 
+  /**
+   * Computes the key identifying a counter update: the String name for a legacy counter, or the
+   * CounterStructuredName for a structured counter. Fails the argument check if neither is set.
+   */
+  private Object getCounterUpdateKey(CounterUpdate counterUpdate) {
+    Object counterKey =
+        counterUpdate.getNameAndKind() != null
+            ? counterUpdate.getNameAndKind().getName()
+            : counterUpdate.getStructuredNameAndMetadata() != null
+                ? counterUpdate.getStructuredNameAndMetadata().getName()
+                : null;
+    checkArgument(counterKey != null, "Could not find name for CounterUpdate: %s", counterUpdate);
+    return counterKey;
+  }
+
   /** Sends counter updates to Dataflow backend. */
   private void sendWorkerUpdatesToDataflowService(
       CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
 
-    List<CounterUpdate> counterUpdates = new ArrayList<>();
+    List<CounterUpdate> counterUpdates = new ArrayList<>(128);
+
     if (publishCounters) {
       stageInfoMap.values().forEach(s -> counterUpdates.addAll(s.extractCounterUpdates()));
-
       counterUpdates.addAll(
           cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
       counterUpdates.addAll(
           deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
     }
 
+    // Handle duplicate counters from different stages. Store all the counters in a multi-map and
+    // send the counters that appear multiple times in separate RPCs. Same logical counter could
+    // appear in multiple stages if a step runs in multiple stages (as with flatten-unzipped stages)
+    // especially if the counter definition does not set execution_step_name.
+    ListMultimap<Object, CounterUpdate> counterMultimap =
+        MultimapBuilder.hashKeys(counterUpdates.size()).linkedListValues().build();
+    boolean hasDuplicates = false;
+
+    for (CounterUpdate c : counterUpdates) {
+      Object key = getCounterUpdateKey(c);
+      if (counterMultimap.containsKey(key)) {
+        hasDuplicates = true;
+      }
+      counterMultimap.put(key, c);
+    }
+
+    // Clears counterUpdates and enqueues unique counters from counterMultimap. If a counter
+    // appears more than once, one instance is extracted, leaving the rest in the map.
+    Runnable extractUniqueCounters =
+        () -> {
+          counterUpdates.clear();
+          for (Iterator<Object> iter = counterMultimap.keySet().iterator(); iter.hasNext(); ) {
+            List<CounterUpdate> counters = counterMultimap.get(iter.next());
+            counterUpdates.add(counters.get(0));
+            if (counters.size() == 1) {
+              // There is a single value. Remove the entry through the iterator.
+              iter.remove();
+            } else {
+              // Otherwise remove the first value.
+              counters.remove(0);
+            }
+          }
+        };
+
+    if (hasDuplicates) {
+      extractUniqueCounters.run();
+    } else { // Common case: no duplicates. We can just send counterUpdates, empty the multimap.
+      counterMultimap.clear();
+    }
+
     List<Status> errors;
     synchronized (pendingFailuresToReport) {
       errors = new ArrayList<>(pendingFailuresToReport.size());
@@ -1734,10 +1792,16 @@ public class StreamingDataflowWorker {
             .setWorkItemId(WINDMILL_COUNTER_UPDATE_WORK_ID)
             .setErrors(errors)
             .setCounterUpdates(counterUpdates);
-    // We can't populate per-workitem fields like TotalThrottlerWaitTimeSeconds in WorkItemStatus.
-    // This work item does not represent any specific stage on DFE.
-
     workUnitClient.reportWorkItemStatus(workItemStatus);
+
+    // Send any counters appearing more than once in subsequent RPCs:
+    while (!counterMultimap.isEmpty()) {
+      extractUniqueCounters.run();
+      workUnitClient.reportWorkItemStatus(
+          new WorkItemStatus()
+              .setWorkItemId(WINDMILL_COUNTER_UPDATE_WORK_ID)
+              .setCounterUpdates(counterUpdates));
+    }
   }
 
   /**