You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:40 UTC
[05/13] beam git commit: assign a random shard id once in setup(),
rather than each time in processElement().
Assign a random starting shard id once in setup() and round-robin through the shards from it in processElement(), rather than picking a random shard for every element.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/236484b0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/236484b0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/236484b0
Branch: refs/heads/master
Commit: 236484b010a8e7b8bb6e6bc60c20ab7fced2b964
Parents: f4f6105
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Aug 8 14:18:54 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/236484b0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 63dc734..78227a0 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1866,15 +1866,21 @@ public class KafkaIO {
*/
private static class EOSReshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
private final int numShards;
+ private transient int shardId;
EOSReshard(int numShards) {
this.numShards = numShards;
}
+ @Setup
+ public void setup() {
+ shardId = ThreadLocalRandom.current().nextInt(numShards);
+ }
+
@ProcessElement
public void processElement(ProcessContext ctx) {
- int shard = ThreadLocalRandom.current().nextInt(numShards);
- ctx.output(KV.of(shard, ctx.element()));
+ shardId = (shardId + 1) % numShards; // round-robin among shards.
+ ctx.output(KV.of(shardId, ctx.element()));
}
}
@@ -2196,7 +2202,7 @@ public class KafkaIO {
//
throw new IllegalStateException(String.format(
"Kafka metadata exists for shard %s, but there is no stored state for it. "
- + "This mostly indicates groupId '%s' is already used else where or in earlier runs. "
+ + "This mostly indicates groupId '%s' is used else where or in earlier runs. "
+ "Try another group id. Metadata for this shard on Kafka : '%s'",
shard, spec.getSinkGroupId(), committed.metadata()));
}