You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/04/22 02:34:55 UTC

[1/5] incubator-beam git commit: Closes #142

Repository: incubator-beam
Updated Branches:
  refs/heads/master 4c01c7d30 -> 640e2588f


Closes #142


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/640e2588
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/640e2588
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/640e2588

Branch: refs/heads/master
Commit: 640e2588fc74c901ec1b3a56ce6b88e13b3444fc
Parents: 4c01c7d 9210660
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 21 17:34:33 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 17:34:33 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/kafka/pom.xml                      |  104 ++
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   77 ++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 1054 ++++++++++++++++++
 .../apache/beam/sdk/io/kafka/KafkaRecord.java   |   91 ++
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  119 ++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  380 +++++++
 sdks/java/io/pom.xml                            |    3 +-
 7 files changed, 1827 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[4/5] incubator-beam git commit: KafkaIO: unbounded source for reading from Apache Kafka

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
new file mode 100644
index 0000000..b4bdc83
--- /dev/null
+++ b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
@@ -0,0 +1,378 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.util.CoderUtils;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests of {@link KafkaSource}.
+ */
+@RunWith(JUnit4.class)
+public class KafkaIOTest {
+  /*
+   * The tests below borrow code and structure from CountingSourceTest. In addition verifies
+   * the reader interleaves the records from multiple partitions.
+   *
+   * Other tests to consider :
+   *   - test KafkaRecordCoder
+   */
+
+  // Update mock consumer with records distributed among the given topics, each with given number
+  // of partitions. Records are assigned in round-robin order among the partitions.
+  private static MockConsumer<byte[], byte[]> mkMockConsumer(
+      List<String> topics, int partitionsPerTopic, int numElements) {
+
+    final List<TopicPartition> partitions = new ArrayList<>();
+    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+    for (String topic : topics) {
+      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+      for (int i = 0; i < partitionsPerTopic; i++) {
+        partitions.add(new TopicPartition(topic, i));
+        partIds.add(new PartitionInfo(topic, i, null, null, null));
+      }
+      partitionMap.put(topic, partIds);
+    }
+
+    int numPartitions = partitions.size();
+    long[] offsets = new long[numPartitions];
+
+    for (int i = 0; i < numElements; i++) {
+      int pIdx = i % numPartitions;
+      TopicPartition tp = partitions.get(pIdx);
+
+      if (!records.containsKey(tp)) {
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
+      }
+      records.get(tp).add(
+          // Note: this interface has changed in 0.10. may get fixed before the release.
+          new ConsumerRecord<byte[], byte[]>(
+              tp.topic(),
+              tp.partition(),
+              offsets[pIdx]++,
+              null, // key
+              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
+    }
+
+    MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+          // override assign() to add records that belong to the assigned partitions.
+          public void assign(List<TopicPartition> assigned) {
+            super.assign(assigned);
+            for (TopicPartition tp : assigned) {
+              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+                addRecord(r);
+              }
+              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+              updateEndOffsets(ImmutableMap.of(tp, (long)records.get(tp).size()));
+              seek(tp, 0);
+            }
+          }
+        };
+
+    for (String topic : topics) {
+      consumer.updatePartitions(topic, partitionMap.get(topic));
+    }
+
+    return consumer;
+  }
+
+  private static class ConsumerFactoryFn
+                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+    private final List<String> topics;
+    private final int partitionsPerTopic;
+    private final int numElements;
+
+    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+      this.topics = topics;
+      this.partitionsPerTopic = partitionsPerTopic;
+      this.numElements = numElements;
+    }
+
+    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+    }
+  }
+
+  /**
+   * Creates a consumer with two topics, with 5 partitions each.
+   * numElements are (round-robin) assigned all the 10 partitions.
+   */
+  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
+      int numElements,
+      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
+    private final int num;
+
+    public AssertMultipleOf(int num) {
+      this.num = num;
+    }
+
+    @Override
+    public Void apply(Iterable<Long> values) {
+      for (Long v : values) {
+        assertEquals(0, v % num);
+      }
+      return null;
+    }
+  }
+
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    DataflowAssert
+      .thatSingleton(input.apply("Count", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Unique count == numElements
+    DataflowAssert
+      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+                          .apply("UniqueCount", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Min == 0
+    DataflowAssert
+      .thatSingleton(input.apply("Min", Min.<Long>globally()))
+      .isEqualTo(0L);
+    // Max == numElements-1
+    DataflowAssert
+      .thatSingleton(input.apply("Max", Max.<Long>globally()))
+      .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSource() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceWithExplicitPartitions() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    List<String> topics = ImmutableList.of("test");
+
+    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements / 10);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    // assert that every element is a multiple of 5.
+    DataflowAssert
+      .that(input)
+      .satisfies(new AssertMultipleOf(5));
+
+    DataflowAssert
+      .thatSingleton(input.apply(Count.<Long>globally()))
+      .isEqualTo(numElements / 10L);
+
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceTimestamps() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs = input
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      ctx.output(ctx.element().getKV());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceSplits() throws Exception {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+    int numSplits = 10;
+
+    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
+        mkKafkaReadTransform(numElements, null).makeSource();
+    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
+        initial.generateInitialSplits(numSplits, p.getOptions());
+    assertEquals("Expected exact splitting", numSplits, splits.size());
+
+    long elementsPerSplit = numElements / numSplits;
+    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
+    PCollectionList<Long> pcollections = PCollectionList.empty(p);
+    for (int i = 0; i < splits.size(); ++i) {
+      pcollections = pcollections.and(
+          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
+           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
+           .apply("collection " + i, Values.<Long>create()));
+    }
+    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp.
+   */
+  private static class ValueAsTimestampFn
+                       implements SerializableFunction<KV<byte[], Long>, Instant> {
+    @Override
+    public Instant apply(KV<byte[], Long> input) {
+      return new Instant(input.getValue());
+    }
+  }
+
+  @Test
+  public void testUnboundedSourceCheckpointMark() throws Exception {
+    int numElements = 85; // 85 to make sure some partitions have more records than other.
+
+    // create a single split:
+    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+          .makeSource()
+          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+          .get(0);
+
+    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
+    final int numToSkip = 3;
+    // advance once:
+    assertTrue(reader.start());
+
+    // Advance the source numToSkip-1 elements and manually save state.
+    for (long l = 0; l < numToSkip - 1; ++l) {
+      assertTrue(reader.advance());
+    }
+
+    // Confirm that we get the expected element in sequence before checkpointing.
+
+    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
+    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+    reader = source.createReader(null, mark);
+    assertTrue(reader.start());
+
+    // Confirm that we get the next elements in sequence.
+    // This also confirms that Reader interleaves records from each partitions by the reader.
+    for (int i = numToSkip; i < numElements; i++) {
+      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
+      assertEquals(i, reader.getCurrentTimestamp().getMillis());
+      reader.advance();
+    }
+  }
+}


[3/5] incubator-beam git commit: Kafka: various fixes

Posted by dh...@apache.org.
Kafka: various fixes

*) package renaming
*) move it out of contrib to I/O module
*) get it running as part of the normal travis build,
   and remove specialization
*) remove examples, to be added in a separate PR.
      Either there should be an addition to an existing example to
   handle "multiple" streaming sources, or we should have this as an
   integration test. No need, however, for an example just to show
   how to use an I/O in a pipeline -- that's what Javadoc is for.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/92106605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/92106605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/92106605

Branch: refs/heads/master
Commit: 92106605eb91f49032c9e9dc7fdea9a6b8901671
Parents: 7b175df
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 6 23:43:57 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 17:34:33 2016 -0700

----------------------------------------------------------------------
 contrib/examples/kafka/pom.xml                  |  189 ----
 .../kafka/examples/TopHashtagsExample.java      |  265 -----
 contrib/kafka/pom.xml                           |  176 ---
 .../contrib/kafka/KafkaCheckpointMark.java      |   76 --
 .../cloud/dataflow/contrib/kafka/KafkaIO.java   | 1053 -----------------
 .../dataflow/contrib/kafka/KafkaRecord.java     |   84 --
 .../contrib/kafka/KafkaRecordCoder.java         |  118 --
 .../dataflow/contrib/kafka/KafkaIOTest.java     |  378 -------
 sdks/java/io/kafka/pom.xml                      |  104 ++
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java  |   77 ++
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 1054 ++++++++++++++++++
 .../apache/beam/sdk/io/kafka/KafkaRecord.java   |   91 ++
 .../beam/sdk/io/kafka/KafkaRecordCoder.java     |  119 ++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   |  380 +++++++
 sdks/java/io/pom.xml                            |    3 +-
 15 files changed, 1827 insertions(+), 2340 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/examples/kafka/pom.xml b/contrib/examples/kafka/pom.xml
