You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:48:21 UTC

[47/50] incubator-beam git commit: Use a int Sequence instead of a Random UUID for Aggregator IDs

Use a int Sequence instead of a Random UUID for Aggregator IDs

Aggregator IDs are used to ensure that an Aggregator's identity is
consistent across synchronization barriers. This is only relevant when
constructing the map of Step -> Aggregator to enable querying, as the
DoFns represented within the graph may be serialized. The identity has
no impact on the interaction between the runner and aggregator, which is
the responsibility of the ProcessContext object and
setupDelegateAggregators.

UUID#randomUUID uses a shared SecureRandom to create the bytes of the
UUID; SecureRandom#nextBytes is a synchronized method, so regardless of
the underlying source of randomness, only one random UUID can be
generated at a time. Instead, use an atomically increasing int to
identify aggregators. This should be sufficient for user-created
aggregators, and system aggregators should not care about the id.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd854b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd854b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd854b1a

Branch: refs/heads/apex-runner
Commit: dd854b1a71770b9b452361e0d92e018b65f1b3e8
Parents: f2ec824
Author: Thomas Groh <tg...@google.com>
Authored: Thu Oct 27 10:19:03 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Oct 27 16:28:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DelegatingAggregator.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd854b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index d92bb71..e03d3b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Objects;
-import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 
 /**
@@ -37,7 +37,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  */
 class DelegatingAggregator<AggInputT, AggOutputT>
     implements Aggregator<AggInputT, AggOutputT>, Serializable {
-  private final UUID id;
+  private static final AtomicInteger ID_GEN = new AtomicInteger();
+  private final int id;
 
   private final String name;
 
@@ -47,7 +48,7 @@ class DelegatingAggregator<AggInputT, AggOutputT>
 
   public DelegatingAggregator(String name,
       CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    this.id = UUID.randomUUID();
+    this.id = ID_GEN.getAndIncrement();
     this.name = checkNotNull(name, "name cannot be null");
     // Safe contravariant cast
     @SuppressWarnings("unchecked")