You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:39 UTC
[04/13] beam git commit: 1) Add a note about metadata expiration. 2)
Fetch number of partitions if numShards is not set.
1) Add a note about metadata expiration.
2) Fetch number of partitions if numShards is not set.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d9c95f0e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d9c95f0e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d9c95f0e
Branch: refs/heads/master
Commit: d9c95f0ea40be1b5d4fdef9e123321367186fc9d
Parents: 6bf618d
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Jul 26 14:02:27 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:04 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 68 ++++++++++++--------
1 file changed, 42 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d9c95f0e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb1c287..9f5f493 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -55,6 +55,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1558,8 +1559,9 @@ public class KafkaIO {
if (isEOS()) {
EOSWrite.ensureEOSSupport();
- checkArgument(getSinkGroupId() != null,
- "withSinkGroupId() is required for exactly-once sink");
+ checkNotNull(getSinkGroupId(), "A group id is required for exactly-once sink");
+ checkNotNull(getKeyCoder(), "Key coder is required for exactly-once sink");
+ checkNotNull(getValueCoder(), "Value coder is required for exactly-once sink");
// TODO: Verify that the group_id does not have existing state stored on Kafka unless
// this is an upgrade. This avoids issues with simple mistake of reusing group_id
@@ -1833,13 +1835,23 @@ public class KafkaIO {
@Override
public PCollection<Void> expand(PCollection<KV<K, V>> input) {
+
+ int numShards = spec.getNumShards();
+ if (numShards <= 0) {
+ try (Consumer<?, ?> consumer = openConsumer(spec)) {
+ numShards = consumer.partitionsFor(spec.getTopic()).size();
+ LOG.info("Using {} shards for exactly-once, matching number of partitions for topic '{}'",
+ numShards, spec.getTopic());
+ }
+ }
+ checkState(numShards > 0, "Could not set number of shards");
+
return input
.apply(Window.<KV<K, V>>into(new GlobalWindows()) // Everything into global window.
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes())
- .apply(String.format("Shuffle across %d shards", spec.getNumShards()),
- ParDo.of(new EOSReshard<K, V>(DateTimeUtils.currentTimeMillis(),
- spec.getNumShards())))
+ .apply(String.format("Shuffle across %d shards", numShards),
+ ParDo.of(new EOSReshard<K, V>(numShards)))
.apply("Persist sharding", GroupByKey.<Integer, KV<K, V>>create())
.apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
.apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
@@ -1854,17 +1866,16 @@ public class KafkaIO {
* Shuffle messages assigning each randomly to a shard.
*/
private static class EOSReshard<K, V> extends DoFn<KV<K, V>, KV<Integer, KV<K, V>>> {
- private final Random random;
private final int numShards;
- EOSReshard(long seed, int numShards) {
- this.random = new Random(seed);
+ EOSReshard(int numShards) {
this.numShards = numShards;
}
@ProcessElement
public void processElement(ProcessContext ctx) {
- ctx.output(KV.of(random.nextInt(numShards), ctx.element()));
+ int shard = ThreadLocalRandom.current().nextInt(numShards);
+ ctx.output(KV.of(shard, ctx.element()));
}
}
@@ -2093,14 +2104,15 @@ public class KafkaIO {
void commitTxn(long lastRecordId) throws IOException {
try {
- // Store id in consumer group metadata for the partition
+ // Store id in consumer group metadata for the partition.
+ // NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
+ // how long the pipeline could be down before resuming it. It does not look like
+ // this TTL can be adjusted (asked about it on Kafka users list).
producer.sendOffsetsToTransaction(
ImmutableMap.of(new TopicPartition(spec.getTopic(), shard),
new OffsetAndMetadata(
- Long.MAX_VALUE, // So that consumer group does not expire.
- JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
- writerId)
- ))),
+ 0L, JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId,
+ writerId)))),
spec.getSinkGroupId());
producer.commitTransaction();
@@ -2147,20 +2159,10 @@ public class KafkaIO {
// Fetch latest committed metadata for the partition (if any). Checks committed sequence ids.
try {
- Consumer<?, ?> consumer = spec.getConsumerFactoryFn().apply((ImmutableMap.of(
- ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec
- .getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
- ConsumerConfig.GROUP_ID_CONFIG, spec.getSinkGroupId(),
- ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class
- )));
-
OffsetAndMetadata committed;
- try {
- committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
- } finally {
- consumer.close();
+ try (Consumer<?, ?> consumer = openConsumer(spec)) {
+ committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
}
long committedSeqId = -1;
@@ -2236,6 +2238,20 @@ public class KafkaIO {
}
}
+ /**
+ * Opens a generic consumer that is mainly meant for metadata operations like fetching
+ * number of partitions for a topic rather than for fetching messages.
+ */
+ private static Consumer<?, ?> openConsumer(Write<?, ?> spec) {
+ return spec.getConsumerFactoryFn().apply((ImmutableMap.of(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec
+ .getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ ConsumerConfig.GROUP_ID_CONFIG, spec.getSinkGroupId(),
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class
+ )));
+ }
+
private static <K, V> Producer<K, V> initializeEosProducer(Write<K, V> spec,
String producerName) {