deleted file mode 100644
index 3355020..0000000
--- a/contrib/examples/kafka/pom.xml
+++ /dev/null
@@ -1,189 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-  ~ Copyright (C) 2015 Google Inc.
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
-  ~ use this file except in compliance with the License. You may obtain a copy of
-  ~ the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-  ~ License for the specific language governing permissions and limitations under
-  ~ the License.
-  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>com.google.cloud.dataflow</groupId>
-  <artifactId>google-cloud-dataflow-java-contrib-kafka-examples</artifactId>
-  <name>Google Cloud Dataflow Kafka Examples</name>
-  <description>Examples apps using Kafka Source in Google Cloud Dataflow</description>
-  <version>0.0.1-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-  </properties>
-
-  <build>
-    <plugins>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.12</version>
-        <dependencies>
-          <dependency>
-            <groupId>com.puppycrawl.tools</groupId>
-            <artifactId>checkstyle</artifactId>
-            <version>6.6</version>
-          </dependency>
-        </dependencies>
-        <configuration>
-          <configLocation>../../../checkstyle.xml</configLocation>
-          <consoleOutput>true</consoleOutput>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- Source plugin for generating source and test-source JARs. -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <id>attach-sources</id>
-            <phase>compile</phase>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>attach-test-sources</id>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <version>2.10.3</version>
-        <configuration>
-          <windowtitle>Google Cloud Dataflow Kafka Contrib</windowtitle>
-          <doctitle>Google Cloud Dataflow Kafka Contrib</doctitle>
-
-          <subpackages>com.google.cloud.dataflow.contrib.kafka</subpackages>
-          <use>false</use>
-          <bottom><![CDATA[<br>]]></bottom>
-
-          <offlineLinks>
-            <offlineLink>
-              <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
-              <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
-            </offlineLink>
-            <offlineLink>
-              <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
-              <location>${basedir}/../../javadoc/guava-docs</location>
-            </offlineLink>
-          </offlineLinks>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-            <phase>package</phase>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <version>2.3</version>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
-              <artifactSet>
-                <includes>
-                  <include>*:*</include>
-                </includes>
-              </artifactSet>
-              <filters>
-                <filter>
-                  <artifact>*:*</artifact>
-                  <excludes>
-                    <exclude>META-INF/*.SF</exclude>
-                    <exclude>META-INF/*.DSA</exclude>
-                    <exclude>META-INF/*.RSA</exclude>
-                  </excludes>
-                </filter>
-              </filters>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-      <version>1.7.7</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <version>1.7.7</version>
-      <scope>runtime</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
----------------------------------------------------------------------
diff --git a/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java b/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
deleted file mode 100644
index 2a575f1..0000000
--- a/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka.examples;
-
-import com.google.cloud.dataflow.contrib.kafka.KafkaIO;
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.options.Default;
-import com.google.cloud.dataflow.sdk.options.Description;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.options.Validation.Required;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Top;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This Dataflow app show cases {@link KafkaIO}. The application reads from a Kafka topic
- * containing <a href="https://dev.twitter.com/overview/api/tweets">JSON Tweets</a>, calculates top
- * hashtags in 10 minute window. The results are written back to a Kafka topic.
- *
- * <pre>{@code
- * Usage:
- *   $ java -cp jar_with_dependencies.jar                                           \
- *          com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample     \
- *          --project=GCP_PROJECT                                                   \
- *          --stagingLocation=GS_STAGING_DIRECTORY                                  \
- *          --runner=BlockingDataflowPipelineRunner                                 \
- *          --bootstrapServers="kafka_server_1:9092"                                \
- *          --topics="sample_tweets_json"                                           \
- *          --outputTopic="top_hashtags"
- * }</pre>
- */
-public class TopHashtagsExample {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TopHashtagsExample.class);
-
-  /**
-   * Options for the app.
-   */
-  public static interface Options extends PipelineOptions {
-    @Description("Sliding window length in minutes")
-    @Default.Integer(10)
-    Integer getSlidingWindowLengthMinutes();
-    void setSlidingWindowLengthMinutes(Integer value);
-
-    @Description("Trigger window interval in minutes")
-    @Default.Integer(1)
-    Integer getSlidingWindowIntervalMinutes();
-    void setSlidingWindowIntervalMinutes(Integer value);
-
-    @Description("Bootstrap Server(s) for Kafka")
-    @Required
-    String getBootstrapServers();
-    void setBootstrapServers(String servers);
-
-    @Description("One or more comma separated topics to read from")
-    @Required
-    List<String> getTopics();
-    void setTopics(List<String> topics);
-
-    @Description("Number of Top Hashtags to track")
-    @Default.Integer(10)
-    Integer getNumTopHashtags();
-    void setNumTopHashtags(Integer count);
-
-    @Description("Kafka topic name for writing results")
-    @Required
-    String getOutputTopic();
-    void setOutputTopic(String topic);
-  }
-
-  public static void main(String args[]) {
-
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
-    Pipeline pipeline = Pipeline.create(options);
-
-    pipeline
-      .apply(KafkaIO.read()
-          .withBootstrapServers(options.getBootstrapServers())
-          .withTopics(options.getTopics())
-          .withValueCoder(StringUtf8Coder.of())
-          .withTimestampFn(TWEET_TIMESTAMP_OR_NOW)
-          .withoutMetadata())
-      .apply(Values.<String>create())
-      .apply(ParDo.of(new ExtractHashtagsFn()))
-      .apply(Window.<String>into(SlidingWindows
-          .of(Duration.standardMinutes(options.getSlidingWindowLengthMinutes()))
-          .every(Duration.standardMinutes(options.getSlidingWindowIntervalMinutes()))))
-      .apply(Count.<String>perElement())
-      .apply(Top.of(options.getNumTopHashtags(), new KV.OrderByValue<String, Long>())
-                .withoutDefaults())
-      .apply(ParDo.of(new OutputFormatter()))
-      .apply(ParDo.of(new KafkaWriter(options)));
-
-    pipeline.run();
-  }
-
-  // The rest of the file implements DoFns to do the following:
-  //    - extract hashtags
-  //    - format results in json
-  //    - write the results back to Kafka (useful for fetching monitoring the end result).
-
-  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
-
-  /**
-   * Emit hashtags in the tweet (if any).
-   */
-  private static class ExtractHashtagsFn extends DoFn<String, String> {
-
-    @Override
-    public void processElement(ProcessContext ctx) throws Exception {
-      for (JsonNode hashtag : JSON_MAPPER.readTree(ctx.element())
-                                         .with("entities")
-                                         .withArray("hashtags")) {
-        ctx.output(hashtag.get("text").asText());
-      }
-    }
-  }
-
-  // extract timestamp from "timestamp_ms" field.
-  private static final SerializableFunction<KV<byte[], String>, Instant> TWEET_TIMESTAMP_OR_NOW =
-      new SerializableFunction<KV<byte[], String>, Instant>() {
-        @Override
-        public Instant apply(KV<byte[], String> kv) {
-          try {
-            long tsMillis = JSON_MAPPER.readTree(kv.getValue()).path("timestamp_ms").asLong();
-            return tsMillis == 0 ? Instant.now() : new Instant(tsMillis);
-          } catch (Exception e) {
-            throw Throwables.propagate(e);
-          }
-        }
-      };
-
-  // return json string containing top hashtags and window information time
-  private static class OutputFormatter extends DoFn<List<KV<String, Long>>, String>
-      implements DoFn.RequiresWindowAccess {
-
-    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat
-        .forPattern("yyyy-MM-dd HH:mm:ss")
-        .withZoneUTC();
-    private static final ObjectWriter JSON_WRITER = new ObjectMapper()
-        .writerWithType(OutputJson.class);
-
-    static class OutputJson {
-      @JsonProperty String windowStart;
-      @JsonProperty String windowEnd;
-      @JsonProperty String generatedAt;
-      @JsonProperty List<HashtagInfo> topHashtags;
-
-      OutputJson(String windowStart, String windowEnd,
-                 String generatedAt, List<HashtagInfo> topHashtags) {
-        this.windowStart = windowStart;
-        this.windowEnd = windowEnd;
-        this.generatedAt = generatedAt;
-        this.topHashtags = topHashtags;
-      }
-    }
-
-    static class HashtagInfo {
-      @JsonProperty final String hashtag;
-      @JsonProperty final long count;
-      HashtagInfo(String hashtag, long count) {
-        this.hashtag = hashtag;
-        this.count = count;
-      }
-    }
-
-    @Override
-    public void processElement(ProcessContext ctx) throws Exception {
-
-      List<HashtagInfo> topHashtags = new ArrayList<>(ctx.element().size());
-
-      for (KV<String, Long> tag : ctx.element()) {
-        topHashtags.add(new HashtagInfo(tag.getKey(), tag.getValue()));
-      }
-
-      IntervalWindow window = (IntervalWindow) ctx.window();
-
-      String json = JSON_WRITER.writeValueAsString(new OutputJson(
-          DATE_FORMATTER.print(window.start()),
-          DATE_FORMATTER.print(window.end()),
-          DATE_FORMATTER.print(Instant.now()),
-          topHashtags));
-
-      ctx.output(json);
-    }
-  }
-
-  private static class KafkaWriter extends DoFn<String, Void> {
-
-    private final String topic;
-    private final Map<String, Object> config;
-    private static transient KafkaProducer<String, String> producer = null;
-
-    public KafkaWriter(Options options) {
-      this.topic = options.getOutputTopic();
-      this.config = ImmutableMap.<String, Object>of(
-          "bootstrap.servers", options.getBootstrapServers(),
-          "key.serializer",    StringSerializer.class.getName(),
-          "value.serializer",  StringSerializer.class.getName());
-    }
-
-    @Override
-    public void startBundle(Context c) throws Exception {
-      if (producer == null) { // in Beam, startBundle might be called multiple times.
-        producer = new KafkaProducer<String, String>(config);
-      }
-    }
-
-    @Override
-    public void finishBundle(Context c) throws Exception {
-      producer.flush();
-    }
-
-    @Override
-    public void processElement(ProcessContext ctx) throws Exception {
-      LOG.trace("Top Hashtags : {}", ctx.element());
-      producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/kafka/pom.xml b/contrib/kafka/pom.xml
deleted file mode 100644
index 7fe8165..0000000
--- a/contrib/kafka/pom.xml
+++ /dev/null
@@ -1,176 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-  ~ Copyright (C) 2015 Google Inc.
-  ~
-  ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
-  ~ use this file except in compliance with the License. You may obtain a copy of
-  ~ the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-  ~ License for the specific language governing permissions and limitations under
-  ~ the License.
-  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <groupId>com.google.cloud.dataflow</groupId>
-  <artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
-  <name>Google Cloud Dataflow Kafka Connectors</name>
-  <description>Dataflow Library to read Kafka topics</description>
-  <version>0.0.1-SNAPSHOT</version>
-  <packaging>jar</packaging>
-
-  <licenses>
-    <license>
-      <name>Apache License, Version 2.0</name>
-      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-      <distribution>repo</distribution>
-    </license>
-  </licenses>
-
-  <properties>
-    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <google-cloud-dataflow-version>[1.2.0,2.0.0)</google-cloud-dataflow-version>
-    <hamcrest.version>1.3</hamcrest.version>
-    <junit.version>4.11</junit.version>
-    <slf4j.version>1.7.7</slf4j.version>
-  </properties>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.12</version>
-        <dependencies>
-          <dependency>
-            <groupId>com.puppycrawl.tools</groupId>
-            <artifactId>checkstyle</artifactId>
-            <version>6.6</version>
-          </dependency>
-        </dependencies>
-        <configuration>
-          <configLocation>../../checkstyle.xml</configLocation>
-          <consoleOutput>true</consoleOutput>
-          <failOnViolation>true</failOnViolation>
-          <includeTestSourceDirectory>true</includeTestSourceDirectory>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <!-- Source plugin for generating source and test-source JARs. -->
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-source-plugin</artifactId>
-        <version>2.4</version>
-        <executions>
-          <execution>
-            <id>attach-sources</id>
-            <phase>compile</phase>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-          </execution>
-          <execution>
-            <id>attach-test-sources</id>
-            <phase>test-compile</phase>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-javadoc-plugin</artifactId>
-        <version>2.10.3</version>
-        <configuration>
-          <windowtitle>Google Cloud Dataflow Kafka Contrib</windowtitle>
-          <doctitle>Google Cloud Dataflow Kafka Contrib</doctitle>
-
-          <subpackages>com.google.cloud.dataflow.contrib.kafka</subpackages>
-          <use>false</use>
-          <bottom><![CDATA[<br>]]></bottom>
-
-          <offlineLinks>
-            <offlineLink>
-              <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
-              <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
-            </offlineLink>
-            <offlineLink>
-              <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
-              <location>${basedir}/../../javadoc/guava-docs</location>
-            </offlineLink>
-          </offlineLinks>
-        </configuration>
-        <executions>
-          <execution>
-            <goals>
-              <goal>jar</goal>
-            </goals>
-            <phase>package</phase>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-  <dependencies>
-    <dependency>
-      <groupId>com.google.cloud.dataflow</groupId>
-      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
-      <version>${google-cloud-dataflow-version}</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>[0.9,)</version>
-    </dependency>
-
-    <!-- test dependencies-->
-    <dependency>
-      <groupId>org.hamcrest</groupId>
-      <artifactId>hamcrest-all</artifactId>
-      <version>${hamcrest.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-jdk14</artifactId>
-      <version>${slf4j.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
deleted file mode 100644
index 9b33ee8..0000000
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, partition id,
- * and the latest offset consumed so far.
- */
-@DefaultCoder(SerializableCoder.class)
-public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
-
-  private final List<PartitionMark> partitions;
-
-  public KafkaCheckpointMark(List<PartitionMark> partitions) {
-    this.partitions = partitions;
-  }
-
-  public List<PartitionMark> getPartitions() {
-    return partitions;
-  }
-
-  @Override
-  public void finalizeCheckpoint() throws IOException {
-    /* nothing to do */
-
-    // We might want to support committing offset in Kafka for better resume point when the job
-    // is restarted (checkpoint is not available for job restarts).
-  }
-
-  /**
-   * A tuple to hold topic, partition, and offset that comprise the checkpoint
-   * for a single partition.
-   */
-  public static class PartitionMark implements Serializable {
-    private final TopicPartition topicPartition;
-    private final long offset;
-
-    public PartitionMark(TopicPartition topicPartition, long offset) {
-      this.topicPartition = topicPartition;
-      this.offset = offset;
-    }
-
-    public TopicPartition getTopicPartition() {
-      return topicPartition;
-    }
-
-    public long getOffset() {
-      return offset;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
deleted file mode 100644
index ad254ee..0000000
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
+++ /dev/null
@@ -1,1053 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark.PartitionMark;
-import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.util.ExposedByteArrayInputStream;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PBegin;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closeables;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
-/**
- * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka version 0.9
- * and above are supported.
- *
- * <h3>Reading from Kafka topics</h3>
- *
- * <p>KafkaIO source returns unbounded collection of Kafka records as
- * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic
- * metadata like topic-partition and offset, along with key and value associated with a Kafka
- * record.
- *
- * <p>Although most applications consumer single topic, the source can be configured to consume
- * multiple topics or even a specific set of {@link TopicPartition}s.
- *
- * <p> To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
- * and one or more topics to consume. The following example illustrates various options for
- * configuring the source :
- *
- * <pre>{@code
- *
- *  pipeline
- *    .apply(KafkaIO.read()
- *       .withBootstrapServers("broker_1:9092,broker_2:9092")
- *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
- *       // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>
- *
- *       // rest of the settings are optional :
- *
- *       // set a Coder for Key and Value (note the change to return type)
- *       .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
- *       .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>
- *
- *       // you can further customize KafkaConsumer used to read the records by adding more
- *       // settings for ConsumerConfig. e.g :
- *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
- *
- *       // custom function for calculating record timestamp (default is processing time)
- *       .withTimestampFn(new MyTypestampFunction())
- *
- *       // custom function for watermark (default is record timestamp)
- *       .withWatermarkFn(new MyWatermarkFunction())
- *
- *       // finally, if you don't need Kafka metadata, you can drop it
- *       .withoutMetadata() // PCollection<KV<Long, String>>
- *    )
- *    .apply(Values.<String>create()) // PCollection<String>
- *     ...
- * }</pre>
- *
- * <h3>Partition Assignment and Checkpointing</h3>
- * The Kafka partitions are evenly distributed among splits (workers).
- * Dataflow checkpointing is fully supported and
- * each split can resume from previous checkpoint. See
- * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
- * splits and checkpoint support.
- *
- * <p>When the pipeline starts for the first time without any checkpoint, the source starts
- * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
- * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through
- * {@link Read#updateConsumerProperties(Map)}.
- *
- * <h3>Advanced Kafka Configuration</h3>
- * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like
- * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set
- * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
- */
-public class KafkaIO {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
-  private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> {
-    @Override
-    public Instant apply(T input) {
-      return Instant.now();
-    }
-  }
-
-
-  /**
-   * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
-   * configuration should set with {@link Read#withBootstrapServers(String)} and
-   * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
-   * custom timestamp and watermark functions. Additionally, {@link Read#withMetadata()} provides
-   * access to Kafka metadata for each record (topic name, partition, offset).
-   */
-  public static Read<byte[], byte[]> read() {
-    return new Read<byte[], byte[]>(
-        new ArrayList<String>(),
-        new ArrayList<TopicPartition>(),
-        ByteArrayCoder.of(),
-        ByteArrayCoder.of(),
-        Read.KAFKA_9_CONSUMER_FACTORY_FN,
-        Read.DEFAULT_CONSUMER_PROPERTIES,
-        Long.MAX_VALUE,
-        null);
-  }
-
-  /**
-   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
-   * information on usage and configuration.
-   */
-  public static class Read<K, V> extends TypedRead<K, V> {
-
-    /**
-     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
-     */
-    public Read<K, V> withBootstrapServers(String bootstrapServers) {
-      return updateConsumerProperties(
-          ImmutableMap.<String, Object>of(
-              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
-    }
-
-    /**
-     * Returns a new {@link Read} that reads from the topics. All the partitions are from each
-     * of the topics is read.
-     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
-     * of how the partitions are distributed among the splits.
-     */
-    public Read<K, V> withTopics(List<String> topics) {
-      checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both");
-
-      return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
-     * of partitions for one or more topics when (if ever) needed.
-     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
-     * of how the partitions are distributed among the splits.
-     */
-    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
-      checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both");
-
-      return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns a new {@link Read} with {@link Coder} for key bytes.
-     */
-    public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
-      return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Returns a new {@link Read} with {@link Coder} for value bytes.
-     */
-    public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
-      return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * A factory to create Kafka {@link Consumer} from consumer configuration.
-     * This is useful for supporting another version of Kafka consumer.
-     * Default is {@link KafkaConsumer}.
-     */
-    public Read<K, V> withConsumerFactoryFn(
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Update consumer configuration with new properties.
-     */
-    public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
-      for (String key : configUpdates.keySet()) {
-        checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key),
-            "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key));
-      }
-
-      Map<String, Object> config = new HashMap<>(consumerConfig);
-      config.putAll(configUpdates);
-
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, config, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * Similar to {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxNumRecords(long)}.
-     * Mainly used for tests and demo applications.
-     */
-    public Read<K, V> withMaxNumRecords(long maxNumRecords) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, maxNumRecords, null);
-    }
-
-    /**
-     * Similar to
-     * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
-     * Mainly used for tests and demo
-     * applications.
-     */
-    public Read<K, V> withMaxReadTime(Duration maxReadTime) {
-      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
-    }
-
-    ///////////////////////////////////////////////////////////////////////////////////////
-
-    private Read(
-        List<String> topics,
-        List<TopicPartition> topicPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
-        Map<String, Object> consumerConfig,
-        long maxNumRecords,
-        @Nullable Duration maxReadTime) {
-
-      super(topics, topicPartitions, keyCoder, valueCoder, null, null,
-          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * A set of properties that are not required or don't make sense for our consumer.
-     */
-    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead"
-        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
-        //     lets allow these, applications can have better resume point for restarts.
-        );
-
-    // set config defaults
-    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
-        ImmutableMap.<String, Object>of(
-            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
-            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
-
-            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
-            // with default value of of 32K, It takes multiple seconds between successful polls.
-            // All the consumer work is done inside poll(), with smaller send buffer size, it
-            // takes many polls before a 1MB chunk from the server is fully read. In my testing
-            // about half of the time select() inside kafka consumer waited for 20-30ms, though
-            // the server had lots of data in tcp send buffers on its side. Compared to default,
-            // this setting increased throughput increased by many fold (3-4x).
-            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
-
-            // default to latest offset when we are not resuming.
-            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
-            // disable auto commit of offsets. we don't require group_id. could be enabled by user.
-            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-    // default Kafka 0.9 Consumer supplier.
-    private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
-      KAFKA_9_CONSUMER_FACTORY_FN =
-        new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
-          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-            return new KafkaConsumer<>(config);
-          }
-        };
-  }
-
-  /**
-   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
-   * information on usage and configuration.
-   */
-  public static class TypedRead<K, V>
-                      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
-
-    /**
-     * A function to assign a timestamp to a record. Default is processing timestamp.
-     */
-    public TypedRead<K, V> withTimestampFn2(
-        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-      checkNotNull(timestampFn);
-      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
-          maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * A function to calculate watermark after a record. Default is last record timestamp
-     * @see #withTimestampFn(SerializableFunction)
-     */
-    public TypedRead<K, V> withWatermarkFn2(
-        SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
-      checkNotNull(watermarkFn);
-      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
-          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
-          maxNumRecords, maxReadTime);
-    }
-
-    /**
-     * A function to assign a timestamp to a record. Default is processing timestamp.
-     */
-    public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
-      checkNotNull(timestampFn);
-      return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
-    }
-
-    /**
-     * A function to calculate watermark after a record. Default is last record timestamp
-     * @see #withTimestampFn(SerializableFunction)
-     */
-    public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
-      checkNotNull(watermarkFn);
-      return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
-    }
-
-    /**
-     * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata.
-     */
-    public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
-      return new TypedWithoutMetadata<K, V>(this);
-    }
-
-    @Override
-    public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
-     // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
-      Unbounded<KafkaRecord<K, V>> unbounded =
-          com.google.cloud.dataflow.sdk.io.Read.from(makeSource());
-
-      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
-
-      if (maxNumRecords < Long.MAX_VALUE) {
-        transform = unbounded.withMaxNumRecords(maxNumRecords);
-      } else if (maxReadTime != null) {
-        transform = unbounded.withMaxReadTime(maxReadTime);
-      }
-
-      return input.getPipeline().apply(transform);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////////////////
-
-    protected final List<String> topics;
-    protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics
-    protected final Coder<K> keyCoder;
-    protected final Coder<V> valueCoder;
-    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
-    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn;
-    protected final
-      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
-    protected final Map<String, Object> consumerConfig;
-    protected final long maxNumRecords; // bounded read, mainly for testing
-    protected final Duration maxReadTime; // bounded read, mainly for testing
-
-    private TypedRead(List<String> topics,
-        List<TopicPartition> topicPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
-        Map<String, Object> consumerConfig,
-        long maxNumRecords,
-        @Nullable Duration maxReadTime) {
-      super("KafkaIO.Read");
-
-      this.topics = topics;
-      this.topicPartitions = topicPartitions;
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-      this.timestampFn = timestampFn;
-      this.watermarkFn = watermarkFn;
-      this.consumerFactoryFn = consumerFactoryFn;
-      this.consumerConfig = consumerConfig;
-      this.maxNumRecords = maxNumRecords;
-      this.maxReadTime = maxReadTime;
-    }
-
-    /**
-     * Creates an {@link UnboundedSource<KafkaRecord<K, V>, ?>} with the configuration in
-     * {@link TypedRead}. Primary use case is unit tests, should not be used in an
-     * application.
-     */
-    @VisibleForTesting
-    UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
-      return new UnboundedKafkaSource<K, V>(
-          -1,
-          topics,
-          topicPartitions,
-          keyCoder,
-          valueCoder,
-          timestampFn,
-          Optional.fromNullable(watermarkFn),
-          consumerFactoryFn,
-          consumerConfig);
-    }
-
-    // utility method to convert KafkRecord<K, V> to user KV<K, V> before applying user functions
-    private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
-      unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
-        return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
-          public OutT apply(KafkaRecord<KeyT, ValueT> record) {
-            return fn.apply(record.getKV());
-          }
-        };
-      }
-  }
-
-  /**
-   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Typed}, but removes
-   * Kafka metatdata and returns a {@link PCollection} of {@link KV}.
-   * See {@link KafkaIO} for more information on usage and configuration of reader.
-   */
-  public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
-    private final TypedRead<K, V> typedRead;
-
-    TypedWithoutMetadata(TypedRead<K, V> read) {
-      super("KafkaIO.Read");
-      this.typedRead = read;
-    }
-
-    @Override
-    public PCollection<KV<K, V>> apply(PBegin begin) {
-      return typedRead
-          .apply(begin)
-          .apply("Remove Kafka Metadata",
-              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
-                @Override
-                public void processElement(ProcessContext ctx) {
-                  ctx.output(ctx.element().getKV());
-                }
-              }));
-    }
-  }
-
-  /** Static class, prevent instantiation. */
-  private KafkaIO() {}
-
-  private static class UnboundedKafkaSource<K, V>
-      extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
-
-    private final int id; // split id, mainly for debugging
-    private final List<String> topics;
-    private final List<TopicPartition> assignedPartitions;
-    private final Coder<K> keyCoder;
-    private final Coder<V> valueCoder;
-    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
-    // would it be a good idea to pass currentTimestamp to watermarkFn?
-    private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn;
-    private
-      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
-    private final Map<String, Object> consumerConfig;
-
-    public UnboundedKafkaSource(
-        int id,
-        List<String> topics,
-        List<TopicPartition> assignedPartitions,
-        Coder<K> keyCoder,
-        Coder<V> valueCoder,
-        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
-        Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
-        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
-        Map<String, Object> consumerConfig) {
-
-      this.id = id;
-      this.assignedPartitions = assignedPartitions;
-      this.topics = topics;
-      this.keyCoder = keyCoder;
-      this.valueCoder = valueCoder;
-      this.timestampFn =
-          (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn);
-      this.watermarkFn = watermarkFn;
-      this.consumerFactoryFn = consumerFactoryFn;
-      this.consumerConfig = consumerConfig;
-    }
-
-    /**
-     * The partitions are evenly distributed among the splits. The number of splits returned is
-     * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
-     * count.
-     *
-     * <p> It is important to assign the partitions deterministically so that we can support
-     * resuming a split from last checkpoint. The Kafka partitions are sorted by
-     * {@code <topic, partition>} and then assigned to splits in round-robin order.
-     */
-    @Override
-    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-
-      List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
-
-      // (a) fetch partitions for each topic
-      // (b) sort by <topic, partition>
-      // (c) round-robin assign the partitions to splits
-
-      if (partitions.isEmpty()) {
-        try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
-          for (String topic : topics) {
-            for (PartitionInfo p : consumer.partitionsFor(topic)) {
-              partitions.add(new TopicPartition(p.topic(), p.partition()));
-            }
-          }
-        }
-      }
-
-      Collections.sort(partitions, new Comparator<TopicPartition>() {
-        public int compare(TopicPartition tp1, TopicPartition tp2) {
-          return ComparisonChain
-              .start()
-              .compare(tp1.topic(), tp2.topic())
-              .compare(tp1.partition(), tp2.partition())
-              .result();
-        }
-      });
-
-      checkArgument(desiredNumSplits > 0);
-      checkState(partitions.size() > 0,
-          "Could not find any partitions. Please check Kafka configuration and topic names");
-
-      int numSplits = Math.min(desiredNumSplits, partitions.size());
-      List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
-
-      for (int i = 0; i < numSplits; i++) {
-        assignments.add(new ArrayList<TopicPartition>());
-      }
-      for (int i = 0; i < partitions.size(); i++) {
-        assignments.get(i % numSplits).add(partitions.get(i));
-      }
-
-      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
-
-      for (int i = 0; i < numSplits; i++) {
-        List<TopicPartition> assignedToSplit = assignments.get(i);
-
-        LOG.info("Partitions assigned to split {} (total {}): {}",
-            i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
-
-        result.add(new UnboundedKafkaSource<K, V>(
-            i,
-            this.topics,
-            assignedToSplit,
-            this.keyCoder,
-            this.valueCoder,
-            this.timestampFn,
-            this.watermarkFn,
-            this.consumerFactoryFn,
-            this.consumerConfig));
-      }
-
-      return result;
-    }
-
-    @Override
-    public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
-                                                   KafkaCheckpointMark checkpointMark) {
-      if (assignedPartitions.isEmpty()) {
-        LOG.warn("Looks like generateSplits() is not called. Generate single split.");
-        try {
-          return new UnboundedKafkaReader<K, V>(
-              generateInitialSplits(1, options).get(0), checkpointMark);
-        } catch (Exception e) {
-          Throwables.propagate(e);
-        }
-      }
-      return new UnboundedKafkaReader<K, V>(this, checkpointMark);
-    }
-
-    @Override
-    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
-      return SerializableCoder.of(KafkaCheckpointMark.class);
-    }
-
-    @Override
-    public boolean requiresDeduping() {
-      // Kafka records are ordered with in partitions. In addition checkpoint guarantees
-      // records are not consumed twice.
-      return false;
-    }
-
-    @Override
-    public void validate() {
-      checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-          "Kafka bootstrap servers should be set");
-      checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
-          "Kafka topics or topic_partitions are required");
-    }
-
-    @Override
-    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
-      return KafkaRecordCoder.of(keyCoder, valueCoder);
-    }
-  }
-
-  private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
-
-    private final UnboundedKafkaSource<K, V> source;
-    private final String name;
-    private Consumer<byte[], byte[]> consumer;
-    private final List<PartitionState> partitionStates;
-    private KafkaRecord<K, V> curRecord;
-    private Instant curTimestamp;
-    private Iterator<PartitionState> curBatch = Collections.emptyIterator();
-
-    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-    // how long to wait for new records from kafka consumer inside advance()
-    private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
-
-    // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
-    // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
-    // like 100 milliseconds does not work well. This along with large receive buffer for
-    // consumer achieved best throughput in tests (see `defaultConsumerProperties`).
-    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
-    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
-        new SynchronousQueue<>();
-    private volatile boolean closed = false;
-
-    // Backlog support :
-    // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
-    // then look at position(). Use another consumer to do this so that the primary consumer does
-    // not need to be interrupted. The latest offsets are fetched periodically on another thread.
-    // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
-    // auto commit in consumer config, this could interfere with the primary consumer (we will
-    // handle this particular problem). We might have to make this optional.
-    private Consumer<byte[], byte[]> offsetConsumer;
-    private final ScheduledExecutorService offsetFetcherThread =
-        Executors.newSingleThreadScheduledExecutor();
-    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
-
-    /** watermark before any records have been read. */
-    private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
-
-    public String toString() {
-      return name;
-    }
-
-    // maintains state of each assigned partition (buffered records, consumed offset, etc)
-    private static class PartitionState {
-      private final TopicPartition topicPartition;
-      private long consumedOffset;
-      private long latestOffset;
-      private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
-
-      // simple moving average for size of each record in bytes
-      private double avgRecordSize = 0;
-      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
-
-
-      PartitionState(TopicPartition partition, long offset) {
-        this.topicPartition = partition;
-        this.consumedOffset = offset;
-        this.latestOffset = -1;
-      }
-
-      // update consumedOffset and avgRecordSize
-      void recordConsumed(long offset, int size) {
-        consumedOffset = offset;
-
-        // this is always updated from single thread. probably not worth making it an AtomicDouble
-        if (avgRecordSize <= 0) {
-          avgRecordSize = size;
-        } else {
-          // initially, first record heavily contributes to average.
-          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
-        }
-      }
-
-      synchronized void setLatestOffset(long latestOffset) {
-        this.latestOffset = latestOffset;
-      }
-
-      synchronized long approxBacklogInBytes() {
-        // Note that is an an estimate of uncompressed backlog.
-        // Messages on Kafka might be comressed.
-        if (latestOffset < 0 || consumedOffset < 0) {
-          return UnboundedReader.BACKLOG_UNKNOWN;
-        }
-        if (latestOffset <= consumedOffset || consumedOffset < 0) {
-          return 0;
-        }
-        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
-      }
-    }
-
-    public UnboundedKafkaReader(
-        UnboundedKafkaSource<K, V> source,
-        @Nullable KafkaCheckpointMark checkpointMark) {
-
-      this.source = source;
-      this.name = "Reader-" + source.id;
-
-      partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
-          new Function<TopicPartition, PartitionState>() {
-            public PartitionState apply(TopicPartition tp) {
-              return new PartitionState(tp, -1L);
-            }
-        }));
-
-      if (checkpointMark != null) {
-        // a) verify that assigned and check-pointed partitions match exactly
-        // b) set consumed offsets
-
-        checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(),
-            "checkPointMark and assignedPartitions should match");
-        // we could consider allowing a mismatch, though it is not expected in current Dataflow
-
-        for (int i = 0; i < source.assignedPartitions.size(); i++) {
-          PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
-          TopicPartition assigned = source.assignedPartitions.get(i);
-
-          checkState(ckptMark.getTopicPartition().equals(assigned),
-              "checkpointed partition %s and assigned partition %s don't match",
-              ckptMark.getTopicPartition(), assigned);
-
-          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
-        }
-      }
-    }
-
-    private void consumerPollLoop() {
-      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
-      while (!closed) {
-        try {
-          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
-          if (!records.isEmpty()) {
-            availableRecordsQueue.put(records); // blocks until dequeued.
-          }
-        } catch (InterruptedException e) {
-          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
-          break;
-        } catch (WakeupException e) {
-          break;
-        }
-      }
-
-      LOG.info("{}: Returning from consumer pool loop", this);
-    }
-
-    private void nextBatch() {
-      curBatch = Collections.emptyIterator();
-
-      ConsumerRecords<byte[], byte[]> records;
-      try {
-        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
-                                             TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        LOG.warn("{}: Unexpected", this, e);
-        return;
-      }
-
-      if (records == null) {
-        return;
-      }
-
-      List<PartitionState> nonEmpty = new LinkedList<>();
-
-      for (PartitionState p : partitionStates) {
-        p.recordIter = records.records(p.topicPartition).iterator();
-        if (p.recordIter.hasNext()) {
-          nonEmpty.add(p);
-        }
-      }
-
-      // cycle through the partitions in order to interleave records from each.
-      curBatch = Iterators.cycle(nonEmpty);
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      consumer = source.consumerFactoryFn.apply(source.consumerConfig);
-      consumer.assign(source.assignedPartitions);
-
-      // seek to consumedOffset + 1 if it is set
-      for (PartitionState p : partitionStates) {
-        if (p.consumedOffset >= 0) {
-          LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
-          consumer.seek(p.topicPartition, p.consumedOffset + 1);
-        } else {
-          LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
-        }
-      }
-
-      // start consumer read loop.
-      // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
-      consumerPollThread.submit(
-          new Runnable() {
-            public void run() {
-              consumerPollLoop();
-            }
-          });
-
-      // offsetConsumer setup :
-
-      // override client_id and auto_commit so that it does not interfere with main consumer.
-      String offsetConsumerId = String.format("%s_offset_consumer_%d_%s", name,
-          (new Random()).nextInt(Integer.MAX_VALUE),
-          source.consumerConfig.getOrDefault(ConsumerConfig.CLIENT_ID_CONFIG, "none"));
-      Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig);
-      offsetConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, offsetConsumerId);
-      offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-
-      offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
-      offsetConsumer.assign(source.assignedPartitions);
-
-      offsetFetcherThread.scheduleAtFixedRate(
-          new Runnable() {
-            public void run() {
-              updateLatestOffsets();
-            }
-          }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
-
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      /* Read first record (if any). we need to loop here because :
-       *  - (a) some records initially need to be skipped if they are before consumedOffset
-       *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
-       *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
-       *        curBatch.next() might return an empty iterator.
-       */
-      while (true) {
-        if (curBatch.hasNext()) {
-          PartitionState pState = curBatch.next();
-
-          if (!pState.recordIter.hasNext()) { // -- (c)
-            pState.recordIter = Collections.emptyIterator(); // drop ref
-            curBatch.remove();
-            continue;
-          }
-
-          ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
-          long consumed = pState.consumedOffset;
-          long offset = rawRecord.offset();
-
-          if (consumed >= 0 && offset <= consumed) { // -- (a)
-            // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
-            // should we check if the offset is way off from consumedOffset (say > 1M)?
-            LOG.warn("{}: ignoring already consumed offset {} for {}",
-                this, offset, pState.topicPartition);
-            continue;
-          }
-
-          // sanity check
-          if (consumed >= 0 && (offset - consumed) != 1) {
-            LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
-                this, pState.topicPartition, consumed, offset - consumed - 1);
-          }
-
-          if (curRecord == null) {
-            LOG.info("{}: first record offset {}", name, offset);
-          }
-
-          curRecord = null; // user coders below might throw.
-
-          // apply user coders. might want to allow skipping records that fail to decode.
-          // TODO: wrap exceptions from coders to make explicit to users
-          KafkaRecord<K, V> record = new KafkaRecord<K, V>(
-              rawRecord.topic(),
-              rawRecord.partition(),
-              rawRecord.offset(),
-              decode(rawRecord.key(), source.keyCoder),
-              decode(rawRecord.value(), source.valueCoder));
-
-          curTimestamp = source.timestampFn.apply(record);
-          curRecord = record;
-
-          int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) +
-              (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          pState.recordConsumed(offset, recordSize);
-          return true;
-
-        } else { // -- (b)
-          nextBatch();
-
-          if (!curBatch.hasNext()) {
-            return false;
-          }
-        }
-      }
-    }
-
-    private static byte[] nullBytes = new byte[0];
-    private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
-      // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
-      // This makes it impossible for user to distinguish between zero length byte and null.
-      // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
-      // coder.
-      byte[] toDecode = bytes == null ? nullBytes : bytes;
-      return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
-    }
-
-    // update latest offset for each partition.
-    // called from offsetFetcher thread
-    private void updateLatestOffsets() {
-      for (PartitionState p : partitionStates) {
-        try {
-          offsetConsumer.seekToEnd(p.topicPartition);
-          long offset = offsetConsumer.position(p.topicPartition);
-          p.setLatestOffset(offset);;
-        } catch (Exception e) {
-          LOG.warn("{}: exception while fetching latest offsets. ignored.",  this, e);
-          p.setLatestOffset(-1L); // reset
-        }
-
-        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
-            this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
-      }
-
-      LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
-    }
-
-    @Override
-    public Instant getWatermark() {
-      if (curRecord == null) {
-        LOG.warn("{}: getWatermark() : no records have been read yet.", name);
-        return initialWatermark;
-      }
-
-      return source.watermarkFn.isPresent() ?
-          source.watermarkFn.get().apply(curRecord) : curTimestamp;
-    }
-
-    @Override
-    public CheckpointMark getCheckpointMark() {
-      return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
-          Lists.transform(partitionStates,
-              new Function<PartitionState, PartitionMark>() {
-                public PartitionMark apply(PartitionState p) {
-                  return new PartitionMark(p.topicPartition, p.consumedOffset);
-                }
-              }
-          )));
-    }
-
-    @Override
-    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
-      // should we delay updating consumed offset till this point? Mostly not required.
-      return curRecord;
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return curTimestamp;
-    }
-
-
-    @Override
-    public long getSplitBacklogBytes() {
-      long backlogBytes = 0;
-
-      for (PartitionState p : partitionStates) {
-        long pBacklog = p.approxBacklogInBytes();
-        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
-          return UnboundedReader.BACKLOG_UNKNOWN;
-        }
-        backlogBytes += pBacklog;
-      }
-
-      return backlogBytes;
-    }
-
-    @Override
-    public void close() throws IOException {
-      closed = true;
-      availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
-      consumer.wakeup();
-      consumerPollThread.shutdown();
-      offsetFetcherThread.shutdown();
-      Closeables.close(offsetConsumer, true);
-      Closeables.close(consumer, true);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
deleted file mode 100644
index 584b1b6..0000000
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import java.io.Serializable;
-
-/**
- * KafkaRecord contains key and value of the record as well as metadata for the record (topic name,
- * partition id, and offset).
- */
-public class KafkaRecord<K, V> implements Serializable {
-
-  private final String topic;
-  private final int partition;
-  private final long offset;
-  private final KV<K, V> kv;
-
-  public KafkaRecord(
-      String topic,
-      int partition,
-      long offset,
-      K key,
-      V value) {
-    this(topic, partition, offset, KV.of(key, value));
-  }
-
-  public KafkaRecord(
-      String topic,
-      int partition,
-      long offset,
-      KV<K, V> kv) {
-
-    this.topic = topic;
-    this.partition = partition;
-    this.offset = offset;
-    this.kv = kv;
-  }
-
-  public String getTopic() {
-    return topic;
-  }
-
-  public int getPartition() {
-    return partition;
-  }
-
-  public long getOffset() {
-    return offset;
-  }
-
-  public KV<K, V> getKV() {
-    return kv;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof KafkaRecord) {
-      @SuppressWarnings("unchecked")
-      KafkaRecord<Object, Object> other = (KafkaRecord<Object, Object>) obj;
-      return topic.equals(other.topic)
-          && partition == other.partition
-          && offset == other.offset
-          && kv.equals(other.kv);
-    } else {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
deleted file mode 100644
index d9af1b5..0000000
--- a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.coders.StandardCoder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
-import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
-import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.values.KV;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-
-/**
- * {@link Coder} for {@link KafkaRecord}.
- */
-public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
-
-  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
-  private static final VarLongCoder longCoder = VarLongCoder.of();
-  private static final VarIntCoder intCoder = VarIntCoder.of();
-
-  private final KvCoder<K, V> kvCoder;
-
-  @JsonCreator
-  public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-                                          List<Coder<?>> components) {
-    KvCoder<?, ?> kvCoder = KvCoder.of(components);
-    return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
-  }
-
-  public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
-    return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
-  }
-
-  public KafkaRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
-    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
-  }
-
-  @Override
-  public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
-                         throws CoderException, IOException {
-    Context nested = context.nested();
-    stringCoder.encode(value.getTopic(), outStream, nested);
-    intCoder.encode(value.getPartition(), outStream, nested);
-    longCoder.encode(value.getOffset(), outStream, nested);
-    kvCoder.encode(value.getKV(), outStream, nested);
-  }
-
-  @Override
-  public KafkaRecord<K, V> decode(InputStream inStream, Context context)
-                                      throws CoderException, IOException {
-    Context nested = context.nested();
-    return new KafkaRecord<K, V>(
-        stringCoder.decode(inStream, nested),
-        intCoder.decode(inStream, nested),
-        longCoder.decode(inStream, nested),
-        kvCoder.decode(inStream, nested));
-  }
-
-  @Override
-  public List<? extends Coder<?>> getCoderArguments() {
-    return kvCoder.getCoderArguments();
-  }
-
-  @Override
-  public void verifyDeterministic() throws NonDeterministicException {
-    kvCoder.verifyDeterministic();
-  }
-
-  @Override
-  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
-    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
-    //TODO : do we have to implement getEncodedSize()?
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
-    if (consistentWithEquals()) {
-      return value;
-    } else {
-      return new KafkaRecord<Object, Object>(
-          value.getTopic(),
-          value.getPartition(),
-          value.getOffset(),
-          (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
-    }
-  }
-
-  @Override
-  public boolean consistentWithEquals() {
-    return kvCoder.consistentWithEquals();
-  }
-}


