You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:42 UTC

[07/13] beam git commit: Input PCollection has the coder for elements. No need to ask for them explicitly.

The input PCollection already carries the coder for its elements, so there is no need to ask users for the key and value coders explicitly.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5767156f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5767156f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5767156f

Branch: refs/heads/master
Commit: 5767156fe7810a2c2156d9644687c5768bc75ad1
Parents: 99f5541
Author: Raghu Angadi <ra...@google.com>
Authored: Thu Jul 27 13:02:26 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 30 ++++----------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  2 --
 2 files changed, 6 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5767156f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 654a137..5d50cf7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1437,8 +1437,6 @@ public class KafkaIO {
     abstract boolean isEOS();
     @Nullable abstract String getSinkGroupId();
     abstract int getNumShards();
-    @Nullable abstract Coder<K> getKeyCoder();
-    @Nullable abstract Coder<V> getValueCoder();
     @Nullable abstract
     SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> getConsumerFactoryFn();
 
@@ -1455,8 +1453,6 @@ public class KafkaIO {
       abstract Builder<K, V> setEOS(boolean eosEnabled);
       abstract Builder<K, V> setSinkGroupId(String sinkGroupId);
       abstract Builder<K, V> setNumShards(int numShards);
-      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
-      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
       abstract Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> fn);
       abstract Write<K, V> build();
@@ -1530,14 +1526,6 @@ public class KafkaIO {
       return toBuilder().setNumShards(numShards).build();
     }
 
-    public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
-      return toBuilder().setKeyCoder(keyCoder).build();
-    }
-
-    public Write<K, V> withValueCoder(Coder<V> valueCoder) {
-      return toBuilder().setValueCoder(valueCoder).build();
-    }
-
     public Write<K, V> withConsumerFactoryFn(
         SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn) {
       return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
@@ -1561,8 +1549,6 @@ public class KafkaIO {
       if (isEOS()) {
         EOSWrite.ensureEOSSupport();
         checkNotNull(getSinkGroupId(), "A group id is required for exactly-once sink");
-        checkNotNull(getKeyCoder(), "Key coder is required for exact-once sink");
-        checkNotNull(getValueCoder(), "Value coder is required for exactly-once sink");
 
         // TODO: Verify that the group_id does not have existing state stored on Kafka unless
         //       this is an upgrade. This avoids issues with simple mistake of reusing group_id
@@ -1857,9 +1843,7 @@ public class KafkaIO {
           .apply("Assign sequential ids", ParDo.of(new EOSSequencer<K, V>()))
           .apply("Persist ids", GroupByKey.<Integer, KV<Long, KV<K, V>>>create())
           .apply(String.format("Write to Kafka topic '%s'", spec.getTopic()),
-                 ParDo.of(new KafkaEOWriter<>(spec)));
-
-      // TODO: add metrics.
+                 ParDo.of(new KafkaEOWriter<>(spec, input.getCoder())));
     }
   }
 
@@ -1918,10 +1902,9 @@ public class KafkaIO {
     private final StateSpec<ValueState<Long>> minBufferedId = StateSpecs.value();
     @StateId(OUT_OF_ORDER_BUFFER)
     private final StateSpec<BagState<KV<Long, KV<K, V>>>> outOfOrderBuffer;
-    // A random id assigned to each shard.
-    // Helps with detecting when multiple jobs are mistakenly started with same 'groupId' used for
-    // storing state on Kafka side. This also include the case where a job is restarted with same
-    // groupId, but the state is not explicitly cleared.
+    // A random id assigned to each shard. Helps with detecting when multiple jobs are mistakenly
+    // started with same groupId used for storing state on Kafka side including the case where
+    // a job is restarted with same groupId, but the metadata from previous run is not removed.
     // Better to be safe and error out with a clear message.
     @StateId(WRITER_ID)
     private final StateSpec<ValueState<String>> writerIdSpec = StateSpecs.value();
@@ -1939,10 +1922,9 @@ public class KafkaIO {
     private final Counter elementsBuffered = Metrics.counter(METRIC_NAMESPACE, "elementsBuffered");
     private final Counter numTransactions = Metrics.counter(METRIC_NAMESPACE, "numTransactions");
 
-    KafkaEOWriter(Write<K, V> spec) {
+    KafkaEOWriter(Write<K, V> spec, Coder<KV<K, V>> elemCoder) {
       this.spec = spec;
-      this.outOfOrderBuffer = StateSpecs.bag(KvCoder.of(
-          BigEndianLongCoder.of(), KvCoder.of(spec.getKeyCoder(), spec.getValueCoder())));
+      this.outOfOrderBuffer = StateSpecs.bag(KvCoder.of(BigEndianLongCoder.of(), elemCoder));
     }
 
     @ProcessElement

http://git-wip-us.apache.org/repos/asf/beam/blob/5767156f/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 3a6c974..eaf30d6 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -743,8 +743,6 @@ public class KafkaIOTest {
             .withEOS()
             .withSinkGroupId("test")
             .withNumShards(1)
-            .withKeyCoder(BigEndianIntegerCoder.of())
-            .withValueCoder(BigEndianLongCoder.of())
             .withConsumerFactoryFn(new ConsumerFactoryFn(
               Lists.newArrayList(topic), 10, 10, OffsetResetStrategy.EARLIEST))
             .withProducerFactoryFn(new ProducerFactoryFn()));