You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/11/12 00:14:19 UTC

[1/2] incubator-beam git commit: Use Avro serializer for Kafka checkpoint mark.

Repository: incubator-beam
Updated Branches:
  refs/heads/master b25131422 -> f0f4af581


Use Avro serializer for Kafka checkpoint mark.

This is more partable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937ac3b2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937ac3b2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937ac3b2

Branch: refs/heads/master
Commit: 937ac3b2ddc60fd9446440c9354139c6234cb625
Parents: b251314
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Nov 8 07:08:32 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 16:14:07 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  | 32 +++++++++++++-------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 ++++++-----
 2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 4f9e96f..763a98a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -20,19 +20,21 @@ package org.apache.beam.sdk.io.kafka;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.kafka.common.TopicPartition;
 
 /**
  * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, partition id,
  * and the latest offset consumed so far.
  */
-@DefaultCoder(SerializableCoder.class)
-public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+@DefaultCoder(AvroCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
+
+  private List<PartitionMark> partitions;
 
-  private final List<PartitionMark> partitions;
+  private KafkaCheckpointMark() {} // for Avro
 
   public KafkaCheckpointMark(List<PartitionMark> partitions) {
     this.partitions = partitions;
@@ -55,16 +57,24 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Seri
    * for a single partition.
    */
   public static class PartitionMark implements Serializable {
-    private final TopicPartition topicPartition;
-    private final long nextOffset;
+    private String topic;
+    private int partition;
+    private long nextOffset;
+
+    private PartitionMark() {} // for Avro
 
-    public PartitionMark(TopicPartition topicPartition, long offset) {
-      this.topicPartition = topicPartition;
+    public PartitionMark(String topic, int partition, long offset) {
+      this.topic = topic;
+      this.partition = partition;
       this.nextOffset = offset;
     }
 
-    public TopicPartition getTopicPartition() {
-      return topicPartition;
+    public String getTopic() {
+      return topic;
+    }
+
+    public int getPartition() {
+      return partition;
     }
 
     public long getNextOffset() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937ac3b2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 834104e..4212d59 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -49,11 +49,12 @@ import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -721,7 +722,7 @@ public class KafkaIO {
 
     @Override
     public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
-      return SerializableCoder.of(KafkaCheckpointMark.class);
+      return AvroCoder.of(KafkaCheckpointMark.class);
     }
 
     @Override
@@ -856,10 +857,11 @@ public class KafkaIO {
         for (int i = 0; i < source.assignedPartitions.size(); i++) {
           PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
           TopicPartition assigned = source.assignedPartitions.get(i);
-
-          checkState(ckptMark.getTopicPartition().equals(assigned),
-              "checkpointed partition %s and assigned partition %s don't match",
-              ckptMark.getTopicPartition(), assigned);
+          TopicPartition partition = new TopicPartition(ckptMark.getTopic(),
+                                                        ckptMark.getPartition());
+          checkState(partition.equals(assigned),
+                     "checkpointed partition %s and assigned partition %s don't match",
+                     partition, assigned);
 
           partitionStates.get(i).nextOffset = ckptMark.getNextOffset();
         }
@@ -1084,7 +1086,9 @@ public class KafkaIO {
           Lists.transform(partitionStates,
               new Function<PartitionState, PartitionMark>() {
                 public PartitionMark apply(PartitionState p) {
-                  return new PartitionMark(p.topicPartition, p.nextOffset);
+                  return new PartitionMark(p.topicPartition.topic(),
+                                           p.topicPartition.partition(),
+                                           p.nextOffset);
                 }
               }
           )));


[2/2] incubator-beam git commit: Closes #1312

Posted by dh...@apache.org.
Closes #1312


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f0f4af58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f0f4af58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f0f4af58

Branch: refs/heads/master
Commit: f0f4af581f2cb6317ded367d4ddda35df94a7451
Parents: b251314 937ac3b
Author: Dan Halperin <dh...@google.com>
Authored: Fri Nov 11 16:14:15 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Nov 11 16:14:15 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  | 32 +++++++++++++-------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 ++++++-----
 2 files changed, 32 insertions(+), 18 deletions(-)
----------------------------------------------------------------------