[5/5] incubator-beam git commit: KafkaIO: unbounded source for reading from Apache Kafka

Posted by dh...@apache.org.
KafkaIO: unbounded source for reading from Apache Kafka


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7b175dff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7b175dff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7b175dff

Branch: refs/heads/master
Commit: 7b175dff8303b24cb92702db5b5a1a4691435bb1
Parents: 4c01c7d
Author: Raghu Angadi <ra...@google.com>
Authored: Tue Feb 16 14:37:56 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 21 17:34:33 2016 -0700

----------------------------------------------------------------------
 contrib/examples/kafka/pom.xml                  |  189 ++++
 .../kafka/examples/TopHashtagsExample.java      |  265 +++++
 contrib/kafka/pom.xml                           |  176 +++
 .../contrib/kafka/KafkaCheckpointMark.java      |   76 ++
 .../cloud/dataflow/contrib/kafka/KafkaIO.java   | 1053 ++++++++++++++++++
 .../dataflow/contrib/kafka/KafkaRecord.java     |   84 ++
 .../contrib/kafka/KafkaRecordCoder.java         |  118 ++
 .../dataflow/contrib/kafka/KafkaIOTest.java     |  378 +++++++
 8 files changed, 2339 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/examples/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/examples/kafka/pom.xml b/contrib/examples/kafka/pom.xml
