Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:39 UTC

[04/13] beam git commit: 1) Add a note about metadata expiration. 2) Fetch number of partitions if numShards is not set.

1) Add a note about metadata expiration.
2) Fetch number of partitions if numShards is not set.
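
For readers skimming the diff below: change (2) means the exactly-once sink now asks Kafka for the topic's partition count whenever the number of shards is not configured. The following standalone sketch shows that lookup using the plain Kafka consumer API; the bootstrap address and topic name are placeholders for illustration only, not values taken from this commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PartitionCountLookup {
  public static void main(String[] args) {
    // Placeholder connection details; substitute your own cluster and topic.
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

    // partitionsFor() only reads topic metadata; no records are consumed and no
    // consumer group is joined.
    try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(config)) {
      List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
      System.out.println("my-topic has " + partitions.size()
          + " partitions; the sink would use that as its shard count.");
    }
  }
}

This mirrors what the new openConsumer() helper near the bottom of the diff does through the sink's consumer factory function.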


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d9c95f0e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d9c95f0e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d9c95f0e

Branch: refs/heads/master
Commit: d9c95f0ea40be1b5d4fdef9e123321367186fc9d
Parents: 6bf618d
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Jul 26 14:02:27 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:04 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 68 ++++++++++++--------
 1 file changed, 42 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d9c95f0e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb1c287..9f5f493 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -55,6 +55,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -1558,8 +1559,9 @@ public class KafkaIO {
 
       if (isEOS()) {
         EOSWrite.ensureEOSSupport();
-        checkArgument(getSinkGroupId() != null,
-                      "withSinkGroupId() is required for exactly-once sink");
+        checkNotNull(getSinkGroupId(), "A group id is required for exactly-once sink");
+        checkNotNull(getKeyCoder(), "Key coder is required for exactly-once sink");
+        checkNotNull(getValueCoder(), "Value coder is required for exactly-once sink");
 
         // TODO: Verify that the group_id does not have existing state stored on Kafka unless
         //       this is an upgrade. This avoids issues with simple mistake of reusing group_id
@@ -1833,13 +1835,23 @@ public class KafkaIO {
 
     @Override
     public PCollection<Void> expand(PCollection<KV<K, V>> input) {
+
+      int numShards = spec.getNumShards();
+      if (numShards <= 0) {
+        try (Consumer<?, ?> consumer = openConsumer(spec)) {
+          numShards = consumer.partitionsFor(spec.getTopic()).size();
+          LOG.info("Using {} shards for exactly-once, matching number of partitions for topic '{}'",
+                   numShards, spec.getTopic());
+        }
+      }
+      checkState(numShards > 0, "Could not set number of shards");
+
       return input
           .apply(Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
                      .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                      .discardingFiredPanes())
-          .apply(String.format("Shuffle across %d shards", spec.getNumShards()),
-                 ParDo.of(new EOSReshard<K, V>(DateTimeUtils.currentTimeMillis(),
-                                               spec.getNumShards())))
+          .apply(String.format("Shuffle across %d shards", numShards),
+                 ParDo.of(new EOSReshard<K, V>(numShards)))
           .apply("Persist sharding", GroupByKey.<Integer, KV<K, V>>create())
           .apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
           .apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
@@ -1854,17 +1866,16 @@ public class KafkaIO {
    * Shuffle messages assigning each randomly to a shard.
    */
   private static class EOSReshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
-    private final Random random;
     private final int numShards;
 
-    EOSReshard(long seed, int numShards) {
-      this.random = new Random(seed);
+    EOSReshard(int numShards) {
       this.numShards = numShards;
     }
 
     @ProcessElement
     public void processElement(ProcessContext ctx) {
-      ctx.output(KV.of(random.nextInt(numShards), ctx.element()));
+      int shard = ThreadLocalRandom.current().nextInt(numShards);
+      ctx.output(KV.of(shard, ctx.element()));
     }
   }
 
@@ -2093,14 +2104,15 @@ public class KafkaIO {
 
       void commitTxn(long lastRecordId) throws IOException {
         try {
-          // Store id in consumer group metadata for the partition
+          // Store id in consumer group metadata for the partition.
+          // NOTE: Kafka keeps this metadata for 24 hours after the last update. This limits
+          // how long the pipeline can be down before it is resumed. It does not look like
+          // this TTL can be adjusted (asked about it on the Kafka users list).
           producer.sendOffsetsToTransaction(
               ImmutableMap.of(new TopicPartition(spec.getTopic(), shard),
                               new OffsetAndMetadata(
-                                Long.MAX_VALUE, // So that consumer group does not expire.
-                                JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
-                                                                                 writerId)
-                                ))),
+                                0L, JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
+                                                                                     writerId)))),
               spec.getSinkGroupId());
           producer.commitTransaction();
 
@@ -2147,20 +2159,10 @@ public class KafkaIO {
       // Fetch latest committed metadata for the partition (if any). Checks committed sequence ids.
       try {
 
-        Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply((ImmutableMap.of(
-            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec
-                .getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
-            ConsumerConfig.GROUP_ID_CONFIG, spec.getSinkGroupId(),
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class
-        )));
-
         OffsetAndMetadata committed;
-        try {
-          committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
 
-        } finally {
-          consumer.close();
+        try (Consumer<?, ?> consumer = openConsumer(spec)) {
+          committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
         }
 
         long committedSeqId = -1;
@@ -2236,6 +2238,20 @@ public class KafkaIO {
     }
   }
 
+  /**
+   * Opens a generic consumer that is mainly meant for metadata operations like fetching
+   * the number of partitions for a topic rather than for fetching messages.
+   */
+  private static Consumer<?, ?> openConsumer(Write<?, ?> spec) {
+    return spec.getConsumerFactoryFn().apply((ImmutableMap.of(
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec
+        .getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+      ConsumerConfig.GROUP_ID_CONFIG, spec.getSinkGroupId(),
+      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
+      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class
+    )));
+  }
+
   private static <K, V> Producer<K, V> initializeEosProducer(Write<K, V> spec,
                                                              String producerName) {