You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/08 18:13:36 UTC
[beam] branch master updated: Tolerate duplicate counters from SDK
in StreamingDataflowWorker. (#6951)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9a21209 Tolerate duplicate counters from SDK in StreamingDataflowWorker. (#6951)
9a21209 is described below
commit 9a212095cb61589d90bfed871bb71a5810bc7e6c
Author: Raghu Angadi <ra...@apache.org>
AuthorDate: Thu Nov 8 10:13:27 2018 -0800
Tolerate duplicate counters from SDK in StreamingDataflowWorker. (#6951)
---
.../dataflow/worker/StreamingDataflowWorker.java | 74 ++++++++++++++++++++--
1 file changed, 69 insertions(+), 5 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 9329bb3..a72fbcc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -36,6 +36,8 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.MultimapBuilder;
import com.google.common.graph.MutableNetwork;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -47,6 +49,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -1703,20 +1706,75 @@ public class StreamingDataflowWorker {
}
}
+ /**
+ * Returns key for a counter update. It is a String in case of legacy counter and
+ * CounterStructuredName in the case of a structured counter.
+ */
+ private Object getCounterUpdateKey(CounterUpdate counterUpdate) {
+ Object key = null;
+ if (counterUpdate.getNameAndKind() != null) {
+ key = counterUpdate.getNameAndKind().getName();
+ } else if (counterUpdate.getStructuredNameAndMetadata() != null) {
+ key = counterUpdate.getStructuredNameAndMetadata().getName();
+ }
+ checkArgument(key != null, "Could not find name for CounterUpdate: %s", counterUpdate);
+ return key;
+ }
+
/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
- List<CounterUpdate> counterUpdates = new ArrayList<>();
+ List<CounterUpdate> counterUpdates = new ArrayList<>(128);
+
if (publishCounters) {
stageInfoMap.values().forEach(s -> counterUpdates.addAll(s.extractCounterUpdates()));
-
counterUpdates.addAll(
cumulativeCounters.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
counterUpdates.addAll(
deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
}
+ // Handle duplicate counters from different stages. Store all the counters in a multi-map and
+ // send the counters that appear multiple times in separate RPCs. Same logical counter could
+ // appear in multiple stages if a step runs in multiple stages (as with flatten-unzipped stages)
+ // especially if the counter definition does not set execution_step_name.
+ ListMultimap<Object, CounterUpdate> counterMultimap =
+ MultimapBuilder.hashKeys(counterUpdates.size()).linkedListValues().build();
+ boolean hasDuplicates = false;
+
+ for (CounterUpdate c : counterUpdates) {
+ Object key = getCounterUpdateKey(c);
+ if (counterMultimap.containsKey(key)) {
+ hasDuplicates = true;
+ }
+ counterMultimap.put(key, c);
+ }
+
+ // Clears counterUpdates and enqueues unique counters from counterMultimap. If a counter
+ // appears more than once, one of them is extracted leaving the remaining in the map.
+ Runnable extractUniqueCounters =
+ () -> {
+ counterUpdates.clear();
+ for (Iterator<Object> iter = counterMultimap.keySet().iterator(); iter.hasNext(); ) {
+ List<CounterUpdate> counters = counterMultimap.get(iter.next());
+ counterUpdates.add(counters.get(0));
+ if (counters.size() == 1) {
+ // There is single value. Remove the entry through the iterator.
+ iter.remove();
+ } else {
+ // Otherwise remove the first value.
+ counters.remove(0);
+ }
+ }
+ };
+
+ if (hasDuplicates) {
+ extractUniqueCounters.run();
+ } else { // Common case: no duplicates. We can just send counterUpdates, empty the multimap.
+ counterMultimap.clear();
+ }
+
List<Status> errors;
synchronized (pendingFailuresToReport) {
errors = new ArrayList<>(pendingFailuresToReport.size());
@@ -1734,10 +1792,16 @@ public class StreamingDataflowWorker {
.setWorkItemId(WINDMILL_COUNTER_UPDATE_WORK_ID)
.setErrors(errors)
.setCounterUpdates(counterUpdates);
- // We can't populate per-workitem fields like TotalThrottlerWaitTimeSeconds in WorkItemStatus.
- // This work item does not represent any specific stage on DFE.
-
workUnitClient.reportWorkItemStatus(workItemStatus);
+
+ // Send any counters appearing more than once in subsequent RPCs:
+ while (!counterMultimap.isEmpty()) {
+ extractUniqueCounters.run();
+ workUnitClient.reportWorkItemStatus(
+ new WorkItemStatus()
+ .setWorkItemId(WINDMILL_COUNTER_UPDATE_WORK_ID)
+ .setCounterUpdates(counterUpdates));
+ }
}
/**