You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:42 UTC
[07/13] beam git commit: Input PCollection has the coder for elements. No need to ask for them explicitly.
Input PCollection has the coder for elements. No need to ask for them explicitly.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5767156f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5767156f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5767156f
Branch: refs/heads/master
Commit: 5767156fe7810a2c2156d9644687c5768bc75ad1
Parents: 99f5541
Author: Raghu Angadi <ra...@google.com>
Authored: Thu Jul 27 13:02:26 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 30 ++++----------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 2 --
2 files changed, 6 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5767156f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 654a137..5d50cf7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1437,8 +1437,6 @@ public class KafkaIO {
abstract boolean isEOS();
@Nullable abstract String getSinkGroupId();
abstract int getNumShards();
- @Nullable abstract Coder<K> getKeyCoder();
- @Nullable abstract Coder<V> getValueCoder();
@Nullable abstract
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();
@@ -1455,8 +1453,6 @@ public class KafkaIO {
abstract Builder<K, V> setEOS(boolean eosEnabled);
abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
abstract Builder<K, V> setNumShards(int numShards);
- abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
- abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
abstract Builder<K, V> setConsumerFactoryFn(
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);
abstract Write<K, V> build();
@@ -1530,14 +1526,6 @@ public class KafkaIO {
return toBuilder().setNumShards(numShards).build();
}
- public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
- return toBuilder().setKeyCoder(keyCoder).build();
- }
-
- public Write<K, V> withValueCoder(Coder<V> valueCoder) {
- return toBuilder().setValueCoder(valueCoder).build();
- }
-
public Write<K, V> withConsumerFactoryFn(
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
@@ -1561,8 +1549,6 @@ public class KafkaIO {
if (isEOS()) {
EOSWrite.ensureEOSSupport();
checkNotNull(getSinkGroupId(), "A group id is required for exactly-once sink");
- checkNotNull(getKeyCoder(), "Key coder is required for exact-once sink");
- checkNotNull(getValueCoder(), "Value coder is required for exactly-once sink");
// TODO: Verify that the group_id does not have existing state stored on Kafka unless
// this is an upgrade. This avoids issues with simple mistake of reusing group_id
@@ -1857,9 +1843,7 @@ public class KafkaIO {
.apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
.apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
.apply(String.format("Write to Kafka topic '%s'", spec.getTopic()),
- ParDo.of(new KafkaEOWriter<>(spec)));
-
- // TODO: add metrics.
+ ParDo.of(new KafkaEOWriter<>(spec, input.getCoder())));
}
}
@@ -1918,10 +1902,9 @@ public class KafkaIO {
private final StateSpec<ValueState<Long>> minBufferedId = StateSpecs.value();
@StateId(OUT_OF_ORDER_BUFFER)
private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBuffer;
- // A random id assigned to each shard.
- // Helps with detecting when multiple jobs are mistakenly started with same 'groupId' used for
- // storing state on Kafka side. This also include the case where a job is restarted with same
- // groupId, but the state is not explicitly cleared.
+ // A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly
+ // started with same groupId used for storing state on Kafka side including the case where
+ // a job is restarted with same groupId, but the metadata from previous run is not removed.
// Better to be safe and error out with a clear message.
@StateId(WRITER_ID)
private final StateSpec<ValueState<String>> writerIdSpec = StateSpecs.value();
@@ -1939,10 +1922,9 @@ public class KafkaIO {
private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, "elementsBuffered");
private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, "numTransactions");
- KafkaEOWriter(Write<K, V> spec) {
+ KafkaEOWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
this.spec = spec;
- this.outOfOrderBuffer = StateSpecs.bag(KvCoder.of(
- BigEndianLongCoder.of(), KvCoder.of(spec.getKeyCoder(), spec.getValueCoder())));
+ this.outOfOrderBuffer = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder));
}
@ProcessElement
http://git-wip-us.apache.org/repos/asf/beam/blob/5767156f/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3a6c974..eaf30d6 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -743,8 +743,6 @@ public class KafkaIOTest {
.withEOS()
.withSinkGroupId("test")
.withNumShards(1)
- .withKeyCoder(BigEndianIntegerCoder.of())
- .withValueCoder(BigEndianLongCoder.of())
.withConsumerFactoryFn(new ConsumerFactoryFn(
Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
.withProducerFactoryFn(new ProducerFactoryFn()));