Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/22 02:34:56 UTC

[2/5] incubator-beam git commit: Kafka: various fixes

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
deleted file mode 100644
index b4bdc83..0000000
--- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Tests of {@link KafkaSource}.
- */
-@RunWith(JUnit4.class)
-public class KafkaIOTest {
-  /*
-   * The tests below borrow code and structure from CountingSourceTest. In addition, they verify
-   * that the reader interleaves the records from multiple partitions.
-   *
-   * Other tests to consider :
-   *   - test KafkaRecordCoder
-   */
-
-  // Creates a mock consumer with records distributed among the given topics, each with the given
-  // number of partitions. Records are assigned in round-robin order among the partitions.
-  private static MockConsumer<byte[], byte[]> mkMockConsumer(
-      List<String> topics, int partitionsPerTopic, int numElements) {
-
-    final List<TopicPartition> partitions = new ArrayList<>();
-    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
-    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
-
-    for (String topic : topics) {
-      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
-      for (int i = 0; i < partitionsPerTopic; i++) {
-        partitions.add(new TopicPartition(topic, i));
-        partIds.add(new PartitionInfo(topic, i, null, null, null));
-      }
-      partitionMap.put(topic, partIds);
-    }
-
-    int numPartitions = partitions.size();
-    long[] offsets = new long[numPartitions];
-
-    for (int i = 0; i < numElements; i++) {
-      int pIdx = i % numPartitions;
-      TopicPartition tp = partitions.get(pIdx);
-
-      if (!records.containsKey(tp)) {
-        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
-      }
-      records.get(tp).add(
-          // Note: this interface has changed in 0.10. may get fixed before the release.
-          new ConsumerRecord<byte[], byte[]>(
-              tp.topic(),
-              tp.partition(),
-              offsets[pIdx]++,
-              null, // key
-              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
-    }
-
-    MockConsumer<byte[], byte[]> consumer =
-        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-          // override assign() to add records that belong to the assigned partitions.
-          public void assign(List<TopicPartition> assigned) {
-            super.assign(assigned);
-            for (TopicPartition tp : assigned) {
-              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
-                addRecord(r);
-              }
-              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
-              updateEndOffsets(ImmutableMap.of(tp, (long)records.get(tp).size()));
-              seek(tp, 0);
-            }
-          }
-        };
-
-    for (String topic : topics) {
-      consumer.updatePartitions(topic, partitionMap.get(topic));
-    }
-
-    return consumer;
-  }
-
-  private static class ConsumerFactoryFn
-                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
-    private final List<String> topics;
-    private final int partitionsPerTopic;
-    private final int numElements;
-
-    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
-      this.topics = topics;
-      this.partitionsPerTopic = partitionsPerTopic;
-      this.numElements = numElements;
-    }
-
-    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements);
-    }
-  }
-
-  /**
-   * Creates a consumer with two topics, with 10 partitions each.
-   * numElements records are assigned round-robin across all 20 partitions.
-   */
-  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
-      int numElements,
-      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
-
-    List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
-    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
-        .withBootstrapServers("none")
-        .withTopics(topics)
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
-        .withValueCoder(BigEndianLongCoder.of())
-        .withMaxNumRecords(numElements);
-
-    if (timestampFn != null) {
-      return reader.withTimestampFn(timestampFn);
-    } else {
-      return reader;
-    }
-  }
-
-  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
-    private final int num;
-
-    public AssertMultipleOf(int num) {
-      this.num = num;
-    }
-
-    @Override
-    public Void apply(Iterable<Long> values) {
-      for (Long v : values) {
-        assertEquals(0, v % num);
-      }
-      return null;
-    }
-  }
-
-  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
-    // Count == numElements
-    DataflowAssert
-      .thatSingleton(input.apply("Count", Count.<Long>globally()))
-      .isEqualTo(numElements);
-    // Unique count == numElements
-    DataflowAssert
-      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
-                          .apply("UniqueCount", Count.<Long>globally()))
-      .isEqualTo(numElements);
-    // Min == 0
-    DataflowAssert
-      .thatSingleton(input.apply("Min", Min.<Long>globally()))
-      .isEqualTo(0L);
-    // Max == numElements-1
-    DataflowAssert
-      .thatSingleton(input.apply("Max", Max.<Long>globally()))
-      .isEqualTo(numElements - 1);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSource() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    PCollection<Long> input = p
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-            .withoutMetadata())
-        .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceWithExplicitPartitions() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    List<String> topics = ImmutableList.of("test");
-
-    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
-        .withBootstrapServers("none")
-        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
-        .withValueCoder(BigEndianLongCoder.of())
-        .withMaxNumRecords(numElements / 10);
-
-    PCollection<Long> input = p
-        .apply(reader.withoutMetadata())
-        .apply(Values.<Long>create());
-
-    // assert that every element is a multiple of 5.
-    DataflowAssert
-      .that(input)
-      .satisfies(new AssertMultipleOf(5));
-
-    DataflowAssert
-      .thatSingleton(input.apply(Count.<Long>globally()))
-      .isEqualTo(numElements / 10L);
-
-    p.run();
-  }
-
-  private static class ElementValueDiff extends DoFn<Long, Long> {
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element() - c.timestamp().getMillis());
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceTimestamps() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    PCollection<Long> input = p
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
-        .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-
-    PCollection<Long> diffs = input
-        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
-    // This assert also confirms that diffs only has one unique value.
-    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
-
-    p.run();
-  }
-
-  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
-    @Override
-    public void processElement(ProcessContext ctx) throws Exception {
-      ctx.output(ctx.element().getKV());
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-    int numSplits = 10;
-
-    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
-        mkKafkaReadTransform(numElements, null).makeSource();
-    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
-        initial.generateInitialSplits(numSplits, p.getOptions());
-    assertEquals("Expected exact splitting", numSplits, splits.size());
-
-    long elementsPerSplit = numElements / numSplits;
-    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
-    PCollectionList<Long> pcollections = PCollectionList.empty(p);
-    for (int i = 0; i < splits.size(); ++i) {
-      pcollections = pcollections.and(
-          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
-           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
-           .apply("collection " + i, Values.<Long>create()));
-    }
-    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  /**
-   * A timestamp function that uses the given value as the timestamp.
-   */
-  private static class ValueAsTimestampFn
-                       implements SerializableFunction<KV<byte[], Long>, Instant> {
-    @Override
-    public Instant apply(KV<byte[], Long> input) {
-      return new Instant(input.getValue());
-    }
-  }
-
-  @Test
-  public void testUnboundedSourceCheckpointMark() throws Exception {
-    int numElements = 85; // 85 to make sure some partitions have more records than others.
-
-    // create a single split:
-    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
-        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-          .makeSource()
-          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
-          .get(0);
-
-    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
-    final int numToSkip = 3;
-    // advance once:
-    assertTrue(reader.start());
-
-    // Advance the source numToSkip-1 elements and manually save state.
-    for (long l = 0; l < numToSkip - 1; ++l) {
-      assertTrue(reader.advance());
-    }
-
-    // Confirm that we get the expected element in sequence before checkpointing.
-
-    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
-    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
-
-    // Checkpoint and restart, and confirm that the source continues correctly.
-    KafkaCheckpointMark mark = CoderUtils.clone(
-        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
-    reader = source.createReader(null, mark);
-    assertTrue(reader.start());
-
-    // Confirm that we get the next elements in sequence.
-    // This also confirms that the reader interleaves records from each partition.
-    for (int i = numToSkip; i < numElements; i++) {
-      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
-      assertEquals(i, reader.getCurrentTimestamp().getMillis());
-      reader.advance();
-    }
-  }
-}
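
The deleted test above injects a MockConsumer into the source through KafkaIO's consumer-factory
hook rather than connecting to a real broker. The same pattern applies unchanged to the relocated
org.apache.beam.sdk.io.kafka package; a minimal sketch follows, where the factory class name and
the topic are illustrative and not part of this commit:

    import java.util.Map;

    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    /** Hypothetical factory that returns an empty MockConsumer instead of a real KafkaConsumer. */
    class MockConsumerFactory
        implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
      @Override
      public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
        // A test would pre-populate this consumer with records, as mkMockConsumer() does above.
        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
      }
    }

    // Wired into the source in place of the default Kafka 0.9 consumer:
    //   KafkaIO.read()
    //       .withBootstrapServers("none")                     // placeholder; the mock ignores it
    //       .withTopics(ImmutableList.of("topic_a"))
    //       .withConsumerFactoryFn(new MockConsumerFactory())
    //       .withMaxNumRecords(1000);                         // bound the read for the test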

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
new file mode 100644
index 0000000..98a091d
--- /dev/null
+++ b/sdks/java/io/kafka/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>io-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>kafka</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Kafka</name>
+  <description>Library to read Kafka topics.</description>
+  <packaging>jar</packaging>
+
+  <build>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-compiler-plugin</artifactId>
+    </plugin>
+
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-checkstyle-plugin</artifactId>
+      <version>2.12</version>
+      <dependencies>
+        <dependency>
+          <groupId>com.puppycrawl.tools</groupId>
+          <artifactId>checkstyle</artifactId>
+          <version>6.6</version>
+        </dependency>
+      </dependencies>
+      <configuration>
+        <configLocation>../../checkstyle.xml</configLocation>
+        <consoleOutput>true</consoleOutput>
+        <failOnViolation>true</failOnViolation>
+      </configuration>
+      <executions>
+        <execution>
+          <goals>
+            <goal>check</goal>
+          </goals>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>[0.9,)</version>
+    </dependency>
+
+    <!-- test dependencies-->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
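
With this module in place, a pipeline reads Kafka as an unbounded source; for tests and demos the
read can be bounded via withMaxNumRecords or withMaxReadTime (both defined in KafkaIO.java below).
A brief sketch, assuming an existing Pipeline p and illustrative broker and topic names:

    // Read for at most 30 seconds, then treat the source as bounded (records keep the default
    // byte[] key and value coders).
    PCollection<KV<byte[], byte[]>> sample = p
        .apply(KafkaIO.read()
            .withBootstrapServers("broker_1:9092")
            .withTopics(ImmutableList.of("topic_a"))
            .withMaxReadTime(Duration.standardSeconds(30))  // or .withMaxNumRecords(1000)
            .withoutMetadata());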

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
new file mode 100644
index 0000000..4b6b976
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Checkpoint for an unbounded KafkaIO.Read. Consists of the Kafka topic name, partition id,
+ * and the latest offset consumed so far, for each of the assigned partitions.
+ */
+@DefaultCoder(SerializableCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+  private final List<PartitionMark> partitions;
+
+  public KafkaCheckpointMark(List<PartitionMark> partitions) {
+    this.partitions = partitions;
+  }
+
+  public List<PartitionMark> getPartitions() {
+    return partitions;
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    /* nothing to do */
+
+    // We might want to support committing offsets to Kafka for a better resume point when the job
+    // is restarted (the checkpoint is not available across job restarts).
+  }
+
+  /**
+   * A tuple to hold topic, partition, and offset that comprise the checkpoint
+   * for a single partition.
+   */
+  public static class PartitionMark implements Serializable {
+    private final TopicPartition topicPartition;
+    private final long offset;
+
+    public PartitionMark(TopicPartition topicPartition, long offset) {
+      this.topicPartition = topicPartition;
+      this.offset = offset;
+    }
+
+    public TopicPartition getTopicPartition() {
+      return topicPartition;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+  }
+}
+
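
For reference, a checkpoint mark is just a serializable list of per-partition offsets. A minimal
sketch of building and inspecting one (the topic name and offsets are illustrative):

    List<KafkaCheckpointMark.PartitionMark> partitions = ImmutableList.of(
        new KafkaCheckpointMark.PartitionMark(new TopicPartition("topic_a", 0), 42L),
        new KafkaCheckpointMark.PartitionMark(new TopicPartition("topic_a", 1), 17L));

    KafkaCheckpointMark mark = new KafkaCheckpointMark(partitions);
    for (KafkaCheckpointMark.PartitionMark p : mark.getPartitions()) {
      // On resume, the reader seeks each partition to the checkpointed offset + 1
      // (see UnboundedKafkaReader.start() in KafkaIO.java below).
      long resumeOffset = p.getOffset() + 1;
    }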

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
new file mode 100644
index 0000000..e605311
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka versions 0.9
+ * and above are supported.
+ *
+ * <h3>Reading from Kafka topics</h3>
+ *
+ * <p>The KafkaIO source returns an unbounded collection of Kafka records as
+ * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic
+ * metadata such as the topic-partition and offset, along with the key and value associated with
+ * a Kafka record.
+ *
+ * <p>Although most applications consume a single topic, the source can be configured to consume
+ * multiple topics or even a specific set of {@link TopicPartition}s.
+ *
+ * <p>To configure a Kafka source, you must specify at a minimum the Kafka
+ * <tt>bootstrapServers</tt> and one or more topics to consume. The following example illustrates
+ * various options for configuring the source :
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>>
+ *
+ *       // rest of the settings are optional :
+ *
+ *       // set a Coder for Key and Value (note the change to return type)
+ *       .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>>
+ *       .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>>
+ *
+ *       // you can further customize KafkaConsumer used to read the records by adding more
+ *       // settings for ConsumerConfig. e.g :
+ *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
+ *
+ *       // custom function for calculating record timestamp (default is processing time)
+ *       .withTimestampFn(new MyTimestampFunction())
+ *
+ *       // custom function for watermark (default is record timestamp)
+ *       .withWatermarkFn(new MyWatermarkFunction())
+ *
+ *       // finally, if you don't need Kafka metadata, you can drop it
+ *       .withoutMetadata() // PCollection<KV<Long, String>>
+ *    )
+ *    .apply(Values.<String>create()) // PCollection<String>
+ *     ...
+ * }</pre>
+ *
+ * <h3>Partition Assignment and Checkpointing</h3>
+ * The Kafka partitions are evenly distributed among splits (workers).
+ * Dataflow checkpointing is fully supported and
+ * each split can resume from its previous checkpoint. See
+ * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
+ * splits and checkpoint support.
+ *
+ * <p>When the pipeline starts for the first time without any checkpoint, the source starts
+ * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
+ * beginning by setting appropriate properties in {@link ConsumerConfig}, through
+ * {@link Read#updateConsumerProperties(Map)}.
+ *
+ * <h3>Advanced Kafka Configuration</h3>
+ * KafkaIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like
+ * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set
+ * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ */
+public class KafkaIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
+
+  private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> {
+    @Override
+    public Instant apply(T input) {
+      return Instant.now();
+    }
+  }
+
+
+  /**
+   * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
+   * configuration should be set with {@link Read#withBootstrapServers(String)} and
+   * {@link Read#withTopics(List)}. Other optional settings include key and value coders, and
+   * custom timestamp and watermark functions.
+   */
+  public static Read<byte[], byte[]> read() {
+    return new Read<byte[], byte[]>(
+        new ArrayList<String>(),
+        new ArrayList<TopicPartition>(),
+        ByteArrayCoder.of(),
+        ByteArrayCoder.of(),
+        Read.KAFKA_9_CONSUMER_FACTORY_FN,
+        Read.DEFAULT_CONSUMER_PROPERTIES,
+        Long.MAX_VALUE,
+        null);
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class Read<K, V> extends TypedRead<K, V> {
+
+    /**
+     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+     */
+    public Read<K, V> withBootstrapServers(String bootstrapServers) {
+      return updateConsumerProperties(
+          ImmutableMap.<String, Object>of(
+              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the topics. All the partitions from each of
+     * the topics are read.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopics(List<String> topics) {
+      checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+     * of partitions for one or more topics when (if ever) needed.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
+      checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for key bytes.
+     */
+    public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
+      return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for value bytes.
+     */
+    public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
+      return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A factory to create a Kafka {@link Consumer} from the consumer configuration.
+     * This is useful for supporting another version of the Kafka consumer.
+     * The default is {@link KafkaConsumer}.
+     */
+    public Read<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Update consumer configuration with new properties.
+     */
+    public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
+      for (String key : configUpdates.keySet()) {
+        checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key),
+            "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key));
+      }
+
+      Map<String, Object> config = new HashMap<>(consumerConfig);
+      config.putAll(configUpdates);
+
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, config, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}.
+     * Mainly used for tests and demo applications.
+     */
+    public Read<K, V> withMaxNumRecords(long maxNumRecords) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, null);
+    }
+
+    /**
+     * Similar to
+     * {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
+     * Mainly used for tests and demo applications.
+     */
+    public Read<K, V> withMaxReadTime(Duration maxReadTime) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////////////
+
+    private Read(
+        List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+
+      super(topics, topicPartitions, keyCoder, valueCoder, null, null,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A set of properties that are not required or don't make sense for our consumer.
+     */
+    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set withKeyCoder() instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set withValueCoder() instead"
+        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
+        //     let's allow these; applications can have a better resume point for restarts.
+        );
+
+    // set config defaults
+    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+        ImmutableMap.<String, Object>of(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+
+            // Use a large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
+            // With the default value of 32K, it takes multiple seconds between successful polls.
+            // All the consumer work is done inside poll(); with a smaller receive buffer size it
+            // takes many polls before a 1MB chunk from the server is fully read. In my testing,
+            // about half of the time select() inside the kafka consumer waited for 20-30ms, though
+            // the server had lots of data in tcp send buffers on its side. Compared to the default,
+            // this setting increased throughput by many fold (3-4x).
+            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
+
+            // default to latest offset when we are not resuming.
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
+            // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    // default Kafka 0.9 Consumer supplier.
+    private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      KAFKA_9_CONSUMER_FACTORY_FN =
+        new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+            return new KafkaConsumer<>(config);
+          }
+        };
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class TypedRead<K, V>
+                      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to calculate the watermark after a record. Default is the last record's timestamp.
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
+    }
+
+    /**
+     * A function to calculate the watermark after a record. Default is the last record's timestamp.
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
+    }
+
+    /**
+     * Returns a {@link PTransform} for a PCollection of {@link KV}, dropping the Kafka metadata.
+     */
+    public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
+      return new TypedWithoutMetadata<K, V>(this);
+    }
+
+    @Override
+    public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+      Unbounded<KafkaRecord<K, V>> unbounded =
+          org.apache.beam.sdk.io.Read.from(makeSource());
+
+      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+      if (maxNumRecords < Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(maxNumRecords);
+      } else if (maxReadTime != null) {
+        transform = unbounded.withMaxReadTime(maxReadTime);
+      }
+
+      return input.getPipeline().apply(transform);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////
+
+    protected final List<String> topics;
+    protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics
+    protected final Coder<K> keyCoder;
+    protected final Coder<V> valueCoder;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn;
+    protected final
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    protected final Map<String, Object> consumerConfig;
+    protected final long maxNumRecords; // bounded read, mainly for testing
+    protected final Duration maxReadTime; // bounded read, mainly for testing
+
+    private TypedRead(List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+      super("KafkaIO.Read");
+
+      this.topics = topics;
+      this.topicPartitions = topicPartitions;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn = timestampFn;
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Creates an {@link UnboundedSource<KafkaRecord<K, V>, ?>} with the configuration in
+     * {@link TypedRead}. The primary use case is unit tests; it should not be used in an
+     * application.
+     */
+    @VisibleForTesting
+    UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
+      return new UnboundedKafkaSource<K, V>(
+          -1,
+          topics,
+          topicPartitions,
+          keyCoder,
+          valueCoder,
+          timestampFn,
+          Optional.fromNullable(watermarkFn),
+          consumerFactoryFn,
+          consumerConfig);
+    }
+
+    // utility method to convert KafkaRecord<K, V> to user KV<K, V> before applying user functions
+    private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
+      unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
+        return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+          public OutT apply(KafkaRecord<KeyT, ValueT> record) {
+            return fn.apply(record.getKV());
+          }
+        };
+      }
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.TypedRead}, but
+   * removes the Kafka metadata and returns a {@link PCollection} of {@link KV}.
+   * See {@link KafkaIO} for more information on usage and configuration of reader.
+   */
+  public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    private final TypedRead<K, V> typedRead;
+
+    TypedWithoutMetadata(TypedRead<K, V> read) {
+      super("KafkaIO.Read");
+      this.typedRead = read;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> apply(PBegin begin) {
+      return typedRead
+          .apply(begin)
+          .apply("Remove Kafka Metadata",
+              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                @Override
+                public void processElement(ProcessContext ctx) {
+                  ctx.output(ctx.element().getKV());
+                }
+              }));
+    }
+  }
+
+  /** Static class, prevent instantiation. */
+  private KafkaIO() {}
+
+  private static class UnboundedKafkaSource<K, V>
+      extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
+
+    private final int id; // split id, mainly for debugging
+    private final List<String> topics;
+    private final List<TopicPartition> assignedPartitions;
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    // would it be a good idea to pass currentTimestamp to watermarkFn?
+    private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn;
+    private
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    private final Map<String, Object> consumerConfig;
+
+    public UnboundedKafkaSource(
+        int id,
+        List<String> topics,
+        List<TopicPartition> assignedPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig) {
+
+      this.id = id;
+      this.assignedPartitions = assignedPartitions;
+      this.topics = topics;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn =
+          (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn);
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+    }
+
+    /**
+     * The partitions are evenly distributed among the splits. The number of splits returned is
+     * {@code min(desiredNumSplits, totalNumPartitions)}, though it is better not to depend on
+     * the exact count.
+     *
+     * <p> It is important to assign the partitions deterministically so that we can support
+     * resuming a split from its last checkpoint. The Kafka partitions are sorted by
+     * {@code <topic, partition>} and then assigned to splits in round-robin order.
+     */
+    @Override
+    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+
+      List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
+
+      // (a) fetch partitions for each topic
+      // (b) sort by <topic, partition>
+      // (c) round-robin assign the partitions to splits
+
+      if (partitions.isEmpty()) {
+        try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
+          for (String topic : topics) {
+            for (PartitionInfo p : consumer.partitionsFor(topic)) {
+              partitions.add(new TopicPartition(p.topic(), p.partition()));
+            }
+          }
+        }
+      }
+
+      Collections.sort(partitions, new Comparator<TopicPartition>() {
+        public int compare(TopicPartition tp1, TopicPartition tp2) {
+          return ComparisonChain
+              .start()
+              .compare(tp1.topic(), tp2.topic())
+              .compare(tp1.partition(), tp2.partition())
+              .result();
+        }
+      });
+
+      checkArgument(desiredNumSplits > 0);
+      checkState(partitions.size() > 0,
+          "Could not find any partitions. Please check Kafka configuration and topic names");
+
+      int numSplits = Math.min(desiredNumSplits, partitions.size());
+      List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        assignments.add(new ArrayList<TopicPartition>());
+      }
+      for (int i = 0; i < partitions.size(); i++) {
+        assignments.get(i % numSplits).add(partitions.get(i));
+      }
+
+      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        List<TopicPartition> assignedToSplit = assignments.get(i);
+
+        LOG.info("Partitions assigned to split {} (total {}): {}",
+            i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
+
+        result.add(new UnboundedKafkaSource<K, V>(
+            i,
+            this.topics,
+            assignedToSplit,
+            this.keyCoder,
+            this.valueCoder,
+            this.timestampFn,
+            this.watermarkFn,
+            this.consumerFactoryFn,
+            this.consumerConfig));
+      }
+
+      return result;
+    }
+
+    @Override
+    public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
+                                                   KafkaCheckpointMark checkpointMark) {
+      if (assignedPartitions.isEmpty()) {
+        LOG.warn("Looks like generateInitialSplits() was not called. Generating a single split.");
+        try {
+          return new UnboundedKafkaReader<K, V>(
+              generateInitialSplits(1, options).get(0), checkpointMark);
+        } catch (Exception e) {
+          Throwables.propagate(e);
+        }
+      }
+      return new UnboundedKafkaReader<K, V>(this, checkpointMark);
+    }
+
+    @Override
+    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
+      return SerializableCoder.of(KafkaCheckpointMark.class);
+    }
+
+    @Override
+    public boolean requiresDeduping() {
+      // Kafka records are ordered within partitions. In addition, the checkpoint guarantees
+      // records are not consumed twice.
+      return false;
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+          "Kafka bootstrap servers should be set");
+      checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
+          "Kafka topics or topic_partitions are required");
+    }
+
+    @Override
+    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+      return KafkaRecordCoder.of(keyCoder, valueCoder);
+    }
+  }
+
+  private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+
+    private final UnboundedKafkaSource<K, V> source;
+    private final String name;
+    private Consumer<byte[], byte[]> consumer;
+    private final List<PartitionState> partitionStates;
+    private KafkaRecord<K, V> curRecord;
+    private Instant curTimestamp;
+    private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+
+    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+    // how long to wait for new records from kafka consumer inside advance()
+    private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
+
+    // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
+    // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
+    // like 100 milliseconds, does not work well. This, along with a large receive buffer for the
+    // consumer, achieved the best throughput in tests (see DEFAULT_CONSUMER_PROPERTIES).
+    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
+    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
+        new SynchronousQueue<>();
+    private volatile boolean closed = false;
+
+    // Backlog support :
+    // The Kafka consumer does not have an API to fetch the latest offset for a topic. We need to
+    // seekToEnd() and then look at position(). Use another consumer to do this so that the
+    // primary consumer does not need to be interrupted. The latest offsets are fetched
+    // periodically on another thread. This is still a hack. There could be unintended side
+    // effects, e.g. if the user enabled offset auto commit in the consumer config, it could
+    // interfere with the primary consumer (we will handle this particular problem). We might
+    // have to make this optional.
+    private Consumer<byte[], byte[]> offsetConsumer;
+    private final ScheduledExecutorService offsetFetcherThread =
+        Executors.newSingleThreadScheduledExecutor();
+    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+
+    /** watermark before any records have been read. */
+    private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+
+    public String toString() {
+      return name;
+    }
+
+    // maintains state of each assigned partition (buffered records, consumed offset, etc)
+    private static class PartitionState {
+      private final TopicPartition topicPartition;
+      private long consumedOffset;
+      private long latestOffset;
+      private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
+
+      // simple moving average for size of each record in bytes
+      private double avgRecordSize = 0;
+      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+
+
+      PartitionState(TopicPartition partition, long offset) {
+        this.topicPartition = partition;
+        this.consumedOffset = offset;
+        this.latestOffset = -1;
+      }
+
+      // update consumedOffset and avgRecordSize
+      void recordConsumed(long offset, int size) {
+        consumedOffset = offset;
+
+        // this is always updated from single thread. probably not worth making it an AtomicDouble
+        if (avgRecordSize <= 0) {
+          avgRecordSize = size;
+        } else {
+          // initially, first record heavily contributes to average.
+          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
+        }
+      }
+
+      synchronized void setLatestOffset(long latestOffset) {
+        this.latestOffset = latestOffset;
+      }
+
+      synchronized long approxBacklogInBytes() {
+        // Note that this is an estimate of the uncompressed backlog.
+        // Messages on Kafka might be compressed.
+        if (latestOffset < 0 || consumedOffset < 0) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        if (latestOffset <= consumedOffset || consumedOffset < 0) {
+          return 0;
+        }
+        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+      }
+    }
+
+    public UnboundedKafkaReader(
+        UnboundedKafkaSource<K, V> source,
+        @Nullable KafkaCheckpointMark checkpointMark) {
+
+      this.source = source;
+      this.name = "Reader-" + source.id;
+
+      partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
+          new Function<TopicPartition, PartitionState>() {
+            public PartitionState apply(TopicPartition tp) {
+              return new PartitionState(tp, -1L);
+            }
+        }));
+
+      if (checkpointMark != null) {
+        // a) verify that assigned and check-pointed partitions match exactly
+        // b) set consumed offsets
+
+        checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(),
+            "checkPointMark and assignedPartitions should match");
+        // we could consider allowing a mismatch, though it is not expected in current Dataflow
+
+        for (int i = 0; i < source.assignedPartitions.size(); i++) {
+          PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
+          TopicPartition assigned = source.assignedPartitions.get(i);
+
+          checkState(ckptMark.getTopicPartition().equals(assigned),
+              "checkpointed partition %s and assigned partition %s don't match",
+              ckptMark.getTopicPartition(), assigned);
+
+          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+        }
+      }
+    }
+
+    private void consumerPollLoop() {
+      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
+      while (!closed) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+          if (!records.isEmpty()) {
+            availableRecordsQueue.put(records); // blocks until dequeued.
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
+          break;
+        } catch (WakeupException e) {
+          break;
+        }
+      }
+
+      LOG.info("{}: Returning from consumer poll loop", this);
+    }
+
+    private void nextBatch() {
+      curBatch = Collections.emptyIterator();
+
+      ConsumerRecords<byte[], byte[]> records;
+      try {
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+                                             TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("{}: unexpected interruption while waiting for records", this, e);
+        return;
+      }
+
+      if (records == null) {
+        return;
+      }
+
+      List<PartitionState> nonEmpty = new LinkedList<>();
+
+      for (PartitionState p : partitionStates) {
+        p.recordIter = records.records(p.topicPartition).iterator();
+        if (p.recordIter.hasNext()) {
+          nonEmpty.add(p);
+        }
+      }
+
+      // cycle through the partitions in order to interleave records from each.
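+      // For example, with two non-empty partitions A and B, advance() returns records as
+      // A1, B1, A2, B2, ... and drops a partition from the cycle once its iterator is exhausted.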
+      curBatch = Iterators.cycle(nonEmpty);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      consumer = source.consumerFactoryFn.apply(source.consumerConfig);
+      consumer.assign(source.assignedPartitions);
+
+      // seek to consumedOffset + 1 if it is set
+      for (PartitionState p : partitionStates) {
+        if (p.consumedOffset >= 0) {
+          LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
+          consumer.seek(p.topicPartition, p.consumedOffset + 1);
+        } else {
+          LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+        }
+      }
+
+      // start consumer read loop.
+      // The consumer is not thread safe; do not access it outside consumerPollLoop().
+      consumerPollThread.submit(
+          new Runnable() {
+            public void run() {
+              consumerPollLoop();
+            }
+          });
+
+      // offsetConsumer setup:
+
+      Object groupId = source.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
+      // Override group id and disable auto commit to avoid interfering with the main consumer.
+      String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name,
+          (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
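+      // The generated group id might look like "Reader-0_offset_consumer_<random>_myGroup"
+      // (the reader name and original group id here are illustrative).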
+      Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig);
+      offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+      offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+      offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
+      offsetConsumer.assign(source.assignedPartitions);
+
+      offsetFetcherThread.scheduleAtFixedRate(
+          new Runnable() {
+            public void run() {
+              updateLatestOffsets();
+            }
+          }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      /* Read the next record (if any). We need to loop here because:
+       *  - (a) some records initially need to be skipped if they are before consumedOffset
+       *  - (b) if curBatch is empty, we want to fetch the next batch and then advance.
+       *  - (c) curBatch is an iterator of iterators; we interleave the records from each.
+       *        curBatch.next() might return an empty iterator.
+       */
+      while (true) {
+        if (curBatch.hasNext()) {
+          PartitionState pState = curBatch.next();
+
+          if (!pState.recordIter.hasNext()) { // -- (c)
+            pState.recordIter = Collections.emptyIterator(); // drop ref
+            curBatch.remove();
+            continue;
+          }
+
+          ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
+          long consumed = pState.consumedOffset;
+          long offset = rawRecord.offset();
+
+          if (consumed >= 0 && offset <= consumed) { // -- (a)
+            // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
+            // should we check if the offset is way off from consumedOffset (say > 1M)?
+            LOG.warn("{}: ignoring already consumed offset {} for {}",
+                this, offset, pState.topicPartition);
+            continue;
+          }
+
+          // sanity check
+          if (consumed >= 0 && (offset - consumed) != 1) {
+            LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
+                this, pState.topicPartition, consumed, offset - consumed - 1);
+          }
+
+          if (curRecord == null) {
+            LOG.info("{}: first record offset {}", name, offset);
+          }
+
+          curRecord = null; // user coders below might throw.
+
+          // apply user coders. might want to allow skipping records that fail to decode.
+          // TODO: wrap exceptions from coders to make explicit to users
+          KafkaRecord<K, V> record = new KafkaRecord<K, V>(
+              rawRecord.topic(),
+              rawRecord.partition(),
+              rawRecord.offset(),
+              decode(rawRecord.key(), source.keyCoder),
+              decode(rawRecord.value(), source.valueCoder));
+
+          curTimestamp = source.timestampFn.apply(record);
+          curRecord = record;
+
+          int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) +
+              (rawRecord.value() == null ? 0 : rawRecord.value().length);
+          pState.recordConsumed(offset, recordSize);
+          return true;
+
+        } else { // -- (b)
+          nextBatch();
+
+          if (!curBatch.hasNext()) {
+            return false;
+          }
+        }
+      }
+    }
+
+    private static final byte[] nullBytes = new byte[0];
+    private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
+      // If 'bytes' is null, use byte[0]. It is common for the key in a Kafka record to be null.
+      // This makes it impossible for the user to distinguish between a zero-length byte array
+      // and null. Alternatively, we could have a ByteArrayCoder that handles nulls and use that
+      // as the default coder.
+      byte[] toDecode = bytes == null ? nullBytes : bytes;
+      return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
+    }
+
+    // update latest offset for each partition.
+    // called from offsetFetcher thread
+    private void updateLatestOffsets() {
+      for (PartitionState p : partitionStates) {
+        try {
+          offsetConsumer.seekToEnd(p.topicPartition);
+          long offset = offsetConsumer.position(p.topicPartition);
+          p.setLatestOffset(offset);
+        } catch (Exception e) {
+          LOG.warn("{}: exception while fetching latest offsets. ignored.", this, e);
+          p.setLatestOffset(-1L); // reset
+        }
+
+        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
+            this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+      }
+
+      LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
+    }
+
+    @Override
+    public Instant getWatermark() {
+      if (curRecord == null) {
+        LOG.warn("{}: getWatermark() : no records have been read yet.", name);
+        return initialWatermark;
+      }
+
+      return source.watermarkFn.isPresent() ?
+          source.watermarkFn.get().apply(curRecord) : curTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      // Copy the list eagerly; a lazy transform could see consumedOffset changing underneath it.
+      return new KafkaCheckpointMark(ImmutableList.copyOf(
+          Lists.transform(partitionStates,
+              new Function<PartitionState, PartitionMark>() {
+                public PartitionMark apply(PartitionState p) {
+                  return new PartitionMark(p.topicPartition, p.consumedOffset);
+                }
+              }
+          )));
+    }
+
+    @Override
+    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+      // Should we delay updating the consumed offset until this point? Mostly not required.
+      return curRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return curTimestamp;
+    }
+
+    @Override
+    public long getSplitBacklogBytes() {
+      long backlogBytes = 0;
+
+      for (PartitionState p : partitionStates) {
+        long pBacklog = p.approxBacklogInBytes();
+        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        backlogBytes += pBacklog;
+      }
+
+      return backlogBytes;
+    }
+
+    @Override
+    public void close() throws IOException {
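+      // Shutdown order: mark the reader closed so consumerPollLoop() exits, drain any unread
+      // batch so a consumer thread blocked on put() can proceed, then wake up the consumer and
+      // shut down the background threads and both consumers.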
+      closed = true;
+      availableRecordsQueue.poll(); // drain unread batch; this unblocks the consumer thread.
+      consumer.wakeup();
+      consumerPollThread.shutdown();
+      offsetFetcherThread.shutdown();
+      Closeables.close(offsetConsumer, true);
+      Closeables.close(consumer, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
new file mode 100644
index 0000000..76e688b
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.values.KV;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * KafkaRecord contains key and value of the record as well as metadata for the record (topic name,
+ * partition id, and offset).
+ */
+public class KafkaRecord<K, V> implements Serializable {
+
+  private final String topic;
+  private final int partition;
+  private final long offset;
+  private final KV<K, V> kv;
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      K key,
+      V value) {
+    this(topic, partition, offset, KV.of(key, value));
+  }
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      KV<K, V> kv) {
+
+    this.topic = topic;
+    this.partition = partition;
+    this.offset = offset;
+    this.kv = kv;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public KV<K, V> getKV() {
+    return kv;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof KafkaRecord) {
+      @SuppressWarnings("unchecked")
+      KafkaRecord<Object, Object> other = (KafkaRecord<Object, Object>) obj;
+      return topic.equals(other.topic)
+          && partition == other.partition
+          && offset == other.offset
+          && kv.equals(other.kv);
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
new file mode 100644
index 0000000..8a3e7f5
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.KV;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * {@link Coder} for {@link KafkaRecord}.
+ */
+public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+
+  private final KvCoder<K, V> kvCoder;
+
+  @JsonCreator
+  public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+                                          List<Coder<?>> components) {
+    KvCoder<?, ?> kvCoder = KvCoder.of(components);
+    return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+  }
+
+  public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
+  }
+
+  public KafkaRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
+                         throws CoderException, IOException {
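+    // Encoded layout: topic (UTF-8 string), partition (varint), offset (varlong), then the
+    // KV pair, each written with the nested context.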
+    Context nested = context.nested();
+    stringCoder.encode(value.getTopic(), outStream, nested);
+    intCoder.encode(value.getPartition(), outStream, nested);
+    longCoder.encode(value.getOffset(), outStream, nested);
+    kvCoder.encode(value.getKV(), outStream, nested);
+  }
+
+  @Override
+  public KafkaRecord<K, V> decode(InputStream inStream, Context context)
+                                      throws CoderException, IOException {
+    Context nested = context.nested();
+    return new KafkaRecord<K, V>(
+        stringCoder.decode(inStream, nested),
+        intCoder.decode(inStream, nested),
+        longCoder.decode(inStream, nested),
+        kvCoder.decode(inStream, nested));
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return kvCoder.getCoderArguments();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    kvCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
+    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+    // TODO: do we have to implement getEncodedSize()?
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new KafkaRecord<Object, Object>(
+          value.getTopic(),
+          value.getPartition(),
+          value.getOffset(),
+          (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
new file mode 100644
index 0000000..96ffc98
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests of {@link KafkaSource}.
+ */
+@RunWith(JUnit4.class)
+public class KafkaIOTest {
+  /*
+   * The tests below borrow code and structure from CountingSourceTest. In addition, they verify
+   * that the reader interleaves the records from multiple partitions.
+   *
+   * Other tests to consider:
+   *   - test KafkaRecordCoder
+   */
+
+  // Creates a mock consumer with records distributed among the given topics, each with the given
+  // number of partitions. Records are assigned in round-robin order among the partitions.
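+  // For example, with 2 topics, 10 partitions per topic and 1000 elements, each of the 20
+  // partitions gets 50 records, and partition 0 of the first topic holds values 0, 20, 40, ...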
+  private static MockConsumer<byte[], byte[]> mkMockConsumer(
+      List<String> topics, int partitionsPerTopic, int numElements) {
+
+    final List<TopicPartition> partitions = new ArrayList<>();
+    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+    for (String topic : topics) {
+      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+      for (int i = 0; i < partitionsPerTopic; i++) {
+        partitions.add(new TopicPartition(topic, i));
+        partIds.add(new PartitionInfo(topic, i, null, null, null));
+      }
+      partitionMap.put(topic, partIds);
+    }
+
+    int numPartitions = partitions.size();
+    long[] offsets = new long[numPartitions];
+
+    for (int i = 0; i < numElements; i++) {
+      int pIdx = i % numPartitions;
+      TopicPartition tp = partitions.get(pIdx);
+
+      if (!records.containsKey(tp)) {
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
+      }
+      records.get(tp).add(
+          // Note: this interface has changed in 0.10; it may get fixed before the release.
+          new ConsumerRecord<byte[], byte[]>(
+              tp.topic(),
+              tp.partition(),
+              offsets[pIdx]++,
+              null, // key
+              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
+    }
+
+    MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+          // override assign() to add records that belong to the assigned partitions.
+          public void assign(List<TopicPartition> assigned) {
+            super.assign(assigned);
+            for (TopicPartition tp : assigned) {
+              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+                addRecord(r);
+              }
+              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
+              seek(tp, 0);
+            }
+          }
+        };
+
+    for (String topic : topics) {
+      consumer.updatePartitions(topic, partitionMap.get(topic));
+    }
+
+    return consumer;
+  }
+
+  private static class ConsumerFactoryFn
+                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+    private final List<String> topics;
+    private final int partitionsPerTopic;
+    private final int numElements;
+
+    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+      this.topics = topics;
+      this.partitionsPerTopic = partitionsPerTopic;
+      this.numElements = numElements;
+    }
+
+    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+    }
+  }
+
+  /**
+   * Creates a read transform that reads from two topics with 10 partitions each.
+   * The numElements records are assigned round-robin across all 20 partitions.
+   */
+  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
+      int numElements,
+      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
+    private final int num;
+
+    public AssertMultipleOf(int num) {
+      this.num = num;
+    }
+
+    @Override
+    public Void apply(Iterable<Long> values) {
+      for (Long v : values) {
+        assertEquals(0, v % num);
+      }
+      return null;
+    }
+  }
+
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    PAssert
+      .thatSingleton(input.apply("Count", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Unique count == numElements
+    PAssert
+      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+                          .apply("UniqueCount", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Min == 0
+    PAssert
+      .thatSingleton(input.apply("Min", Min.<Long>globally()))
+      .isEqualTo(0L);
+    // Max == numElements-1
+    PAssert
+      .thatSingleton(input.apply("Max", Max.<Long>globally()))
+      .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSource() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceWithExplicitPartitions() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    List<String> topics = ImmutableList.of("test");
+
+    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements / 10);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    // Partition 5 of the 10-partition "test" topic receives elements 5, 15, 25, ..., so every
+    // element read here is a multiple of 5.
+    PAssert
+      .that(input)
+      .satisfies(new AssertMultipleOf(5));
+
+    PAssert
+      .thatSingleton(input.apply(Count.<Long>globally()))
+      .isEqualTo(numElements / 10L);
+
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceTimestamps() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs = input
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      ctx.output(ctx.element().getKV());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceSplits() throws Exception {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+    int numSplits = 10;
+
+    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
+        mkKafkaReadTransform(numElements, null).makeSource();
+    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
+        initial.generateInitialSplits(numSplits, p.getOptions());
+    assertEquals("Expected exact splitting", numSplits, splits.size());
+
+    long elementsPerSplit = numElements / numSplits;
+    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
+    PCollectionList<Long> pcollections = PCollectionList.empty(p);
+    for (int i = 0; i < splits.size(); ++i) {
+      pcollections = pcollections.and(
+          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
+           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
+           .apply("collection " + i, Values.<Long>create()));
+    }
+    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp.
+   */
+  private static class ValueAsTimestampFn
+                       implements SerializableFunction<KV<byte[], Long>, Instant> {
+    @Override
+    public Instant apply(KV<byte[], Long> input) {
+      return new Instant(input.getValue());
+    }
+  }
+
+  @Test
+  public void testUnboundedSourceCheckpointMark() throws Exception {
+    int numElements = 85; // 85 to make sure some partitions have more records than others.
+
+    // create a single split:
+    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+          .makeSource()
+          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+          .get(0);
+
+    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
+    final int numToSkip = 3;
+    // advance once:
+    assertTrue(reader.start());
+
+    // Advance the source numToSkip-1 elements and manually save state.
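+    // (start() already read element 0, so after this loop the current element is numToSkip - 1.)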
+    for (long l = 0; l < numToSkip - 1; ++l) {
+      assertTrue(reader.advance());
+    }
+
+    // Confirm that we get the expected element in sequence before checkpointing.
+
+    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
+    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+    reader = source.createReader(null, mark);
+    assertTrue(reader.start());
+
+    // Confirm that we get the next elements in sequence.
+    // This also confirms that the reader interleaves records from each of the partitions.
+    for (int i = numToSkip; i < numElements; i++) {
+      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
+      assertEquals(i, reader.getCurrentTimestamp().getMillis());
+      reader.advance();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 75f192c..95d1f55 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -36,6 +36,7 @@
 
   <modules>
     <module>hdfs</module>
+    <module>kafka</module>
   </modules>
 
-</project>
\ No newline at end of file
+</project>