new file mode 100644
index 0000000..3355020
--- /dev/null
+++ b/contrib/examples/kafka/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+  ~ Copyright (C) 2015 Google Inc.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
+  ~ use this file except in compliance with the License. You may obtain a copy of
+  ~ the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  ~ License for the specific language governing permissions and limitations under
+  ~ the License.
+  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.google.cloud.dataflow</groupId>
+  <artifactId>google-cloud-dataflow-java-contrib-kafka-examples</artifactId>
+  <name>Google Cloud Dataflow Kafka Examples</name>
+  <description>Examples apps using Kafka Source in Google Cloud Dataflow</description>
+  <version>0.0.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+  </properties>
+
+  <build>
+    <plugins>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.12</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.6</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>../../../checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <version>2.10.3</version>
+        <configuration>
+          <windowtitle>Google Cloud Dataflow Kafka Contrib</windowtitle>
+          <doctitle>Google Cloud Dataflow Kafka Contrib</doctitle>
+
+          <subpackages>com.google.cloud.dataflow.contrib.kafka</subpackages>
+          <use>false</use>
+          <bottom><![CDATA[<br>]]></bottom>
+
+          <offlineLinks>
+            <offlineLink>
+              <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
+              <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
+              <location>${basedir}/../../javadoc/guava-docs</location>
+            </offlineLink>
+          </offlineLinks>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.3</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.7</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
----------------------------------------------------------------------
diff --git a/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java b/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
new file mode 100644
index 0000000..2a575f1
--- /dev/null
+++ b/contrib/examples/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/examples/TopHashtagsExample.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka.examples;
+
+import com.google.cloud.dataflow.contrib.kafka.KafkaIO;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.Validation.Required;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.Top;
+import com.google.cloud.dataflow.sdk.transforms.Values;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This Dataflow app show cases {@link KafkaIO}. The application reads from a Kafka topic
+ * containing <a href="https://dev.twitter.com/overview/api/tweets">JSON Tweets</a>, calculates top
+ * hashtags in 10 minute window. The results are written back to a Kafka topic.
+ *
+ * <pre>{@code
+ * Usage:
+ *   $ java -cp jar_with_dependencies.jar                                           \
+ *          com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample     \
+ *          --project=GCP_PROJECT                                                   \
+ *          --stagingLocation=GS_STAGING_DIRECTORY                                  \
+ *          --runner=BlockingDataflowPipelineRunner                                 \
+ *          --bootstrapServers="kafka_server_1:9092"                                \
+ *          --topics="sample_tweets_json"                                           \
+ *          --outputTopic="top_hashtags"
+ * }</pre>
+ */
+public class TopHashtagsExample {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TopHashtagsExample.class);
+
+  /**
+   * Options for the app.
+   */
+  public static interface Options extends PipelineOptions {
+    @Description("Sliding window length in minutes")
+    @Default.Integer(10)
+    Integer getSlidingWindowLengthMinutes();
+    void setSlidingWindowLengthMinutes(Integer value);
+
+    @Description("Trigger window interval in minutes")
+    @Default.Integer(1)
+    Integer getSlidingWindowIntervalMinutes();
+    void setSlidingWindowIntervalMinutes(Integer value);
+
+    @Description("Bootstrap Server(s) for Kafka")
+    @Required
+    String getBootstrapServers();
+    void setBootstrapServers(String servers);
+
+    @Description("One or more comma separated topics to read from")
+    @Required
+    List<String> getTopics();
+    void setTopics(List<String> topics);
+
+    @Description("Number of Top Hashtags to track")
+    @Default.Integer(10)
+    Integer getNumTopHashtags();
+    void setNumTopHashtags(Integer count);
+
+    @Description("Kafka topic name for writing results")
+    @Required
+    String getOutputTopic();
+    void setOutputTopic(String topic);
+  }
+
+  public static void main(String args[]) {
+
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    pipeline
+      .apply(KafkaIO.read()
+          .withBootstrapServers(options.getBootstrapServers())
+          .withTopics(options.getTopics())
+          .withValueCoder(StringUtf8Coder.of())
+          .withTimestampFn(TWEET_TIMESTAMP_OR_NOW)
+          .withoutMetadata())
+      .apply(Values.<String>create())
+      .apply(ParDo.of(new ExtractHashtagsFn()))
+      .apply(Window.<String>into(SlidingWindows
+          .of(Duration.standardMinutes(options.getSlidingWindowLengthMinutes()))
+          .every(Duration.standardMinutes(options.getSlidingWindowIntervalMinutes()))))
+      .apply(Count.<String>perElement())
+      .apply(Top.of(options.getNumTopHashtags(), new KV.OrderByValue<String, Long>())
+                .withoutDefaults())
+      .apply(ParDo.of(new OutputFormatter()))
+      .apply(ParDo.of(new KafkaWriter(options)));
+
+    pipeline.run();
+  }
+
+  // The rest of the file implements DoFns to do the following:
+  //    - extract hashtags
+  //    - format results in json
+  //    - write the results back to Kafka (useful for fetching monitoring the end result).
+
+  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+
+  /**
+   * Emit hashtags in the tweet (if any).
+   */
+  private static class ExtractHashtagsFn extends DoFn<String, String> {
+
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      for (JsonNode hashtag : JSON_MAPPER.readTree(ctx.element())
+                                         .with("entities")
+                                         .withArray("hashtags")) {
+        ctx.output(hashtag.get("text").asText());
+      }
+    }
+  }
+
+  // extract timestamp from "timestamp_ms" field.
+  private static final SerializableFunction<KV<byte[], String>, Instant> TWEET_TIMESTAMP_OR_NOW =
+      new SerializableFunction<KV<byte[], String>, Instant>() {
+        @Override
+        public Instant apply(KV<byte[], String> kv) {
+          try {
+            long tsMillis = JSON_MAPPER.readTree(kv.getValue()).path("timestamp_ms").asLong();
+            return tsMillis == 0 ? Instant.now() : new Instant(tsMillis);
+          } catch (Exception e) {
+            throw Throwables.propagate(e);
+          }
+        }
+      };
+
+  // return json string containing top hashtags and window information time
+  private static class OutputFormatter extends DoFn<List<KV<String, Long>>, String>
+      implements DoFn.RequiresWindowAccess {
+
+    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat
+        .forPattern("yyyy-MM-dd HH:mm:ss")
+        .withZoneUTC();
+    private static final ObjectWriter JSON_WRITER = new ObjectMapper()
+        .writerWithType(OutputJson.class);
+
+    static class OutputJson {
+      @JsonProperty String windowStart;
+      @JsonProperty String windowEnd;
+      @JsonProperty String generatedAt;
+      @JsonProperty List<HashtagInfo> topHashtags;
+
+      OutputJson(String windowStart, String windowEnd,
+                 String generatedAt, List<HashtagInfo> topHashtags) {
+        this.windowStart = windowStart;
+        this.windowEnd = windowEnd;
+        this.generatedAt = generatedAt;
+        this.topHashtags = topHashtags;
+      }
+    }
+
+    static class HashtagInfo {
+      @JsonProperty final String hashtag;
+      @JsonProperty final long count;
+      HashtagInfo(String hashtag, long count) {
+        this.hashtag = hashtag;
+        this.count = count;
+      }
+    }
+
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+
+      List<HashtagInfo> topHashtags = new ArrayList<>(ctx.element().size());
+
+      for (KV<String, Long> tag : ctx.element()) {
+        topHashtags.add(new HashtagInfo(tag.getKey(), tag.getValue()));
+      }
+
+      IntervalWindow window = (IntervalWindow) ctx.window();
+
+      String json = JSON_WRITER.writeValueAsString(new OutputJson(
+          DATE_FORMATTER.print(window.start()),
+          DATE_FORMATTER.print(window.end()),
+          DATE_FORMATTER.print(Instant.now()),
+          topHashtags));
+
+      ctx.output(json);
+    }
+  }
+
+  private static class KafkaWriter extends DoFn<String, Void> {
+
+    private final String topic;
+    private final Map<String, Object> config;
+    private static transient KafkaProducer<String, String> producer = null;
+
+    public KafkaWriter(Options options) {
+      this.topic = options.getOutputTopic();
+      this.config = ImmutableMap.<String, Object>of(
+          "bootstrap.servers", options.getBootstrapServers(),
+          "key.serializer",    StringSerializer.class.getName(),
+          "value.serializer",  StringSerializer.class.getName());
+    }
+
+    @Override
+    public void startBundle(Context c) throws Exception {
+      if (producer == null) { // in Beam, startBundle might be called multiple times.
+        producer = new KafkaProducer<String, String>(config);
+      }
+    }
+
+    @Override
+    public void finishBundle(Context c) throws Exception {
+      producer.flush();
+    }
+
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      LOG.trace("Top Hashtags : {}", ctx.element());
+      producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/kafka/pom.xml b/contrib/kafka/pom.xml
new file mode 100644
index 0000000..7fe8165
--- /dev/null
+++ b/contrib/kafka/pom.xml
@@ -0,0 +1,176 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+  ~ Copyright (C) 2015 Google Inc.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
+  ~ use this file except in compliance with the License. You may obtain a copy of
+  ~ the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+  ~ License for the specific language governing permissions and limitations under
+  ~ the License.
+  ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>com.google.cloud.dataflow</groupId>
+  <artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
+  <name>Google Cloud Dataflow Kafka Connectors</name>
+  <description>Dataflow Library to read Kafka topics</description>
+  <version>0.0.1-SNAPSHOT</version>
+  <packaging>jar</packaging>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <google-cloud-dataflow-version>[1.2.0,2.0.0)</google-cloud-dataflow-version>
+    <hamcrest.version>1.3</hamcrest.version>
+    <junit.version>4.11</junit.version>
+    <slf4j.version>1.7.7</slf4j.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.12</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.6</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>../../checkstyle.xml</configLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <!-- Source plugin for generating source and test-source JARs. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <id>attach-sources</id>
+            <phase>compile</phase>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+          </execution>
+          <execution>
+            <id>attach-test-sources</id>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <version>2.10.3</version>
+        <configuration>
+          <windowtitle>Google Cloud Dataflow Kafka Contrib</windowtitle>
+          <doctitle>Google Cloud Dataflow Kafka Contrib</doctitle>
+
+          <subpackages>com.google.cloud.dataflow.contrib.kafka</subpackages>
+          <use>false</use>
+          <bottom><![CDATA[<br>]]></bottom>
+
+          <offlineLinks>
+            <offlineLink>
+              <url>https://cloud.google.com/dataflow/java-sdk/JavaDoc/</url>
+              <location>${basedir}/../../javadoc/dataflow-sdk-docs</location>
+            </offlineLink>
+            <offlineLink>
+              <url>http://docs.guava-libraries.googlecode.com/git-history/release18/javadoc/</url>
+              <location>${basedir}/../../javadoc/guava-docs</location>
+            </offlineLink>
+          </offlineLinks>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <phase>package</phase>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>${google-cloud-dataflow-version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>[0.9,)</version>
+    </dependency>
+
+    <!-- test dependencies-->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
new file mode 100644
index 0000000..9b33ee8
--- /dev/null
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaCheckpointMark.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
+import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, partition id,
+ * and the latest offset consumed so far.
+ */
+@DefaultCoder(SerializableCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+  private final List<PartitionMark> partitions;
+
+  public KafkaCheckpointMark(List<PartitionMark> partitions) {
+    this.partitions = partitions;
+  }
+
+  public List<PartitionMark> getPartitions() {
+    return partitions;
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    /* nothing to do */
+
+    // We might want to support committing offset in Kafka for better resume point when the job
+    // is restarted (checkpoint is not available for job restarts).
+  }
+
+  /**
+   * A tuple to hold topic, partition, and offset that comprise the checkpoint
+   * for a single partition.
+   */
+  public static class PartitionMark implements Serializable {
+    private final TopicPartition topicPartition;
+    private final long offset;
+
+    public PartitionMark(TopicPartition topicPartition, long offset) {
+      this.topicPartition = topicPartition;
+      this.offset = offset;
+    }
+
+    public TopicPartition getTopicPartition() {
+      return topicPartition;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
new file mode 100644
index 0000000..ad254ee
--- /dev/null
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java
@@ -0,0 +1,1053 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.cloud.dataflow.contrib.kafka.KafkaCheckpointMark.PartitionMark;
+import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.util.ExposedByteArrayInputStream;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka version 0.9
+ * and above are supported.
+ *
+ * <h3>Reading from Kafka topics</h3>
+ *
+ * <p>KafkaIO source returns unbounded collection of Kafka records as
+ * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic
+ * metadata like topic-partition and offset, along with key and value associated with a Kafka
+ * record.
+ *
+ * <p>Although most applications consumer single topic, the source can be configured to consume
+ * multiple topics or even a specific set of {@link TopicPartition}s.
+ *
+ * <p> To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
+ * and one or more topics to consume. The following example illustrates various options for
+ * configuring the source :
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>
+ *
+ *       // rest of the settings are optional :
+ *
+ *       // set a Coder for Key and Value (note the change to return type)
+ *       .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
+ *       .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>
+ *
+ *       // you can further customize KafkaConsumer used to read the records by adding more
+ *       // settings for ConsumerConfig. e.g :
+ *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
+ *
+ *       // custom function for calculating record timestamp (default is processing time)
+ *       .withTimestampFn(new MyTypestampFunction())
+ *
+ *       // custom function for watermark (default is record timestamp)
+ *       .withWatermarkFn(new MyWatermarkFunction())
+ *
+ *       // finally, if you don't need Kafka metadata, you can drop it
+ *       .withoutMetadata() // PCollection<KV<Long, String>>
+ *    )
+ *    .apply(Values.<String>create()) // PCollection<String>
+ *     ...
+ * }</pre>
+ *
+ * <h3>Partition Assignment and Checkpointing</h3>
+ * The Kafka partitions are evenly distributed among splits (workers).
+ * Dataflow checkpointing is fully supported and
+ * each split can resume from previous checkpoint. See
+ * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
+ * splits and checkpoint support.
+ *
+ * <p>When the pipeline starts for the first time without any checkpoint, the source starts
+ * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
+ * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through
+ * {@link Read#updateConsumerProperties(Map)}.
+ *
+ * <h3>Advanced Kafka Configuration</h3>
+ * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like
+ * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set
+ * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ */
+public class KafkaIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
+
+  private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> {
+    @Override
+    public Instant apply(T input) {
+      return Instant.now();
+    }
+  }
+
+
+  /**
+   * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
+   * configuration should set with {@link Read#withBootstrapServers(String)} and
+   * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
+   * custom timestamp and watermark functions. Additionally, {@link Read#withMetadata()} provides
+   * access to Kafka metadata for each record (topic name, partition, offset).
+   */
+  public static Read<byte[], byte[]> read() {
+    return new Read<byte[], byte[]>(
+        new ArrayList<String>(),
+        new ArrayList<TopicPartition>(),
+        ByteArrayCoder.of(),
+        ByteArrayCoder.of(),
+        Read.KAFKA_9_CONSUMER_FACTORY_FN,
+        Read.DEFAULT_CONSUMER_PROPERTIES,
+        Long.MAX_VALUE,
+        null);
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class Read<K, V> extends TypedRead<K, V> {
+
+    /**
+     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+     */
+    public Read<K, V> withBootstrapServers(String bootstrapServers) {
+      return updateConsumerProperties(
+          ImmutableMap.<String, Object>of(
+              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the topics. All the partitions are from each
+     * of the topics is read.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopics(List<String> topics) {
+      checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+     * of partitions for one or more topics when (if ever) needed.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
+      checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for key bytes.
+     */
+    public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
+      return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for value bytes.
+     */
+    public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
+      return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration.
+     * This is useful for supporting another version of Kafka consumer.
+     * Default is {@link KafkaConsumer}.
+     */
+    public Read<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Update consumer configuration with new properties.
+     */
+    public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
+      for (String key : configUpdates.keySet()) {
+        checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key),
+            "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key));
+      }
+
+      Map<String, Object> config = new HashMap<>(consumerConfig);
+      config.putAll(configUpdates);
+
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, config, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Similar to {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxNumRecords(long)}.
+     * Mainly used for tests and demo applications.
+     */
+    public Read<K, V> withMaxNumRecords(long maxNumRecords) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, null);
+    }
+
+    /**
+     * Similar to
+     * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
+     * Mainly used for tests and demo
+     * applications.
+     */
+    public Read<K, V> withMaxReadTime(Duration maxReadTime) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////////////
+
+    private Read(
+        List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+
+      super(topics, topicPartitions, keyCoder, valueCoder, null, null,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A set of properties that are not required or don't make sense for our consumer.
+     */
+    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead"
+        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
+        //     lets allow these, applications can have better resume point for restarts.
+        );
+
+    // set config defaults
+    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+        ImmutableMap.<String, Object>of(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+
+            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
+            // with default value of of 32K, It takes multiple seconds between successful polls.
+            // All the consumer work is done inside poll(), with smaller send buffer size, it
+            // takes many polls before a 1MB chunk from the server is fully read. In my testing
+            // about half of the time select() inside kafka consumer waited for 20-30ms, though
+            // the server had lots of data in tcp send buffers on its side. Compared to default,
+            // this setting increased throughput increased by many fold (3-4x).
+            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
+
+            // default to latest offset when we are not resuming.
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
+            // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    // default Kafka 0.9 Consumer supplier.
+    private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      KAFKA_9_CONSUMER_FACTORY_FN =
+        new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+            return new KafkaConsumer<>(config);
+          }
+        };
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class TypedRead<K, V>
+                      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to calculate watermark after a record. Default is last record timestamp
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
+    }
+
+    /**
+     * A function to calculate watermark after a record. Default is last record timestamp
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
+    }
+
+    /**
+     * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata.
+     */
+    public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
+      return new TypedWithoutMetadata<K, V>(this);
+    }
+
+    @Override
+    public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+     // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+      Unbounded<KafkaRecord<K, V>> unbounded =
+          com.google.cloud.dataflow.sdk.io.Read.from(makeSource());
+
+      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+      if (maxNumRecords < Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(maxNumRecords);
+      } else if (maxReadTime != null) {
+        transform = unbounded.withMaxReadTime(maxReadTime);
+      }
+
+      return input.getPipeline().apply(transform);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////
+
+    protected final List<String> topics;
+    protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics
+    protected final Coder<K> keyCoder;
+    protected final Coder<V> valueCoder;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn;
+    protected final
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    protected final Map<String, Object> consumerConfig;
+    protected final long maxNumRecords; // bounded read, mainly for testing
+    protected final Duration maxReadTime; // bounded read, mainly for testing
+
+    private TypedRead(List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+      super("KafkaIO.Read");
+
+      this.topics = topics;
+      this.topicPartitions = topicPartitions;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn = timestampFn;
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Creates an {@link UnboundedSource<KafkaRecord<K, V>, ?>} with the configuration in
+     * {@link TypedRead}. Primary use case is unit tests, should not be used in an
+     * application.
+     */
+    @VisibleForTesting
+    UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
+      return new UnboundedKafkaSource<K, V>(
+          -1,
+          topics,
+          topicPartitions,
+          keyCoder,
+          valueCoder,
+          timestampFn,
+          Optional.fromNullable(watermarkFn),
+          consumerFactoryFn,
+          consumerConfig);
+    }
+
+    // utility method to convert KafkRecord<K, V> to user KV<K, V> before applying user functions
+    private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
+      unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
+        return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+          public OutT apply(KafkaRecord<KeyT, ValueT> record) {
+            return fn.apply(record.getKV());
+          }
+        };
+      }
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Typed}, but removes
+   * Kafka metatdata and returns a {@link PCollection} of {@link KV}.
+   * See {@link KafkaIO} for more information on usage and configuration of reader.
+   */
+  public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    private final TypedRead<K, V> typedRead;
+
+    TypedWithoutMetadata(TypedRead<K, V> read) {
+      super("KafkaIO.Read");
+      this.typedRead = read;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> apply(PBegin begin) {
+      return typedRead
+          .apply(begin)
+          .apply("Remove Kafka Metadata",
+              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                @Override
+                public void processElement(ProcessContext ctx) {
+                  ctx.output(ctx.element().getKV());
+                }
+              }));
+    }
+  }
+
+  /** Static class, prevent instantiation. */
+  private KafkaIO() {}
+
+  private static class UnboundedKafkaSource<K, V>
+      extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
+
+    private final int id; // split id, mainly for debugging
+    private final List<String> topics;
+    private final List<TopicPartition> assignedPartitions;
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    // would it be a good idea to pass currentTimestamp to watermarkFn?
+    private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn;
+    private
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    private final Map<String, Object> consumerConfig;
+
+    public UnboundedKafkaSource(
+        int id,
+        List<String> topics,
+        List<TopicPartition> assignedPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig) {
+
+      this.id = id;
+      this.assignedPartitions = assignedPartitions;
+      this.topics = topics;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn =
+          (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn);
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+    }
+
+    /**
+     * The partitions are evenly distributed among the splits. The number of splits returned is
+     * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
+     * count.
+     *
+     * <p> It is important to assign the partitions deterministically so that we can support
+     * resuming a split from last checkpoint. The Kafka partitions are sorted by
+     * {@code <topic, partition>} and then assigned to splits in round-robin order.
+     */
+    @Override
+    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+
+      List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
+
+      // (a) fetch partitions for each topic
+      // (b) sort by <topic, partition>
+      // (c) round-robin assign the partitions to splits
+
+      if (partitions.isEmpty()) {
+        try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
+          for (String topic : topics) {
+            for (PartitionInfo p : consumer.partitionsFor(topic)) {
+              partitions.add(new TopicPartition(p.topic(), p.partition()));
+            }
+          }
+        }
+      }
+
+      Collections.sort(partitions, new Comparator<TopicPartition>() {
+        public int compare(TopicPartition tp1, TopicPartition tp2) {
+          return ComparisonChain
+              .start()
+              .compare(tp1.topic(), tp2.topic())
+              .compare(tp1.partition(), tp2.partition())
+              .result();
+        }
+      });
+
+      checkArgument(desiredNumSplits > 0);
+      checkState(partitions.size() > 0,
+          "Could not find any partitions. Please check Kafka configuration and topic names");
+
+      int numSplits = Math.min(desiredNumSplits, partitions.size());
+      List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        assignments.add(new ArrayList<TopicPartition>());
+      }
+      for (int i = 0; i < partitions.size(); i++) {
+        assignments.get(i % numSplits).add(partitions.get(i));
+      }
+
+      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        List<TopicPartition> assignedToSplit = assignments.get(i);
+
+        LOG.info("Partitions assigned to split {} (total {}): {}",
+            i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
+
+        result.add(new UnboundedKafkaSource<K, V>(
+            i,
+            this.topics,
+            assignedToSplit,
+            this.keyCoder,
+            this.valueCoder,
+            this.timestampFn,
+            this.watermarkFn,
+            this.consumerFactoryFn,
+            this.consumerConfig));
+      }
+
+      return result;
+    }
+
+    @Override
+    public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
+                                                   KafkaCheckpointMark checkpointMark) {
+      if (assignedPartitions.isEmpty()) {
+        LOG.warn("Looks like generateSplits() is not called. Generate single split.");
+        try {
+          return new UnboundedKafkaReader<K, V>(
+              generateInitialSplits(1, options).get(0), checkpointMark);
+        } catch (Exception e) {
+          Throwables.propagate(e);
+        }
+      }
+      return new UnboundedKafkaReader<K, V>(this, checkpointMark);
+    }
+
+    @Override
+    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
+      return SerializableCoder.of(KafkaCheckpointMark.class);
+    }
+
+    @Override
+    public boolean requiresDeduping() {
+      // Kafka records are ordered with in partitions. In addition checkpoint guarantees
+      // records are not consumed twice.
+      return false;
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+          "Kafka bootstrap servers should be set");
+      checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
+          "Kafka topics or topic_partitions are required");
+    }
+
+    @Override
+    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+      return KafkaRecordCoder.of(keyCoder, valueCoder);
+    }
+  }
+
+  private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+
+    private final UnboundedKafkaSource<K, V> source;
+    private final String name;
+    private Consumer<byte[], byte[]> consumer;
+    private final List<PartitionState> partitionStates;
+    private KafkaRecord<K, V> curRecord;
+    private Instant curTimestamp;
+    private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+
+    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+    // how long to wait for new records from kafka consumer inside advance()
+    private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
+
+    // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
+    // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
+    // like 100 milliseconds does not work well. This along with large receive buffer for
+    // consumer achieved best throughput in tests (see `defaultConsumerProperties`).
+    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
+    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
+        new SynchronousQueue<>();
+    private volatile boolean closed = false;
+
+    // Backlog support :
+    // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
+    // then look at position(). Use another consumer to do this so that the primary consumer does
+    // not need to be interrupted. The latest offsets are fetched periodically on another thread.
+    // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
+    // auto commit in consumer config, this could interfere with the primary consumer (we will
+    // handle this particular problem). We might have to make this optional.
+    private Consumer<byte[], byte[]> offsetConsumer;
+    private final ScheduledExecutorService offsetFetcherThread =
+        Executors.newSingleThreadScheduledExecutor();
+    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+
+    /** watermark before any records have been read. */
+    private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+
+    public String toString() {
+      return name;
+    }
+
+    // maintains state of each assigned partition (buffered records, consumed offset, etc)
+    private static class PartitionState {
+      private final TopicPartition topicPartition;
+      private long consumedOffset;
+      private long latestOffset;
+      private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
+
+      // simple moving average for size of each record in bytes
+      private double avgRecordSize = 0;
+      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+
+
+      PartitionState(TopicPartition partition, long offset) {
+        this.topicPartition = partition;
+        this.consumedOffset = offset;
+        this.latestOffset = -1;
+      }
+
+      // update consumedOffset and avgRecordSize
+      void recordConsumed(long offset, int size) {
+        consumedOffset = offset;
+
+        // this is always updated from single thread. probably not worth making it an AtomicDouble
+        if (avgRecordSize <= 0) {
+          avgRecordSize = size;
+        } else {
+          // initially, first record heavily contributes to average.
+          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
+        }
+      }
+
+      synchronized void setLatestOffset(long latestOffset) {
+        this.latestOffset = latestOffset;
+      }
+
+      synchronized long approxBacklogInBytes() {
+        // Note that is an an estimate of uncompressed backlog.
+        // Messages on Kafka might be comressed.
+        if (latestOffset < 0 || consumedOffset < 0) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        if (latestOffset <= consumedOffset || consumedOffset < 0) {
+          return 0;
+        }
+        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+      }
+    }
+
+    public UnboundedKafkaReader(
+        UnboundedKafkaSource<K, V> source,
+        @Nullable KafkaCheckpointMark checkpointMark) {
+
+      this.source = source;
+      this.name = "Reader-" + source.id;
+
+      partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
+          new Function<TopicPartition, PartitionState>() {
+            public PartitionState apply(TopicPartition tp) {
+              return new PartitionState(tp, -1L);
+            }
+        }));
+
+      if (checkpointMark != null) {
+        // a) verify that assigned and check-pointed partitions match exactly
+        // b) set consumed offsets
+
+        checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(),
+            "checkPointMark and assignedPartitions should match");
+        // we could consider allowing a mismatch, though it is not expected in current Dataflow
+
+        for (int i = 0; i < source.assignedPartitions.size(); i++) {
+          PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
+          TopicPartition assigned = source.assignedPartitions.get(i);
+
+          checkState(ckptMark.getTopicPartition().equals(assigned),
+              "checkpointed partition %s and assigned partition %s don't match",
+              ckptMark.getTopicPartition(), assigned);
+
+          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+        }
+      }
+    }
+
+    private void consumerPollLoop() {
+      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
+      while (!closed) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+          if (!records.isEmpty()) {
+            availableRecordsQueue.put(records); // blocks until dequeued.
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
+          break;
+        } catch (WakeupException e) {
+          break;
+        }
+      }
+
+      LOG.info("{}: Returning from consumer pool loop", this);
+    }
+
+    private void nextBatch() {
+      curBatch = Collections.emptyIterator();
+
+      ConsumerRecords<byte[], byte[]> records;
+      try {
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+                                             TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("{}: Unexpected", this, e);
+        return;
+      }
+
+      if (records == null) {
+        return;
+      }
+
+      List<PartitionState> nonEmpty = new LinkedList<>();
+
+      for (PartitionState p : partitionStates) {
+        p.recordIter = records.records(p.topicPartition).iterator();
+        if (p.recordIter.hasNext()) {
+          nonEmpty.add(p);
+        }
+      }
+
+      // cycle through the partitions in order to interleave records from each.
+      curBatch = Iterators.cycle(nonEmpty);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      consumer = source.consumerFactoryFn.apply(source.consumerConfig);
+      consumer.assign(source.assignedPartitions);
+
+      // seek to consumedOffset + 1 if it is set
+      for (PartitionState p : partitionStates) {
+        if (p.consumedOffset >= 0) {
+          LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
+          consumer.seek(p.topicPartition, p.consumedOffset + 1);
+        } else {
+          LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+        }
+      }
+
+      // start consumer read loop.
+      // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+      consumerPollThread.submit(
+          new Runnable() {
+            public void run() {
+              consumerPollLoop();
+            }
+          });
+
+      // offsetConsumer setup :
+
+      // override client_id and auto_commit so that it does not interfere with main consumer.
+      String offsetConsumerId = String.format("%s_offset_consumer_%d_%s", name,
+          (new Random()).nextInt(Integer.MAX_VALUE),
+          source.consumerConfig.getOrDefault(ConsumerConfig.CLIENT_ID_CONFIG, "none"));
+      Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig);
+      offsetConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, offsetConsumerId);
+      offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+      offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
+      offsetConsumer.assign(source.assignedPartitions);
+
+      offsetFetcherThread.scheduleAtFixedRate(
+          new Runnable() {
+            public void run() {
+              updateLatestOffsets();
+            }
+          }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      /* Read first record (if any). we need to loop here because :
+       *  - (a) some records initially need to be skipped if they are before consumedOffset
+       *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
+       *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
+       *        curBatch.next() might return an empty iterator.
+       */
+      while (true) {
+        if (curBatch.hasNext()) {
+          PartitionState pState = curBatch.next();
+
+          if (!pState.recordIter.hasNext()) { // -- (c)
+            pState.recordIter = Collections.emptyIterator(); // drop ref
+            curBatch.remove();
+            continue;
+          }
+
+          ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
+          long consumed = pState.consumedOffset;
+          long offset = rawRecord.offset();
+
+          if (consumed >= 0 && offset <= consumed) { // -- (a)
+            // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
+            // should we check if the offset is way off from consumedOffset (say > 1M)?
+            LOG.warn("{}: ignoring already consumed offset {} for {}",
+                this, offset, pState.topicPartition);
+            continue;
+          }
+
+          // sanity check
+          if (consumed >= 0 && (offset - consumed) != 1) {
+            LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
+                this, pState.topicPartition, consumed, offset - consumed - 1);
+          }
+
+          if (curRecord == null) {
+            LOG.info("{}: first record offset {}", name, offset);
+          }
+
+          curRecord = null; // user coders below might throw.
+
+          // apply user coders. might want to allow skipping records that fail to decode.
+          // TODO: wrap exceptions from coders to make explicit to users
+          KafkaRecord<K, V> record = new KafkaRecord<K, V>(
+              rawRecord.topic(),
+              rawRecord.partition(),
+              rawRecord.offset(),
+              decode(rawRecord.key(), source.keyCoder),
+              decode(rawRecord.value(), source.valueCoder));
+
+          curTimestamp = source.timestampFn.apply(record);
+          curRecord = record;
+
+          int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) +
+              (rawRecord.value() == null ? 0 : rawRecord.value().length);
+          pState.recordConsumed(offset, recordSize);
+          return true;
+
+        } else { // -- (b)
+          nextBatch();
+
+          if (!curBatch.hasNext()) {
+            return false;
+          }
+        }
+      }
+    }
+
+    private static byte[] nullBytes = new byte[0];
+    private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
+      // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
+      // This makes it impossible for user to distinguish between zero length byte and null.
+      // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
+      // coder.
+      byte[] toDecode = bytes == null ? nullBytes : bytes;
+      return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
+    }
+
+    // update latest offset for each partition.
+    // called from offsetFetcher thread
+    private void updateLatestOffsets() {
+      for (PartitionState p : partitionStates) {
+        try {
+          offsetConsumer.seekToEnd(p.topicPartition);
+          long offset = offsetConsumer.position(p.topicPartition);
+          p.setLatestOffset(offset);;
+        } catch (Exception e) {
+          LOG.warn("{}: exception while fetching latest offsets. ignored.",  this, e);
+          p.setLatestOffset(-1L); // reset
+        }
+
+        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
+            this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+      }
+
+      LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
+    }
+
+    @Override
+    public Instant getWatermark() {
+      if (curRecord == null) {
+        LOG.warn("{}: getWatermark() : no records have been read yet.", name);
+        return initialWatermark;
+      }
+
+      return source.watermarkFn.isPresent() ?
+          source.watermarkFn.get().apply(curRecord) : curTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
+          Lists.transform(partitionStates,
+              new Function<PartitionState, PartitionMark>() {
+                public PartitionMark apply(PartitionState p) {
+                  return new PartitionMark(p.topicPartition, p.consumedOffset);
+                }
+              }
+          )));
+    }
+
+    @Override
+    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+      // should we delay updating consumed offset till this point? Mostly not required.
+      return curRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return curTimestamp;
+    }
+
+
+    @Override
+    public long getSplitBacklogBytes() {
+      long backlogBytes = 0;
+
+      for (PartitionState p : partitionStates) {
+        long pBacklog = p.approxBacklogInBytes();
+        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        backlogBytes += pBacklog;
+      }
+
+      return backlogBytes;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
+      consumer.wakeup();
+      consumerPollThread.shutdown();
+      offsetFetcherThread.shutdown();
+      Closeables.close(offsetConsumer, true);
+      Closeables.close(consumer, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
new file mode 100644
index 0000000..584b1b6
--- /dev/null
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecord.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import java.io.Serializable;
+
+/**
+ * KafkaRecord contains key and value of the record as well as metadata for the record (topic name,
+ * partition id, and offset).
+ */
+public class KafkaRecord<K, V> implements Serializable {
+
+  private final String topic;
+  private final int partition;
+  private final long offset;
+  private final KV<K, V> kv;
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      K key,
+      V value) {
+    this(topic, partition, offset, KV.of(key, value));
+  }
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      KV<K, V> kv) {
+
+    this.topic = topic;
+    this.partition = partition;
+    this.offset = offset;
+    this.kv = kv;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public KV<K, V> getKV() {
+    return kv;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof KafkaRecord) {
+      @SuppressWarnings("unchecked")
+      KafkaRecord<Object, Object> other = (KafkaRecord<Object, Object>) obj;
+      return topic.equals(other.topic)
+          && partition == other.partition
+          && offset == other.offset
+          && kv.equals(other.kv);
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7b175dff/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
new file mode 100644
index 0000000..d9af1b5
--- /dev/null
+++ b/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaRecordCoder.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.contrib.kafka;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.util.PropertyNames;
+import com.google.cloud.dataflow.sdk.values.KV;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * {@link Coder} for {@link KafkaRecord}.
+ */
+public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+
+  private final KvCoder<K, V> kvCoder;
+
+  @JsonCreator
+  public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+                                          List<Coder<?>> components) {
+    KvCoder<?, ?> kvCoder = KvCoder.of(components);
+    return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+  }
+
+  public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
+  }
+
+  public KafkaRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
+                         throws CoderException, IOException {
+    Context nested = context.nested();
+    stringCoder.encode(value.getTopic(), outStream, nested);
+    intCoder.encode(value.getPartition(), outStream, nested);
+    longCoder.encode(value.getOffset(), outStream, nested);
+    kvCoder.encode(value.getKV(), outStream, nested);
+  }
+
+  @Override
+  public KafkaRecord<K, V> decode(InputStream inStream, Context context)
+                                      throws CoderException, IOException {
+    Context nested = context.nested();
+    return new KafkaRecord<K, V>(
+        stringCoder.decode(inStream, nested),
+        intCoder.decode(inStream, nested),
+        longCoder.decode(inStream, nested),
+        kvCoder.decode(inStream, nested));
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return kvCoder.getCoderArguments();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    kvCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
+    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+    //TODO : do we have to implement getEncodedSize()?
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new KafkaRecord<Object, Object>(
+          value.getTopic(),
+          value.getPartition(),
+          value.getOffset(),
+          (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}


[2/5] incubator-beam git commit: Kafka: various fixes

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java b/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
deleted file mode 100644
index b4bdc83..0000000
--- a/contrib/kafka/src/test/java/com/google/cloud/dataflow/contrib/kafka/KafkaIOTest.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.contrib.kafka;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
-import com.google.cloud.dataflow.sdk.io.Read;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
-import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
-import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
-import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
-import com.google.cloud.dataflow.sdk.testing.TestPipeline;
-import com.google.cloud.dataflow.sdk.transforms.Count;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.Max;
-import com.google.cloud.dataflow.sdk.transforms.Min;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
-import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
-import com.google.cloud.dataflow.sdk.transforms.Values;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionList;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import javax.annotation.Nullable;
-
-/**
- * Tests of {@link KafkaSource}.
- */
-@RunWith(JUnit4.class)
-public class KafkaIOTest {
-  /*
-   * The tests below borrow code and structure from CountingSourceTest. In addition verifies
-   * the reader interleaves the records from multiple partitions.
-   *
-   * Other tests to consider :
-   *   - test KafkaRecordCoder
-   */
-
-  // Update mock consumer with records distributed among the given topics, each with given number
-  // of partitions. Records are assigned in round-robin order among the partitions.
-  private static MockConsumer<byte[], byte[]> mkMockConsumer(
-      List<String> topics, int partitionsPerTopic, int numElements) {
-
-    final List<TopicPartition> partitions = new ArrayList<>();
-    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
-    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
-
-    for (String topic : topics) {
-      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
-      for (int i = 0; i < partitionsPerTopic; i++) {
-        partitions.add(new TopicPartition(topic, i));
-        partIds.add(new PartitionInfo(topic, i, null, null, null));
-      }
-      partitionMap.put(topic, partIds);
-    }
-
-    int numPartitions = partitions.size();
-    long[] offsets = new long[numPartitions];
-
-    for (int i = 0; i < numElements; i++) {
-      int pIdx = i % numPartitions;
-      TopicPartition tp = partitions.get(pIdx);
-
-      if (!records.containsKey(tp)) {
-        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
-      }
-      records.get(tp).add(
-          // Note: this interface has changed in 0.10. may get fixed before the release.
-          new ConsumerRecord<byte[], byte[]>(
-              tp.topic(),
-              tp.partition(),
-              offsets[pIdx]++,
-              null, // key
-              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
-    }
-
-    MockConsumer<byte[], byte[]> consumer =
-        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
-          // override assign() to add records that belong to the assigned partitions.
-          public void assign(List<TopicPartition> assigned) {
-            super.assign(assigned);
-            for (TopicPartition tp : assigned) {
-              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
-                addRecord(r);
-              }
-              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
-              updateEndOffsets(ImmutableMap.of(tp, (long)records.get(tp).size()));
-              seek(tp, 0);
-            }
-          }
-        };
-
-    for (String topic : topics) {
-      consumer.updatePartitions(topic, partitionMap.get(topic));
-    }
-
-    return consumer;
-  }
-
-  private static class ConsumerFactoryFn
-                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
-    private final List<String> topics;
-    private final int partitionsPerTopic;
-    private final int numElements;
-
-    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
-      this.topics = topics;
-      this.partitionsPerTopic = partitionsPerTopic;
-      this.numElements = numElements;
-    }
-
-    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
-      return mkMockConsumer(topics, partitionsPerTopic, numElements);
-    }
-  }
-
-  /**
-   * Creates a consumer with two topics, with 5 partitions each.
-   * numElements are (round-robin) assigned all the 10 partitions.
-   */
-  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
-      int numElements,
-      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
-
-    List<String> topics = ImmutableList.of("topic_a", "topic_b");
-
-    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
-        .withBootstrapServers("none")
-        .withTopics(topics)
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
-        .withValueCoder(BigEndianLongCoder.of())
-        .withMaxNumRecords(numElements);
-
-    if (timestampFn != null) {
-      return reader.withTimestampFn(timestampFn);
-    } else {
-      return reader;
-    }
-  }
-
-  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
-    private final int num;
-
-    public AssertMultipleOf(int num) {
-      this.num = num;
-    }
-
-    @Override
-    public Void apply(Iterable<Long> values) {
-      for (Long v : values) {
-        assertEquals(0, v % num);
-      }
-      return null;
-    }
-  }
-
-  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
-    // Count == numElements
-    DataflowAssert
-      .thatSingleton(input.apply("Count", Count.<Long>globally()))
-      .isEqualTo(numElements);
-    // Unique count == numElements
-    DataflowAssert
-      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
-                          .apply("UniqueCount", Count.<Long>globally()))
-      .isEqualTo(numElements);
-    // Min == 0
-    DataflowAssert
-      .thatSingleton(input.apply("Min", Min.<Long>globally()))
-      .isEqualTo(0L);
-    // Max == numElements-1
-    DataflowAssert
-      .thatSingleton(input.apply("Max", Max.<Long>globally()))
-      .isEqualTo(numElements - 1);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSource() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    PCollection<Long> input = p
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-            .withoutMetadata())
-        .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceWithExplicitPartitions() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    List<String> topics = ImmutableList.of("test");
-
-    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
-        .withBootstrapServers("none")
-        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
-        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
-        .withValueCoder(BigEndianLongCoder.of())
-        .withMaxNumRecords(numElements / 10);
-
-    PCollection<Long> input = p
-        .apply(reader.withoutMetadata())
-        .apply(Values.<Long>create());
-
-    // assert that every element is a multiple of 5.
-    DataflowAssert
-      .that(input)
-      .satisfies(new AssertMultipleOf(5));
-
-    DataflowAssert
-      .thatSingleton(input.apply(Count.<Long>globally()))
-      .isEqualTo(numElements / 10L);
-
-    p.run();
-  }
-
-  private static class ElementValueDiff extends DoFn<Long, Long> {
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(c.element() - c.timestamp().getMillis());
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceTimestamps() {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-
-    PCollection<Long> input = p
-        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
-        .apply(Values.<Long>create());
-
-    addCountingAsserts(input, numElements);
-
-    PCollection<Long> diffs = input
-        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
-    // This assert also confirms that diffs only has one unique value.
-    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
-
-    p.run();
-  }
-
-  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
-    @Override
-    public void processElement(ProcessContext ctx) throws Exception {
-      ctx.output(ctx.element().getKV());
-    }
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testUnboundedSourceSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
-    int numElements = 1000;
-    int numSplits = 10;
-
-    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
-        mkKafkaReadTransform(numElements, null).makeSource();
-    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
-        initial.generateInitialSplits(numSplits, p.getOptions());
-    assertEquals("Expected exact splitting", numSplits, splits.size());
-
-    long elementsPerSplit = numElements / numSplits;
-    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
-    PCollectionList<Long> pcollections = PCollectionList.empty(p);
-    for (int i = 0; i < splits.size(); ++i) {
-      pcollections = pcollections.and(
-          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
-           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
-           .apply("collection " + i, Values.<Long>create()));
-    }
-    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
-
-    addCountingAsserts(input, numElements);
-    p.run();
-  }
-
-  /**
-   * A timestamp function that uses the given value as the timestamp.
-   */
-  private static class ValueAsTimestampFn
-                       implements SerializableFunction<KV<byte[], Long>, Instant> {
-    @Override
-    public Instant apply(KV<byte[], Long> input) {
-      return new Instant(input.getValue());
-    }
-  }
-
-  @Test
-  public void testUnboundedSourceCheckpointMark() throws Exception {
-    int numElements = 85; // 85 to make sure some partitions have more records than other.
-
-    // create a single split:
-    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
-        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
-          .makeSource()
-          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
-          .get(0);
-
-    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
-    final int numToSkip = 3;
-    // advance once:
-    assertTrue(reader.start());
-
-    // Advance the source numToSkip-1 elements and manually save state.
-    for (long l = 0; l < numToSkip - 1; ++l) {
-      assertTrue(reader.advance());
-    }
-
-    // Confirm that we get the expected element in sequence before checkpointing.
-
-    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
-    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
-
-    // Checkpoint and restart, and confirm that the source continues correctly.
-    KafkaCheckpointMark mark = CoderUtils.clone(
-        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
-    reader = source.createReader(null, mark);
-    assertTrue(reader.start());
-
-    // Confirm that we get the next elements in sequence.
-    // This also confirms that Reader interleaves records from each partitions by the reader.
-    for (int i = numToSkip; i < numElements; i++) {
-      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
-      assertEquals(i, reader.getCurrentTimestamp().getMillis());
-      reader.advance();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/pom.xml b/sdks/java/io/kafka/pom.xml
new file mode 100644
index 0000000..98a091d
--- /dev/null
+++ b/sdks/java/io/kafka/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>io-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>kafka</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Kafka</name>
+  <description>Library to read Kafka topics.</description>
+  <packaging>jar</packaging>
+
+  <build>
+  <plugins>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-compiler-plugin</artifactId>
+    </plugin>
+
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-checkstyle-plugin</artifactId>
+      <version>2.12</version>
+      <dependencies>
+        <dependency>
+          <groupId>com.puppycrawl.tools</groupId>
+          <artifactId>checkstyle</artifactId>
+          <version>6.6</version>
+        </dependency>
+      </dependencies>
+      <configuration>
+        <configLocation>../../checkstyle.xml</configLocation>
+        <consoleOutput>true</consoleOutput>
+        <failOnViolation>true</failOnViolation>
+      </configuration>
+      <executions>
+        <execution>
+          <goals>
+            <goal>check</goal>
+          </goals>
+        </execution>
+      </executions>
+    </plugin>
+  </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>[0.9,)</version>
+    </dependency>
+
+    <!-- test dependencies-->
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>${hamcrest.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
new file mode 100644
index 0000000..4b6b976
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Checkpoint for an unbounded KafkaIO.Read. Consists of Kafka topic name, partition id,
+ * and the latest offset consumed so far.
+ */
+@DefaultCoder(SerializableCoder.class)
+public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+
+  private final List<PartitionMark> partitions;
+
+  public KafkaCheckpointMark(List<PartitionMark> partitions) {
+    this.partitions = partitions;
+  }
+
+  public List<PartitionMark> getPartitions() {
+    return partitions;
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+    /* nothing to do */
+
+    // We might want to support committing offset in Kafka for better resume point when the job
+    // is restarted (checkpoint is not available for job restarts).
+  }
+
+  /**
+   * A tuple to hold topic, partition, and offset that comprise the checkpoint
+   * for a single partition.
+   */
+  public static class PartitionMark implements Serializable {
+    private final TopicPartition topicPartition;
+    private final long offset;
+
+    public PartitionMark(TopicPartition topicPartition, long offset) {
+      this.topicPartition = topicPartition;
+      this.offset = offset;
+    }
+
+    public TopicPartition getTopicPartition() {
+      return topicPartition;
+    }
+
+    public long getOffset() {
+      return offset;
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
new file mode 100644
index 0000000..e605311
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -0,0 +1,1054 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An unbounded source for <a href="http://kafka.apache.org/">Kafka</a> topics. Kafka version 0.9
+ * and above are supported.
+ *
+ * <h3>Reading from Kafka topics</h3>
+ *
+ * <p>KafkaIO source returns unbounded collection of Kafka records as
+ * {@code PCollection<KafkaRecord<K, V>>}. A {@link KafkaRecord} includes basic
+ * metadata like topic-partition and offset, along with key and value associated with a Kafka
+ * record.
+ *
+ * <p>Although most applications consumer single topic, the source can be configured to consume
+ * multiple topics or even a specific set of {@link TopicPartition}s.
+ *
+ * <p> To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
+ * and one or more topics to consume. The following example illustrates various options for
+ * configuring the source :
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>
+ *
+ *       // rest of the settings are optional :
+ *
+ *       // set a Coder for Key and Value (note the change to return type)
+ *       .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
+ *       .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>
+ *
+ *       // you can further customize KafkaConsumer used to read the records by adding more
+ *       // settings for ConsumerConfig. e.g :
+ *       .updateConsumerProperties(ImmutableMap.of("receive.buffer.bytes", 1024 * 1024))
+ *
+ *       // custom function for calculating record timestamp (default is processing time)
+ *       .withTimestampFn(new MyTypestampFunction())
+ *
+ *       // custom function for watermark (default is record timestamp)
+ *       .withWatermarkFn(new MyWatermarkFunction())
+ *
+ *       // finally, if you don't need Kafka metadata, you can drop it
+ *       .withoutMetadata() // PCollection<KV<Long, String>>
+ *    )
+ *    .apply(Values.<String>create()) // PCollection<String>
+ *     ...
+ * }</pre>
+ *
+ * <h3>Partition Assignment and Checkpointing</h3>
+ * The Kafka partitions are evenly distributed among splits (workers).
+ * Dataflow checkpointing is fully supported and
+ * each split can resume from previous checkpoint. See
+ * {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for more details on
+ * splits and checkpoint support.
+ *
+ * <p>When the pipeline starts for the first time without any checkpoint, the source starts
+ * consuming from the <em>latest</em> offsets. You can override this behavior to consume from the
+ * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through
+ * {@link Read#updateConsumerProperties(Map)}.
+ *
+ * <h3>Advanced Kafka Configuration</h3>
+ * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like
+ * to enable offset <em>auto commit</em> (for external monitoring or other purposes), you can set
+ * <tt>"group.id"</tt>, <tt>"enable.auto.commit"</tt>, etc.
+ */
+public class KafkaIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
+
+  private static class NowTimestampFn<T> implements SerializableFunction<T, Instant> {
+    @Override
+    public Instant apply(T input) {
+      return Instant.now();
+    }
+  }
+
+
+  /**
+   * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
+   * configuration should set with {@link Read#withBootstrapServers(String)} and
+   * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
+   * custom timestamp and watermark functions.
+   */
+  public static Read<byte[], byte[]> read() {
+    return new Read<byte[], byte[]>(
+        new ArrayList<String>(),
+        new ArrayList<TopicPartition>(),
+        ByteArrayCoder.of(),
+        ByteArrayCoder.of(),
+        Read.KAFKA_9_CONSUMER_FACTORY_FN,
+        Read.DEFAULT_CONSUMER_PROPERTIES,
+        Long.MAX_VALUE,
+        null);
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class Read<K, V> extends TypedRead<K, V> {
+
+    /**
+     * Returns a new {@link Read} with Kafka consumer pointing to {@code bootstrapServers}.
+     */
+    public Read<K, V> withBootstrapServers(String bootstrapServers) {
+      return updateConsumerProperties(
+          ImmutableMap.<String, Object>of(
+              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the topics. All the partitions are from each
+     * of the topics is read.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopics(List<String> topics) {
+      checkState(topicPartitions.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(ImmutableList.copyOf(topics), topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} that reads from the partitions. This allows reading only a subset
+     * of partitions for one or more topics when (if ever) needed.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopicPartitions(List<TopicPartition> topicPartitions) {
+      checkState(topics.isEmpty(), "Only topics or topicPartitions can be set, not both");
+
+      return new Read<K, V>(topics, ImmutableList.copyOf(topicPartitions), keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for key bytes.
+     */
+    public <KeyT> Read<KeyT, V> withKeyCoder(Coder<KeyT> keyCoder) {
+      return new Read<KeyT, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns a new {@link Read} with {@link Coder} for value bytes.
+     */
+    public <ValueT> Read<K, ValueT> withValueCoder(Coder<ValueT> valueCoder) {
+      return new Read<K, ValueT>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A factory to create Kafka {@link Consumer} from consumer configuration.
+     * This is useful for supporting another version of Kafka consumer.
+     * Default is {@link KafkaConsumer}.
+     */
+    public Read<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Update consumer configuration with new properties.
+     */
+    public Read<K, V> updateConsumerProperties(Map<String, Object> configUpdates) {
+      for (String key : configUpdates.keySet()) {
+        checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key),
+            "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key));
+      }
+
+      Map<String, Object> config = new HashMap<>(consumerConfig);
+      config.putAll(configUpdates);
+
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, config, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Similar to {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxNumRecords(long)}.
+     * Mainly used for tests and demo applications.
+     */
+    public Read<K, V> withMaxNumRecords(long maxNumRecords) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, maxNumRecords, null);
+    }
+
+    /**
+     * Similar to
+     * {@link org.apache.beam.sdk.io.Read.Unbounded#withMaxReadTime(Duration)}.
+     * Mainly used for tests and demo
+     * applications.
+     */
+    public Read<K, V> withMaxReadTime(Duration maxReadTime) {
+      return new Read<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          consumerFactoryFn, consumerConfig, Long.MAX_VALUE, maxReadTime);
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////////////
+
+    private Read(
+        List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+
+      super(topics, topicPartitions, keyCoder, valueCoder, null, null,
+          consumerFactoryFn, consumerConfig, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A set of properties that are not required or don't make sense for our consumer.
+     */
+    private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead"
+        // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
+        //     lets allow these, applications can have better resume point for restarts.
+        );
+
+    // set config defaults
+    private static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+        ImmutableMap.<String, Object>of(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName(),
+
+            // Use large receive buffer. Once KAFKA-3135 is fixed, this _may_ not be required.
+            // with default value of of 32K, It takes multiple seconds between successful polls.
+            // All the consumer work is done inside poll(), with smaller send buffer size, it
+            // takes many polls before a 1MB chunk from the server is fully read. In my testing
+            // about half of the time select() inside kafka consumer waited for 20-30ms, though
+            // the server had lots of data in tcp send buffers on its side. Compared to default,
+            // this setting increased throughput increased by many fold (3-4x).
+            ConsumerConfig.RECEIVE_BUFFER_CONFIG, 512 * 1024,
+
+            // default to latest offset when we are not resuming.
+            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
+            // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    // default Kafka 0.9 Consumer supplier.
+    private static final SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+      KAFKA_9_CONSUMER_FACTORY_FN =
+        new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
+          public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+            return new KafkaConsumer<>(config);
+          }
+        };
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more
+   * information on usage and configuration.
+   */
+  public static class TypedRead<K, V>
+                      extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to calculate watermark after a record. Default is last record timestamp
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn2(
+        SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return new TypedRead<K, V>(topics, topicPartitions, keyCoder, valueCoder,
+          timestampFn, watermarkFn, consumerFactoryFn, consumerConfig,
+          maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * A function to assign a timestamp to a record. Default is processing timestamp.
+     */
+    public TypedRead<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
+      checkNotNull(timestampFn);
+      return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
+    }
+
+    /**
+     * A function to calculate watermark after a record. Default is last record timestamp
+     * @see #withTimestampFn(SerializableFunction)
+     */
+    public TypedRead<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
+      checkNotNull(watermarkFn);
+      return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
+    }
+
+    /**
+     * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata.
+     */
+    public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
+      return new TypedWithoutMetadata<K, V>(this);
+    }
+
+    @Override
+    public PCollection<KafkaRecord<K, V>> apply(PBegin input) {
+     // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+      Unbounded<KafkaRecord<K, V>> unbounded =
+          org.apache.beam.sdk.io.Read.from(makeSource());
+
+      PTransform<PInput, PCollection<KafkaRecord<K, V>>> transform = unbounded;
+
+      if (maxNumRecords < Long.MAX_VALUE) {
+        transform = unbounded.withMaxNumRecords(maxNumRecords);
+      } else if (maxReadTime != null) {
+        transform = unbounded.withMaxReadTime(maxReadTime);
+      }
+
+      return input.getPipeline().apply(transform);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////
+
+    protected final List<String> topics;
+    protected final List<TopicPartition> topicPartitions; // mutually exclusive with topics
+    protected final Coder<K> keyCoder;
+    protected final Coder<V> valueCoder;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    @Nullable protected final SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn;
+    protected final
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    protected final Map<String, Object> consumerConfig;
+    protected final long maxNumRecords; // bounded read, mainly for testing
+    protected final Duration maxReadTime; // bounded read, mainly for testing
+
+    private TypedRead(List<String> topics,
+        List<TopicPartition> topicPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig,
+        long maxNumRecords,
+        @Nullable Duration maxReadTime) {
+      super("KafkaIO.Read");
+
+      this.topics = topics;
+      this.topicPartitions = topicPartitions;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn = timestampFn;
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Creates an {@link UnboundedSource<KafkaRecord<K, V>, ?>} with the configuration in
+     * {@link TypedRead}. Primary use case is unit tests, should not be used in an
+     * application.
+     */
+    @VisibleForTesting
+    UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> makeSource() {
+      return new UnboundedKafkaSource<K, V>(
+          -1,
+          topics,
+          topicPartitions,
+          keyCoder,
+          valueCoder,
+          timestampFn,
+          Optional.fromNullable(watermarkFn),
+          consumerFactoryFn,
+          consumerConfig);
+    }
+
+    // utility method to convert KafkRecord<K, V> to user KV<K, V> before applying user functions
+    private static <KeyT, ValueT, OutT> SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>
+      unwrapKafkaAndThen(final SerializableFunction<KV<KeyT, ValueT>, OutT> fn) {
+        return new SerializableFunction<KafkaRecord<KeyT, ValueT>, OutT>() {
+          public OutT apply(KafkaRecord<KeyT, ValueT> record) {
+            return fn.apply(record.getKV());
+          }
+        };
+      }
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.TypedRead}, but
+   * removes Kafka metatdata and returns a {@link PCollection} of {@link KV}.
+   * See {@link KafkaIO} for more information on usage and configuration of reader.
+   */
+  public static class TypedWithoutMetadata<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
+
+    private final TypedRead<K, V> typedRead;
+
+    TypedWithoutMetadata(TypedRead<K, V> read) {
+      super("KafkaIO.Read");
+      this.typedRead = read;
+    }
+
+    @Override
+    public PCollection<KV<K, V>> apply(PBegin begin) {
+      return typedRead
+          .apply(begin)
+          .apply("Remove Kafka Metadata",
+              ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                @Override
+                public void processElement(ProcessContext ctx) {
+                  ctx.output(ctx.element().getKV());
+                }
+              }));
+    }
+  }
+
+  /** Static class, prevent instantiation. */
+  private KafkaIO() {}
+
+  private static class UnboundedKafkaSource<K, V>
+      extends UnboundedSource<KafkaRecord<K, V>, KafkaCheckpointMark> {
+
+    private final int id; // split id, mainly for debugging
+    private final List<String> topics;
+    private final List<TopicPartition> assignedPartitions;
+    private final Coder<K> keyCoder;
+    private final Coder<V> valueCoder;
+    private final SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn;
+    // would it be a good idea to pass currentTimestamp to watermarkFn?
+    private final Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn;
+    private
+      SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn;
+    private final Map<String, Object> consumerConfig;
+
+    public UnboundedKafkaSource(
+        int id,
+        List<String> topics,
+        List<TopicPartition> assignedPartitions,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        @Nullable SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn,
+        Optional<SerializableFunction<KafkaRecord<K, V>, Instant>> watermarkFn,
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn,
+        Map<String, Object> consumerConfig) {
+
+      this.id = id;
+      this.assignedPartitions = assignedPartitions;
+      this.topics = topics;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+      this.timestampFn =
+          (timestampFn == null ? new NowTimestampFn<KafkaRecord<K, V>>() : timestampFn);
+      this.watermarkFn = watermarkFn;
+      this.consumerFactoryFn = consumerFactoryFn;
+      this.consumerConfig = consumerConfig;
+    }
+
+    /**
+     * The partitions are evenly distributed among the splits. The number of splits returned is
+     * {@code min(desiredNumSplits, totalNumPartitions)}, though better not to depend on the exact
+     * count.
+     *
+     * <p> It is important to assign the partitions deterministically so that we can support
+     * resuming a split from last checkpoint. The Kafka partitions are sorted by
+     * {@code <topic, partition>} and then assigned to splits in round-robin order.
+     */
+    @Override
+    public List<UnboundedKafkaSource<K, V>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+
+      List<TopicPartition> partitions = new ArrayList<>(assignedPartitions);
+
+      // (a) fetch partitions for each topic
+      // (b) sort by <topic, partition>
+      // (c) round-robin assign the partitions to splits
+
+      if (partitions.isEmpty()) {
+        try (Consumer<?, ?> consumer = consumerFactoryFn.apply(consumerConfig)) {
+          for (String topic : topics) {
+            for (PartitionInfo p : consumer.partitionsFor(topic)) {
+              partitions.add(new TopicPartition(p.topic(), p.partition()));
+            }
+          }
+        }
+      }
+
+      Collections.sort(partitions, new Comparator<TopicPartition>() {
+        public int compare(TopicPartition tp1, TopicPartition tp2) {
+          return ComparisonChain
+              .start()
+              .compare(tp1.topic(), tp2.topic())
+              .compare(tp1.partition(), tp2.partition())
+              .result();
+        }
+      });
+
+      checkArgument(desiredNumSplits > 0);
+      checkState(partitions.size() > 0,
+          "Could not find any partitions. Please check Kafka configuration and topic names");
+
+      int numSplits = Math.min(desiredNumSplits, partitions.size());
+      List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        assignments.add(new ArrayList<TopicPartition>());
+      }
+      for (int i = 0; i < partitions.size(); i++) {
+        assignments.get(i % numSplits).add(partitions.get(i));
+      }
+
+      List<UnboundedKafkaSource<K, V>> result = new ArrayList<>(numSplits);
+
+      for (int i = 0; i < numSplits; i++) {
+        List<TopicPartition> assignedToSplit = assignments.get(i);
+
+        LOG.info("Partitions assigned to split {} (total {}): {}",
+            i, assignedToSplit.size(), Joiner.on(",").join(assignedToSplit));
+
+        result.add(new UnboundedKafkaSource<K, V>(
+            i,
+            this.topics,
+            assignedToSplit,
+            this.keyCoder,
+            this.valueCoder,
+            this.timestampFn,
+            this.watermarkFn,
+            this.consumerFactoryFn,
+            this.consumerConfig));
+      }
+
+      return result;
+    }
+
+    @Override
+    public UnboundedKafkaReader<K, V> createReader(PipelineOptions options,
+                                                   KafkaCheckpointMark checkpointMark) {
+      if (assignedPartitions.isEmpty()) {
+        LOG.warn("Looks like generateSplits() is not called. Generate single split.");
+        try {
+          return new UnboundedKafkaReader<K, V>(
+              generateInitialSplits(1, options).get(0), checkpointMark);
+        } catch (Exception e) {
+          Throwables.propagate(e);
+        }
+      }
+      return new UnboundedKafkaReader<K, V>(this, checkpointMark);
+    }
+
+    @Override
+    public Coder<KafkaCheckpointMark> getCheckpointMarkCoder() {
+      return SerializableCoder.of(KafkaCheckpointMark.class);
+    }
+
+    @Override
+    public boolean requiresDeduping() {
+      // Kafka records are ordered with in partitions. In addition checkpoint guarantees
+      // records are not consumed twice.
+      return false;
+    }
+
+    @Override
+    public void validate() {
+      checkNotNull(consumerConfig.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+          "Kafka bootstrap servers should be set");
+      checkArgument(topics.size() > 0 || assignedPartitions.size() > 0,
+          "Kafka topics or topic_partitions are required");
+    }
+
+    @Override
+    public Coder<KafkaRecord<K, V>> getDefaultOutputCoder() {
+      return KafkaRecordCoder.of(keyCoder, valueCoder);
+    }
+  }
+
+  private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+
+    private final UnboundedKafkaSource<K, V> source;
+    private final String name;
+    private Consumer<byte[], byte[]> consumer;
+    private final List<PartitionState> partitionStates;
+    private KafkaRecord<K, V> curRecord;
+    private Instant curTimestamp;
+    private Iterator<PartitionState> curBatch = Collections.emptyIterator();
+
+    private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+    // how long to wait for new records from kafka consumer inside advance()
+    private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
+
+    // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
+    // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
+    // like 100 milliseconds does not work well. This along with large receive buffer for
+    // consumer achieved best throughput in tests (see `defaultConsumerProperties`).
+    private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
+    private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
+        new SynchronousQueue<>();
+    private volatile boolean closed = false;
+
+    // Backlog support :
+    // Kafka consumer does not have an API to fetch latest offset for topic. We need to seekToEnd()
+    // then look at position(). Use another consumer to do this so that the primary consumer does
+    // not need to be interrupted. The latest offsets are fetched periodically on another thread.
+    // This is still a hack. There could be unintended side effects, e.g. if user enabled offset
+    // auto commit in consumer config, this could interfere with the primary consumer (we will
+    // handle this particular problem). We might have to make this optional.
+    private Consumer<byte[], byte[]> offsetConsumer;
+    private final ScheduledExecutorService offsetFetcherThread =
+        Executors.newSingleThreadScheduledExecutor();
+    private static final int OFFSET_UPDATE_INTERVAL_SECONDS = 5;
+
+    /** watermark before any records have been read. */
+    private static Instant initialWatermark = new Instant(Long.MIN_VALUE);
+
+    public String toString() {
+      return name;
+    }
+
+    // maintains state of each assigned partition (buffered records, consumed offset, etc)
+    private static class PartitionState {
+      private final TopicPartition topicPartition;
+      private long consumedOffset;
+      private long latestOffset;
+      private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
+
+      // simple moving average for size of each record in bytes
+      private double avgRecordSize = 0;
+      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+
+
+      PartitionState(TopicPartition partition, long offset) {
+        this.topicPartition = partition;
+        this.consumedOffset = offset;
+        this.latestOffset = -1;
+      }
+
+      // update consumedOffset and avgRecordSize
+      void recordConsumed(long offset, int size) {
+        consumedOffset = offset;
+
+        // this is always updated from single thread. probably not worth making it an AtomicDouble
+        if (avgRecordSize <= 0) {
+          avgRecordSize = size;
+        } else {
+          // initially, first record heavily contributes to average.
+          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
+        }
+      }
+
+      synchronized void setLatestOffset(long latestOffset) {
+        this.latestOffset = latestOffset;
+      }
+
+      synchronized long approxBacklogInBytes() {
+        // Note that is an an estimate of uncompressed backlog.
+        // Messages on Kafka might be comressed.
+        if (latestOffset < 0 || consumedOffset < 0) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        if (latestOffset <= consumedOffset || consumedOffset < 0) {
+          return 0;
+        }
+        return (long) ((latestOffset - consumedOffset - 1) * avgRecordSize);
+      }
+    }
+
+    public UnboundedKafkaReader(
+        UnboundedKafkaSource<K, V> source,
+        @Nullable KafkaCheckpointMark checkpointMark) {
+
+      this.source = source;
+      this.name = "Reader-" + source.id;
+
+      partitionStates = ImmutableList.copyOf(Lists.transform(source.assignedPartitions,
+          new Function<TopicPartition, PartitionState>() {
+            public PartitionState apply(TopicPartition tp) {
+              return new PartitionState(tp, -1L);
+            }
+        }));
+
+      if (checkpointMark != null) {
+        // a) verify that assigned and check-pointed partitions match exactly
+        // b) set consumed offsets
+
+        checkState(checkpointMark.getPartitions().size() == source.assignedPartitions.size(),
+            "checkPointMark and assignedPartitions should match");
+        // we could consider allowing a mismatch, though it is not expected in current Dataflow
+
+        for (int i = 0; i < source.assignedPartitions.size(); i++) {
+          PartitionMark ckptMark = checkpointMark.getPartitions().get(i);
+          TopicPartition assigned = source.assignedPartitions.get(i);
+
+          checkState(ckptMark.getTopicPartition().equals(assigned),
+              "checkpointed partition %s and assigned partition %s don't match",
+              ckptMark.getTopicPartition(), assigned);
+
+          partitionStates.get(i).consumedOffset = ckptMark.getOffset();
+        }
+      }
+    }
+
+    private void consumerPollLoop() {
+      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
+      while (!closed) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+          if (!records.isEmpty()) {
+            availableRecordsQueue.put(records); // blocks until dequeued.
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
+          break;
+        } catch (WakeupException e) {
+          break;
+        }
+      }
+
+      LOG.info("{}: Returning from consumer pool loop", this);
+    }
+
+    private void nextBatch() {
+      curBatch = Collections.emptyIterator();
+
+      ConsumerRecords<byte[], byte[]> records;
+      try {
+        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+                                             TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.warn("{}: Unexpected", this, e);
+        return;
+      }
+
+      if (records == null) {
+        return;
+      }
+
+      List<PartitionState> nonEmpty = new LinkedList<>();
+
+      for (PartitionState p : partitionStates) {
+        p.recordIter = records.records(p.topicPartition).iterator();
+        if (p.recordIter.hasNext()) {
+          nonEmpty.add(p);
+        }
+      }
+
+      // cycle through the partitions in order to interleave records from each.
+      curBatch = Iterators.cycle(nonEmpty);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      consumer = source.consumerFactoryFn.apply(source.consumerConfig);
+      consumer.assign(source.assignedPartitions);
+
+      // seek to consumedOffset + 1 if it is set
+      for (PartitionState p : partitionStates) {
+        if (p.consumedOffset >= 0) {
+          LOG.info("{}: resuming {} at {}", name, p.topicPartition, p.consumedOffset + 1);
+          consumer.seek(p.topicPartition, p.consumedOffset + 1);
+        } else {
+          LOG.info("{}: resuming {} at default offset", name, p.topicPartition);
+        }
+      }
+
+      // start consumer read loop.
+      // Note that consumer is not thread safe, should not accessed out side consumerPollLoop()
+      consumerPollThread.submit(
+          new Runnable() {
+            public void run() {
+              consumerPollLoop();
+            }
+          });
+
+      // offsetConsumer setup :
+
+      Object groupId = source.consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
+      // override group_id and disable auto_commit so that it does not interfere with main consumer
+      String offsetGroupId = String.format("%s_offset_consumer_%d_%s", name,
+          (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+      Map<String, Object> offsetConsumerConfig = new HashMap<>(source.consumerConfig);
+      offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+      offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+      offsetConsumer = source.consumerFactoryFn.apply(offsetConsumerConfig);
+      offsetConsumer.assign(source.assignedPartitions);
+
+      offsetFetcherThread.scheduleAtFixedRate(
+          new Runnable() {
+            public void run() {
+              updateLatestOffsets();
+            }
+          }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
+
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      /* Read first record (if any). we need to loop here because :
+       *  - (a) some records initially need to be skipped if they are before consumedOffset
+       *  - (b) if curBatch is empty, we want to fetch next batch and then advance.
+       *  - (c) curBatch is an iterator of iterators. we interleave the records from each.
+       *        curBatch.next() might return an empty iterator.
+       */
+      while (true) {
+        if (curBatch.hasNext()) {
+          PartitionState pState = curBatch.next();
+
+          if (!pState.recordIter.hasNext()) { // -- (c)
+            pState.recordIter = Collections.emptyIterator(); // drop ref
+            curBatch.remove();
+            continue;
+          }
+
+          ConsumerRecord<byte[], byte[]> rawRecord = pState.recordIter.next();
+          long consumed = pState.consumedOffset;
+          long offset = rawRecord.offset();
+
+          if (consumed >= 0 && offset <= consumed) { // -- (a)
+            // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
+            // should we check if the offset is way off from consumedOffset (say > 1M)?
+            LOG.warn("{}: ignoring already consumed offset {} for {}",
+                this, offset, pState.topicPartition);
+            continue;
+          }
+
+          // sanity check
+          if (consumed >= 0 && (offset - consumed) != 1) {
+            LOG.warn("{}: gap in offsets for {} after {}. {} records missing.",
+                this, pState.topicPartition, consumed, offset - consumed - 1);
+          }
+
+          if (curRecord == null) {
+            LOG.info("{}: first record offset {}", name, offset);
+          }
+
+          curRecord = null; // user coders below might throw.
+
+          // apply user coders. might want to allow skipping records that fail to decode.
+          // TODO: wrap exceptions from coders to make explicit to users
+          KafkaRecord<K, V> record = new KafkaRecord<K, V>(
+              rawRecord.topic(),
+              rawRecord.partition(),
+              rawRecord.offset(),
+              decode(rawRecord.key(), source.keyCoder),
+              decode(rawRecord.value(), source.valueCoder));
+
+          curTimestamp = source.timestampFn.apply(record);
+          curRecord = record;
+
+          int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length) +
+              (rawRecord.value() == null ? 0 : rawRecord.value().length);
+          pState.recordConsumed(offset, recordSize);
+          return true;
+
+        } else { // -- (b)
+          nextBatch();
+
+          if (!curBatch.hasNext()) {
+            return false;
+          }
+        }
+      }
+    }
+
+    private static byte[] nullBytes = new byte[0];
+    private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
+      // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
+      // This makes it impossible for user to distinguish between zero length byte and null.
+      // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
+      // coder.
+      byte[] toDecode = bytes == null ? nullBytes : bytes;
+      return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
+    }
+
+    // update latest offset for each partition.
+    // called from offsetFetcher thread
+    private void updateLatestOffsets() {
+      for (PartitionState p : partitionStates) {
+        try {
+          offsetConsumer.seekToEnd(p.topicPartition);
+          long offset = offsetConsumer.position(p.topicPartition);
+          p.setLatestOffset(offset);;
+        } catch (Exception e) {
+          LOG.warn("{}: exception while fetching latest offsets. ignored.",  this, e);
+          p.setLatestOffset(-1L); // reset
+        }
+
+        LOG.debug("{}: latest offset update for {} : {} (consumed offset {}, avg record size {})",
+            this, p.topicPartition, p.latestOffset, p.consumedOffset, p.avgRecordSize);
+      }
+
+      LOG.debug("{}:  backlog {}", this, getSplitBacklogBytes());
+    }
+
+    @Override
+    public Instant getWatermark() {
+      if (curRecord == null) {
+        LOG.warn("{}: getWatermark() : no records have been read yet.", name);
+        return initialWatermark;
+      }
+
+      return source.watermarkFn.isPresent() ?
+          source.watermarkFn.get().apply(curRecord) : curTimestamp;
+    }
+
+    @Override
+    public CheckpointMark getCheckpointMark() {
+      return new KafkaCheckpointMark(ImmutableList.copyOf(// avoid lazy (consumedOffset can change)
+          Lists.transform(partitionStates,
+              new Function<PartitionState, PartitionMark>() {
+                public PartitionMark apply(PartitionState p) {
+                  return new PartitionMark(p.topicPartition, p.consumedOffset);
+                }
+              }
+          )));
+    }
+
+    @Override
+    public UnboundedSource<KafkaRecord<K, V>, ?> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public KafkaRecord<K, V> getCurrent() throws NoSuchElementException {
+      // should we delay updating consumed offset till this point? Mostly not required.
+      return curRecord;
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return curTimestamp;
+    }
+
+
+    @Override
+    public long getSplitBacklogBytes() {
+      long backlogBytes = 0;
+
+      for (PartitionState p : partitionStates) {
+        long pBacklog = p.approxBacklogInBytes();
+        if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
+          return UnboundedReader.BACKLOG_UNKNOWN;
+        }
+        backlogBytes += pBacklog;
+      }
+
+      return backlogBytes;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      availableRecordsQueue.poll(); // drain unread batch, this unblocks consumer thread.
+      consumer.wakeup();
+      consumerPollThread.shutdown();
+      offsetFetcherThread.shutdown();
+      Closeables.close(offsetConsumer, true);
+      Closeables.close(consumer, true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
new file mode 100644
index 0000000..76e688b
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.values.KV;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * KafkaRecord contains key and value of the record as well as metadata for the record (topic name,
+ * partition id, and offset).
+ */
+public class KafkaRecord<K, V> implements Serializable {
+
+  private final String topic;
+  private final int partition;
+  private final long offset;
+  private final KV<K, V> kv;
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      K key,
+      V value) {
+    this(topic, partition, offset, KV.of(key, value));
+  }
+
+  public KafkaRecord(
+      String topic,
+      int partition,
+      long offset,
+      KV<K, V> kv) {
+
+    this.topic = topic;
+    this.partition = partition;
+    this.offset = offset;
+    this.kv = kv;
+  }
+
+  public String getTopic() {
+    return topic;
+  }
+
+  public int getPartition() {
+    return partition;
+  }
+
+  public long getOffset() {
+    return offset;
+  }
+
+  public KV<K, V> getKV() {
+    return kv;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.deepHashCode(new Object[]{topic, partition, offset, kv});
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof KafkaRecord) {
+      @SuppressWarnings("unchecked")
+      KafkaRecord<Object, Object> other = (KafkaRecord<Object, Object>) obj;
+      return topic.equals(other.topic)
+          && partition == other.partition
+          && offset == other.offset
+          && kv.equals(other.kv);
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
new file mode 100644
index 0000000..8a3e7f5
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecordCoder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.KV;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * {@link Coder} for {@link KafkaRecord}.
+ */
+public class KafkaRecordCoder<K, V> extends StandardCoder<KafkaRecord<K, V>> {
+
+  private static final StringUtf8Coder stringCoder = StringUtf8Coder.of();
+  private static final VarLongCoder longCoder = VarLongCoder.of();
+  private static final VarIntCoder intCoder = VarIntCoder.of();
+
+  private final KvCoder<K, V> kvCoder;
+
+  @JsonCreator
+  public static KafkaRecordCoder<?, ?> of(@JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+                                          List<Coder<?>> components) {
+    KvCoder<?, ?> kvCoder = KvCoder.of(components);
+    return of(kvCoder.getKeyCoder(), kvCoder.getValueCoder());
+  }
+
+  public static <K, V> KafkaRecordCoder<K, V> of(Coder<K> keyCoder, Coder<V> valueCoder) {
+    return new KafkaRecordCoder<K, V>(keyCoder, valueCoder);
+  }
+
+  public KafkaRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) {
+    this.kvCoder = KvCoder.of(keyCoder, valueCoder);
+  }
+
+  @Override
+  public void encode(KafkaRecord<K, V> value, OutputStream outStream, Context context)
+                         throws CoderException, IOException {
+    Context nested = context.nested();
+    stringCoder.encode(value.getTopic(), outStream, nested);
+    intCoder.encode(value.getPartition(), outStream, nested);
+    longCoder.encode(value.getOffset(), outStream, nested);
+    kvCoder.encode(value.getKV(), outStream, nested);
+  }
+
+  @Override
+  public KafkaRecord<K, V> decode(InputStream inStream, Context context)
+                                      throws CoderException, IOException {
+    Context nested = context.nested();
+    return new KafkaRecord<K, V>(
+        stringCoder.decode(inStream, nested),
+        intCoder.decode(inStream, nested),
+        longCoder.decode(inStream, nested),
+        kvCoder.decode(inStream, nested));
+  }
+
+  @Override
+  public List<? extends Coder<?>> getCoderArguments() {
+    return kvCoder.getCoderArguments();
+  }
+
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    kvCoder.verifyDeterministic();
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(KafkaRecord<K, V> value, Context context) {
+    return kvCoder.isRegisterByteSizeObserverCheap(value.getKV(), context);
+    //TODO : do we have to implement getEncodedSize()?
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Object structuralValue(KafkaRecord<K, V> value) throws Exception {
+    if (consistentWithEquals()) {
+      return value;
+    } else {
+      return new KafkaRecord<Object, Object>(
+          value.getTopic(),
+          value.getPartition(),
+          value.getOffset(),
+          (KV<Object, Object>) kvCoder.structuralValue(value.getKV()));
+    }
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return kvCoder.consistentWithEquals();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
new file mode 100644
index 0000000..96ffc98
--- /dev/null
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests of {@link KafkaSource}.
+ */
+@RunWith(JUnit4.class)
+public class KafkaIOTest {
+  /*
+   * The tests below borrow code and structure from CountingSourceTest. In addition verifies
+   * the reader interleaves the records from multiple partitions.
+   *
+   * Other tests to consider :
+   *   - test KafkaRecordCoder
+   */
+
+  // Update mock consumer with records distributed among the given topics, each with given number
+  // of partitions. Records are assigned in round-robin order among the partitions.
+  private static MockConsumer<byte[], byte[]> mkMockConsumer(
+      List<String> topics, int partitionsPerTopic, int numElements) {
+
+    final List<TopicPartition> partitions = new ArrayList<>();
+    final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+    Map<String, List<PartitionInfo>> partitionMap = new HashMap<>();
+
+    for (String topic : topics) {
+      List<PartitionInfo> partIds = new ArrayList<>(partitionsPerTopic);
+      for (int i = 0; i < partitionsPerTopic; i++) {
+        partitions.add(new TopicPartition(topic, i));
+        partIds.add(new PartitionInfo(topic, i, null, null, null));
+      }
+      partitionMap.put(topic, partIds);
+    }
+
+    int numPartitions = partitions.size();
+    long[] offsets = new long[numPartitions];
+
+    for (int i = 0; i < numElements; i++) {
+      int pIdx = i % numPartitions;
+      TopicPartition tp = partitions.get(pIdx);
+
+      if (!records.containsKey(tp)) {
+        records.put(tp, new ArrayList<ConsumerRecord<byte[], byte[]>>());
+      }
+      records.get(tp).add(
+          // Note: this interface has changed in 0.10. may get fixed before the release.
+          new ConsumerRecord<byte[], byte[]>(
+              tp.topic(),
+              tp.partition(),
+              offsets[pIdx]++,
+              null, // key
+              ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id.
+    }
+
+    MockConsumer<byte[], byte[]> consumer =
+        new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
+          // override assign() to add records that belong to the assigned partitions.
+          public void assign(List<TopicPartition> assigned) {
+            super.assign(assigned);
+            for (TopicPartition tp : assigned) {
+              for (ConsumerRecord<byte[], byte[]> r : records.get(tp)) {
+                addRecord(r);
+              }
+              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
+              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
+              seek(tp, 0);
+            }
+          }
+        };
+
+    for (String topic : topics) {
+      consumer.updatePartitions(topic, partitionMap.get(topic));
+    }
+
+    return consumer;
+  }
+
+  private static class ConsumerFactoryFn
+                implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
+    private final List<String> topics;
+    private final int partitionsPerTopic;
+    private final int numElements;
+
+    public ConsumerFactoryFn(List<String> topics, int partitionsPerTopic, int numElements) {
+      this.topics = topics;
+      this.partitionsPerTopic = partitionsPerTopic;
+      this.numElements = numElements;
+    }
+
+    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
+      return mkMockConsumer(topics, partitionsPerTopic, numElements);
+    }
+  }
+
+  /**
+   * Creates a consumer with two topics, with 5 partitions each.
+   * numElements are (round-robin) assigned all the 10 partitions.
+   */
+  private static KafkaIO.TypedRead<byte[], Long> mkKafkaReadTransform(
+      int numElements,
+      @Nullable SerializableFunction<KV<byte[], Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopics(topics)
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+  private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
+    private final int num;
+
+    public AssertMultipleOf(int num) {
+      this.num = num;
+    }
+
+    @Override
+    public Void apply(Iterable<Long> values) {
+      for (Long v : values) {
+        assertEquals(0, v % num);
+      }
+      return null;
+    }
+  }
+
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    PAssert
+      .thatSingleton(input.apply("Count", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Unique count == numElements
+    PAssert
+      .thatSingleton(input.apply(RemoveDuplicates.<Long>create())
+                          .apply("UniqueCount", Count.<Long>globally()))
+      .isEqualTo(numElements);
+    // Min == 0
+    PAssert
+      .thatSingleton(input.apply("Min", Min.<Long>globally()))
+      .isEqualTo(0L);
+    // Max == numElements-1
+    PAssert
+      .thatSingleton(input.apply("Max", Max.<Long>globally()))
+      .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSource() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+            .withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceWithExplicitPartitions() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    List<String> topics = ImmutableList.of("test");
+
+    KafkaIO.TypedRead<byte[], Long> reader = KafkaIO.read()
+        .withBootstrapServers("none")
+        .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
+        .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 10 partitions
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements / 10);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    // assert that every element is a multiple of 5.
+    PAssert
+      .that(input)
+      .satisfies(new AssertMultipleOf(5));
+
+    PAssert
+      .thatSingleton(input.apply(Count.<Long>globally()))
+      .isEqualTo(numElements / 10L);
+
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceTimestamps() {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+        .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs = input
+        .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+        .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // This assert also confirms that diffs only has one unique value.
+    PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+    @Override
+    public void processElement(ProcessContext ctx) throws Exception {
+      ctx.output(ctx.element().getKV());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedSourceSplits() throws Exception {
+    Pipeline p = TestPipeline.create();
+    int numElements = 1000;
+    int numSplits = 10;
+
+    UnboundedSource<KafkaRecord<byte[], Long>, ?> initial =
+        mkKafkaReadTransform(numElements, null).makeSource();
+    List<? extends UnboundedSource<KafkaRecord<byte[], Long>, ?>> splits =
+        initial.generateInitialSplits(numSplits, p.getOptions());
+    assertEquals("Expected exact splitting", numSplits, splits.size());
+
+    long elementsPerSplit = numElements / numSplits;
+    assertEquals("Expected even splits", numElements, elementsPerSplit * numSplits);
+    PCollectionList<Long> pcollections = PCollectionList.empty(p);
+    for (int i = 0; i < splits.size(); ++i) {
+      pcollections = pcollections.and(
+          p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit))
+           .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata<byte[], Long>()))
+           .apply("collection " + i, Values.<Long>create()));
+    }
+    PCollection<Long> input = pcollections.apply(Flatten.<Long>pCollections());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp.
+   */
+  private static class ValueAsTimestampFn
+                       implements SerializableFunction<KV<byte[], Long>, Instant> {
+    @Override
+    public Instant apply(KV<byte[], Long> input) {
+      return new Instant(input.getValue());
+    }
+  }
+
+  @Test
+  public void testUnboundedSourceCheckpointMark() throws Exception {
+    int numElements = 85; // 85 to make sure some partitions have more records than other.
+
+    // create a single split:
+    UnboundedSource<KafkaRecord<byte[], Long>, KafkaCheckpointMark> source =
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+          .makeSource()
+          .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create())
+          .get(0);
+
+    UnboundedReader<KafkaRecord<byte[], Long>> reader = source.createReader(null, null);
+    final int numToSkip = 3;
+    // advance once:
+    assertTrue(reader.start());
+
+    // Advance the source numToSkip-1 elements and manually save state.
+    for (long l = 0; l < numToSkip - 1; ++l) {
+      assertTrue(reader.advance());
+    }
+
+    // Confirm that we get the expected element in sequence before checkpointing.
+
+    assertEquals(numToSkip - 1, (long) reader.getCurrent().getKV().getValue());
+    assertEquals(numToSkip - 1, reader.getCurrentTimestamp().getMillis());
+
+    // Checkpoint and restart, and confirm that the source continues correctly.
+    KafkaCheckpointMark mark = CoderUtils.clone(
+        source.getCheckpointMarkCoder(), (KafkaCheckpointMark) reader.getCheckpointMark());
+    reader = source.createReader(null, mark);
+    assertTrue(reader.start());
+
+    // Confirm that we get the next elements in sequence.
+    // This also confirms that Reader interleaves records from each partitions by the reader.
+    for (int i = numToSkip; i < numElements; i++) {
+      assertEquals(i, (long) reader.getCurrent().getKV().getValue());
+      assertEquals(i, reader.getCurrentTimestamp().getMillis());
+      reader.advance();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/92106605/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 75f192c..95d1f55 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -36,6 +36,7 @@
 
   <modules>
     <module>hdfs</module>
+    <module>kafka</module>
   </modules>
 
-</project>
\ No newline at end of file
+</project>