You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:40 UTC

[05/13] beam git commit: assign a random shard id once in setup(), rather than each time in processElement().

Assign a random starting shard id once in setup() and advance it round-robin in processElement(), rather than picking a new random shard for each element in processElement().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/236484b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/236484b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/236484b0

Branch: refs/heads/master
Commit: 236484b010a8e7b8bb6e6bc60c20ab7fced2b964
Parents: f4f6105
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Aug 8 14:18:54 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/236484b0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 63dc734..78227a0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1866,15 +1866,21 @@ public class KafkaIO {
    */
   private static class EOSReshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
     private final int numShards;
+    private transient int shardId;
 
     EOSReshard(int numShards) {
       this.numShards = numShards;
     }
 
+    @Setup
+    public void setup() {
+      shardId = ThreadLocalRandom.current().nextInt(numShards);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext ctx) {
-      int shard = ThreadLocalRandom.current().nextInt(numShards);
-      ctx.output(KV.of(shard, ctx.element()));
+      shardId = (shardId + 1) % numShards; // round-robin among shards.
+      ctx.output(KV.of(shardId, ctx.element()));
     }
   }
 
@@ -2196,7 +2202,7 @@ public class KafkaIO {
             //
             throw new IllegalStateException(String.format(
               "Kafka metadata exists for shard %s, but there is no stored state for it. "
-              + "This mostly indicates groupId '%s' is already used else where or in earlier runs. "
+              + "This mostly indicates groupId '%s' is used else where or in earlier runs. "
               + "Try another group id. Metadata for this shard on Kafka : '%s'",
               shard, spec.getSinkGroupId(), committed.metadata()));
           }