You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2016/10/28 14:47:56 UTC

[22/50] incubator-beam git commit: [BEAM-658] Support Read.Unbounded primitive.

[BEAM-658] Support Read.Unbounded primitive.

Changed mapSourceFunction to use scala's native Option.

Upgrade to kryo-serializers 0.39 that provides support for ReverseList (used by Top).

Better logging.

Allow a longer read time-frame for read in tests.

Assert initial parallelism is gerater than zero.

Add OnBatchCompleted listener that writes to Kafka.

Test latest.

Give read from Kafka more time to avoid flaky tests.

Change log level in MicrobatchSource.Reader to debug.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b72e7e34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b72e7e34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b72e7e34

Branch: refs/heads/apex-runner
Commit: b72e7e34f70527e4ee575aaf0bc8c4c31ed31b0a
Parents: c94b8ea
Author: Sela <an...@paypal.com>
Authored: Thu Sep 29 16:30:32 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 20:51:16 2016 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |  44 ++--
 .../runners/spark/SparkPipelineOptions.java     |  11 +
 .../runners/spark/io/EmptyCheckpointMark.java   |  52 ++++
 .../apache/beam/runners/spark/io/KafkaIO.java   | 131 ----------
 .../beam/runners/spark/io/MicrobatchSource.java | 262 +++++++++++++++++++
 .../beam/runners/spark/io/SourceDStream.java    | 156 +++++++++++
 .../apache/beam/runners/spark/io/SourceRDD.java |  75 +++++-
 .../runners/spark/io/SparkUnboundedSource.java  | 167 ++++++++++++
 .../spark/stateful/StateSpecFunctions.java      | 167 ++++++++++++
 .../runners/spark/stateful/package-info.java    |  22 ++
 .../spark/translation/TranslationUtils.java     |  17 ++
 .../SparkRunnerStreamingContextFactory.java     |   7 +
 .../streaming/StreamingTransformTranslator.java |  37 +--
 .../streaming/KafkaStreamingTest.java           | 150 ++++++++---
 .../ResumeFromCheckpointStreamingTest.java      |  34 ++-
 .../streaming/utils/EmbeddedKafkaCluster.java   |  51 ++--
 .../utils/KafkaWriteOnBatchCompleted.java       | 105 ++++++++
 17 files changed, 1224 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 458205a..36ce7b5 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -36,7 +36,7 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <spark.version>1.6.2</spark.version>
     <hadoop.version>2.2.0</hadoop.version>
-    <kafka.version>0.8.2.1</kafka.version>
+    <kafka.version>0.9.0.1</kafka.version>
     <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
   </properties>
 
@@ -112,29 +112,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-streaming-kafka_2.10</artifactId>
-      <version>${spark.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.10</artifactId>
       <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <version>${kafka.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
-      <version>${kafka.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
@@ -208,6 +190,11 @@
       <artifactId>joda-time</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client</artifactId>
+      <version>${google-clients.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.9</version>
@@ -272,6 +259,18 @@
       <version>${dropwizard.metrics.version}</version>
     </dependency>
 
+    <!-- KafkaIO -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-kafka</artifactId>
+      <version>0.3.0-incubating-SNAPSHOT</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.1</version>
+    </dependency>
+
     <!-- test dependencies -->
     <dependency>
       <groupId>junit</groupId>
@@ -284,12 +283,17 @@
         </exclusion>
       </exclusions>
     </dependency>
-
     <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
 
     <!-- Depend on test jar to scan for RunnableOnService tests -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 4d9d51f..08e14fe 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -52,6 +52,17 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
   Long getBatchIntervalMillis();
   void setBatchIntervalMillis(Long batchInterval);
 
+  @Description("Minimum time to spend on read, for each micro-batch.")
+  @Default.Long(200)
+  Long getMinReadTimeMillis();
+  void setMinReadTimeMillis(Long minReadTimeMillis);
+
+  @Description("A value between 0-1 to describe the percentage of a micro-batch dedicated "
+      + "to reading from UnboundedSource.")
+  @Default.Double(0.1)
+  Double getReadTimePercentage();
+  void setReadTimePercentage(Double readTimePercentage);
+
   @Description("A checkpoint directory for streaming resilience, ignored in batch. "
       + "For durability, a reliable filesystem such as HDFS/S3/GS is necessary.")
   @Default.InstanceFactory(TmpCheckpointDirFactory.class)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/EmptyCheckpointMark.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/EmptyCheckpointMark.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/EmptyCheckpointMark.java
new file mode 100644
index 0000000..a4ab379
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/EmptyCheckpointMark.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.UnboundedSource;
+
+
+/**
+ * Passing null values to Spark's Java API may cause problems because of Guava preconditions.
+ * See: {@link org.apache.spark.api.java.JavaUtils#optionToOptional}
+ */
+public class EmptyCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
+  private static final EmptyCheckpointMark INSTANCE = new EmptyCheckpointMark();
+  private static final int ID = 2654265; // some constant to serve as identifier.
+
+  private EmptyCheckpointMark() {};
+
+  public static EmptyCheckpointMark get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void finalizeCheckpoint() throws IOException { }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof EmptyCheckpointMark;
+  }
+
+  @Override
+  public int hashCode() {
+    return ID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
deleted file mode 100644
index 8cf2083..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/KafkaIO.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.io;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Map;
-import java.util.Set;
-import kafka.serializer.Decoder;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Read stream from Kafka.
- */
-public final class KafkaIO {
-
-  private KafkaIO() {
-  }
-
-  /**
-   * Read operation from Kafka topics.
-   */
-  public static final class Read {
-
-    private Read() {
-    }
-
-    /**
-     * Define the Kafka consumption.
-     *
-     * @param keyDecoder    {@link Decoder} to decode the Kafka message key
-     * @param valueDecoder  {@link Decoder} to decode the Kafka message value
-     * @param key           Kafka message key Class
-     * @param value         Kafka message value Class
-     * @param topics        Kafka topics to subscribe
-     * @param kafkaParams   map of Kafka parameters
-     * @param <K>           Kafka message key Class type
-     * @param <V>           Kafka message value Class type
-     * @return KafkaIO Unbound input
-     */
-    public static <K, V> Unbound<K, V> from(Class<? extends Decoder<K>> keyDecoder,
-                                            Class<? extends Decoder<V>> valueDecoder,
-                                            Class<K> key,
-                                            Class<V> value, Set<String> topics,
-                                            Map<String, String> kafkaParams) {
-      return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
-    }
-
-    /**
-     * A {@link PTransform} reading from Kafka topics and providing {@link PCollection}.
-     */
-    public static class Unbound<K, V> extends PTransform<PBegin, PCollection<KV<K, V>>> {
-
-      private final Class<? extends Decoder<K>> keyDecoderClass;
-      private final Class<? extends Decoder<V>> valueDecoderClass;
-      private final Class<K> keyClass;
-      private final Class<V> valueClass;
-      private final Set<String> topics;
-      private final Map<String, String> kafkaParams;
-
-      Unbound(Class<? extends Decoder<K>> keyDecoder,
-              Class<? extends Decoder<V>> valueDecoder, Class<K> key,
-              Class<V> value, Set<String> topics, Map<String, String> kafkaParams) {
-        checkNotNull(keyDecoder, "need to set the key decoder class of a KafkaIO.Read transform");
-        checkNotNull(
-            valueDecoder, "need to set the value decoder class of a KafkaIO.Read transform");
-        checkNotNull(key, "need to set the key class of a KafkaIO.Read transform");
-        checkNotNull(value, "need to set the value class of a KafkaIO.Read transform");
-        checkNotNull(topics, "need to set the topics of a KafkaIO.Read transform");
-        checkNotNull(kafkaParams, "need to set the kafkaParams of a KafkaIO.Read transform");
-        this.keyDecoderClass = keyDecoder;
-        this.valueDecoderClass = valueDecoder;
-        this.keyClass = key;
-        this.valueClass = value;
-        this.topics = topics;
-        this.kafkaParams = kafkaParams;
-      }
-
-      public Class<? extends Decoder<K>> getKeyDecoderClass() {
-        return keyDecoderClass;
-      }
-
-      public Class<? extends Decoder<V>> getValueDecoderClass() {
-        return valueDecoderClass;
-      }
-
-      public Class<V> getValueClass() {
-        return valueClass;
-      }
-
-      public Class<K> getKeyClass() {
-        return keyClass;
-      }
-
-      public Set<String> getTopics() {
-        return topics;
-      }
-
-      public Map<String, String> getKafkaParams() {
-        return kafkaParams;
-      }
-
-      @Override
-      public PCollection<KV<K, V>> apply(PBegin input) {
-        // Spark streaming micro batches are bounded by default
-        return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
-            WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
new file mode 100644
index 0000000..4a174aa
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import com.google.api.client.util.BackOff;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Mostly based on {@link org.apache.beam.sdk.io.BoundedReadFromUnboundedSource},
+ * with some adjustments for this specific use-case.
+ *
+ * <p>A {@link BoundedSource} wrapping an {@link UnboundedSource} to complement Spark's micro-batch
+ * nature.
+ *
+ * <p>By design, Spark's micro-batches are bounded by their duration. Spark also provides a
+ * back-pressure mechanism that may signal a bound by max records.
+ */
+public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+    extends BoundedSource<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
+
+  private final UnboundedSource<T, CheckpointMarkT> source;
+  private final Duration maxReadTime;
+  private final int numInitialSplits;
+  private final long maxNumRecords;
+
+  // each split of the underlying UnboundedSource is associated with a (consistent) id
+  // to match it's corresponding CheckpointMark state.
+  private final int splitId;
+
+  MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source,
+                   Duration maxReadTime,
+                   int numInitialSplits,
+                   long maxNumRecords,
+                   int splitId) {
+    this.source = source;
+    this.maxReadTime = maxReadTime;
+    this.numInitialSplits = numInitialSplits;
+    this.maxNumRecords = maxNumRecords;
+    this.splitId = splitId;
+  }
+
+  /**
+   * Divide the given number of records into {@code numSplits} approximately
+   * equal parts that sum to {@code numRecords}.
+   */
+  private static long[] splitNumRecords(long numRecords, int numSplits) {
+    long[] splitNumRecords = new long[numSplits];
+    for (int i = 0; i < numSplits; i++) {
+      splitNumRecords[i] = numRecords / numSplits;
+    }
+    for (int i = 0; i < numRecords % numSplits; i++) {
+      splitNumRecords[i] = splitNumRecords[i] + 1;
+    }
+    return splitNumRecords;
+  }
+
+  @Override
+  public List<? extends BoundedSource<T>>
+      splitIntoBundles(long desiredBundleSizeBytes,
+                       PipelineOptions options) throws Exception {
+    List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
+    List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
+        source.generateInitialSplits(numInitialSplits, options);
+    int numSplits = splits.size();
+    long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
+    for (int i = 0; i < numSplits; i++) {
+      // splits must be stable, and cannot change during consecutive executions
+      // for example: Kafka should not add partitions if more then one topic is read.
+      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i));
+    }
+    return result;
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    return 0;
+  }
+
+  @Override
+  public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+    return false;
+  }
+
+  @Override
+  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+    return createReader(options, null);
+  }
+
+  public BoundedReader<T> createReader(PipelineOptions options, CheckpointMarkT checkpointMark)
+      throws IOException {
+    return new Reader(source.createReader(options, checkpointMark));
+  }
+
+  @Override
+  public void validate() {
+    source.validate();
+  }
+
+  @Override
+  public Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  public Coder<CheckpointMarkT> getCheckpointMarkCoder() {
+    return source.getCheckpointMarkCoder();
+  }
+
+  public int getSplitId() {
+    return splitId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof MicrobatchSource)) {
+      return false;
+    }
+    MicrobatchSource<?, ?> that = (MicrobatchSource<?, ?>) o;
+
+    return splitId == that.splitId;
+  }
+
+  @Override
+  public int hashCode() {
+    return splitId;
+  }
+
+  /**
+   * A {@link BoundedSource.BoundedReader}
+   * wrapping an {@link UnboundedSource.UnboundedReader}.
+   *
+   * <p>This Reader will read until it reached the bound of duration, or max records,
+   * whichever comes first.
+   */
+  public class Reader extends BoundedSource.BoundedReader<T> {
+    private long recordsRead = 0L;
+    private final Instant endTime;
+    private final FluentBackoff backoffFactory;
+    private final UnboundedSource.UnboundedReader<T> reader;
+
+    private Reader(UnboundedSource.UnboundedReader<T> reader) {
+      endTime = Instant.now().plus(maxReadTime);
+      this.reader = reader;
+      backoffFactory =
+          FluentBackoff.DEFAULT
+              .withInitialBackoff(Duration.millis(10))
+              .withMaxBackoff(maxReadTime.minus(1))
+              .withMaxCumulativeBackoff(maxReadTime.minus(1));
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a "
+          + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime,
+              maxNumRecords);
+      if (reader.start()) {
+        recordsRead++;
+        return true;
+      } else {
+        return advanceWithBackoff();
+      }
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      if (recordsRead >= maxNumRecords) {
+        finalizeCheckpoint();
+        return false;
+      }
+      return advanceWithBackoff();
+    }
+
+    private boolean advanceWithBackoff() throws IOException {
+      // Try reading from the source with exponential backoff
+      BackOff backoff = backoffFactory.backoff();
+      long nextSleep = backoff.nextBackOffMillis();
+      while (nextSleep != BackOff.STOP) {
+        if (endTime != null && Instant.now().isAfter(endTime)) {
+          finalizeCheckpoint();
+          return false;
+        }
+        if (reader.advance()) {
+          recordsRead++;
+          return true;
+        }
+        Uninterruptibles.sleepUninterruptibly(nextSleep, TimeUnit.MILLISECONDS);
+        nextSleep = backoff.nextBackOffMillis();
+      }
+      finalizeCheckpoint();
+      return false;
+    }
+
+    private void finalizeCheckpoint() throws IOException {
+      reader.getCheckpointMark().finalizeCheckpoint();
+      LOG.debug("MicrobatchReader-{}: finalized CheckpointMark successfully after "
+          + "reading {} records.", splitId, recordsRead);
+    }
+
+    @Override
+    public T getCurrent() throws NoSuchElementException {
+      return reader.getCurrent();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return reader.getCurrentTimestamp();
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public BoundedSource<T> getCurrentSource() {
+      return MicrobatchSource.this;
+    }
+
+    @SuppressWarnings("unchecked")
+    public CheckpointMarkT getCheckpointMark() {
+      return (CheckpointMarkT) reader.getCheckpointMark();
+    }
+
+    public long getNumRecordsRead() {
+      return recordsRead;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
new file mode 100644
index 0000000..4e47757
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.StreamingContext;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.dstream.InputDStream;
+import org.apache.spark.streaming.scheduler.RateController;
+import org.apache.spark.streaming.scheduler.RateController$;
+import org.apache.spark.streaming.scheduler.rate.RateEstimator;
+import org.apache.spark.streaming.scheduler.rate.RateEstimator$;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Tuple2;
+
+
+
+/**
+ * A {@link SourceDStream} is an {@link InputDStream} of {@link SourceRDD.Unbounded}s.
+ *
+ * <p>This InputDStream will create a stream of partitioned {@link UnboundedSource}s,
+ * and their respective, (optional) starting {@link UnboundedSource.CheckpointMark}.
+ *
+ * <p>The underlying Source is actually a {@link MicrobatchSource} with bounds on read duration,
+ * and max records. Both set here.
+ * Read duration bound is affected by {@link SparkPipelineOptions#getReadTimePercentage()} and
+ * {@link SparkPipelineOptions#getMinReadTimeMillis()}.
+ * Records bound is controlled by the {@link RateController} mechanism.
+ */
+public class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      extends InputDStream<Tuple2<Source<T>, CheckpointMarkT>> {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);
+
+  private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
+  private final SparkRuntimeContext runtimeContext;
+  private final Duration boundReadDuration;
+  // the initial parallelism, set by Spark's backend, will be determined once when the job starts.
+  // in case of resuming/recovering from checkpoint, the DStream will be reconstructed and this
+  // property should not be reset.
+  private final int initialParallelism;
+
+  public SourceDStream(StreamingContext ssc,
+                       UnboundedSource<T, CheckpointMarkT> unboundedSource,
+                       SparkRuntimeContext runtimeContext) {
+    super(ssc, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
+    this.unboundedSource = unboundedSource;
+    this.runtimeContext = runtimeContext;
+    SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
+        SparkPipelineOptions.class);
+    this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
+        options.getMinReadTimeMillis());
+    // set initial parallelism once.
+    this.initialParallelism = ssc().sc().defaultParallelism();
+    checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
+  }
+
+  @Override
+  public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
+    MicrobatchSource<T, CheckpointMarkT> microbatchSource = new MicrobatchSource<>(
+        unboundedSource, boundReadDuration, initialParallelism, rateControlledMaxRecords(), -1);
+    RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd = new SourceRDD.Unbounded<>(
+        ssc().sc(), runtimeContext, microbatchSource);
+    return scala.Option.apply(rdd);
+  }
+
+  @Override
+  public void start() { }
+
+  @Override
+  public void stop() { }
+
+  @Override
+  public String name() {
+    return "Beam UnboundedSource [" + id() + "]";
+  }
+
+  //---- Bound by time.
+
+  // return the largest between the proportional read time (%batchDuration dedicated for read)
+  // and the min. read time set.
+  private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
+    long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
+    Duration proportionalDuration = new Duration(Math.round(
+        batchDurationMillis * readTimePercentage));
+    Duration lowerBoundDuration = new Duration(minReadTimeMillis);
+    Duration readDuration = proportionalDuration.isLongerThan(lowerBoundDuration)
+        ? proportionalDuration : lowerBoundDuration;
+    LOG.info("Read duration set to: " + readDuration);
+    return readDuration;
+  }
+
+  //---- Bound by records.
+
+  private long rateControlledMaxRecords() {
+    scala.Option<RateController> rateControllerOption = rateController();
+    if (rateControllerOption.isDefined()) {
+      long rateLimitPerSecond = rateControllerOption.get().getLatestRate();
+      if (rateLimitPerSecond > 0) {
+        long totalRateLimit =
+            rateLimitPerSecond * (ssc().graph().batchDuration().milliseconds() / 1000);
+        LOG.info("RateController set limit to {}", totalRateLimit);
+        return totalRateLimit;
+      }
+    }
+    LOG.info("RateController had nothing to report, default is Long.MAX_VALUE");
+    return Long.MAX_VALUE;
+  }
+
+  private final RateController rateController = new SourceRateController(id(),
+      RateEstimator$.MODULE$.create(ssc().conf(), ssc().graph().batchDuration()));
+
+  @Override
+  public scala.Option<RateController> rateController() {
+    if (RateController$.MODULE$.isBackPressureEnabled(ssc().conf())) {
+      return scala.Option.apply(rateController);
+    } else {
+      return scala.Option.empty();
+    }
+  }
+
+  private static class SourceRateController extends RateController {
+
+    private SourceRateController(int id, RateEstimator rateEstimator) {
+      super(id, rateEstimator);
+    }
+
+    @Override
+    public void publish(long rate) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 679b8b1..3995c89 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.spark.Dependency;
 import org.apache.spark.InterruptibleIterator;
@@ -46,7 +47,8 @@ import org.slf4j.LoggerFactory;
 public class SourceRDD {
 
   /**
-   * A SourceRDD.Bounded reads input from a {@link BoundedSource} and creates a Spark {@link RDD}.
+   * A {@link SourceRDD.Bounded} reads input from a {@link BoundedSource}
+   * and creates a Spark {@link RDD}.
    * This is the default way for the SparkRunner to read data from Beam's BoundedSources.
    */
   public static class Bounded<T> extends RDD<WindowedValue<T>> {
@@ -196,5 +198,76 @@ public class SourceRDD {
     public int hashCode() {
       return 41 * (41 + rddId) + index;
     }
+
+    public Source<T> getSource() {
+      return source;
+    }
+  }
+
+  /**
+   * A {@link SourceRDD.Unbounded} is the implementation of a micro-batch
+   * in a {@link SourceDStream}.
+   *
+   * <p>This RDD is made of P partitions, each containing a single pair-element of the partitioned
+   * {@link MicrobatchSource} and an optional starting {@link UnboundedSource.CheckpointMark}.
+   */
+  public static class Unbounded<T, CheckpointMarkT extends
+        UnboundedSource.CheckpointMark> extends RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> {
+    private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
+    private final SparkRuntimeContext runtimeContext;
+
+    private static final scala.collection.immutable.List<Dependency<?>> NIL =
+        scala.collection.immutable.List.empty();
+
+    public Unbounded(SparkContext sc,
+                     SparkRuntimeContext runtimeContext,
+                     MicrobatchSource<T, CheckpointMarkT> microbatchSource) {
+      super(sc, NIL,
+          JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
+      this.runtimeContext = runtimeContext;
+      this.microbatchSource = microbatchSource;
+    }
+
+    @Override
+    public Partition[] getPartitions() {
+      try {
+        List<? extends Source<T>> partitionedSources = microbatchSource.splitIntoBundles(
+            -1 /* ignored */, runtimeContext.getPipelineOptions());
+        Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
+        for (int i = 0; i < partitionedSources.size(); i++) {
+          partitions[i] = new CheckpointableSourcePartition<>(id(), i, partitionedSources.get(i),
+              EmptyCheckpointMark.get());
+        }
+        return partitions;
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to create partitions.", e);
+      }
+    }
+
+    @Override
+    public scala.collection.Iterator<scala.Tuple2<Source<T>, CheckpointMarkT>>
+    compute(Partition split, TaskContext context) {
+      @SuppressWarnings("unchecked")
+      CheckpointableSourcePartition<T, CheckpointMarkT> partition =
+          (CheckpointableSourcePartition<T, CheckpointMarkT>) split;
+      scala.Tuple2<Source<T>, CheckpointMarkT> tuple2 =
+          new scala.Tuple2<>(partition.getSource(), partition.checkpointMark);
+      return scala.collection.JavaConversions.asScalaIterator(
+          Collections.singleton(tuple2).iterator());
+    }
+  }
+
+  /** A {@link SourcePartition} with a {@link UnboundedSource.CheckpointMark}. */
+  private static class CheckpointableSourcePartition<T, CheckpointMarkT extends
+      UnboundedSource.CheckpointMark> extends SourcePartition<T> {
+    private final CheckpointMarkT checkpointMark;
+
+    CheckpointableSourcePartition(int rddId,
+                                  int index,
+                                  Source<T> source,
+                                  CheckpointMarkT checkpointMark) {
+      super(rddId, index, source);
+      this.checkpointMark = checkpointMark;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
new file mode 100644
index 0000000..b12098d
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.io;
+
+import java.util.Collections;
+import java.util.Iterator;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.StateSpec;
+import org.apache.spark.streaming.Time;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream;
+import org.apache.spark.streaming.api.java.JavaPairInputDStream$;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.scheduler.StreamInputInfo;
+
+
+/**
+ * A "composite" InputDStream implementation for {@link UnboundedSource}s.
+ *
+ * <p>This read is a composite of the following steps:
+ * <ul>
+ * <li>Create a single-element (per-partition) stream, that contains the (partitioned)
+ * {@link Source} and an optional {@link UnboundedSource.CheckpointMark} to start from.</li>
+ * <li>Read from within a stateful operation {@link JavaPairInputDStream#mapWithState(StateSpec)}
+ * using the {@link StateSpecFunctions#mapSourceFunction(SparkRuntimeContext)} mapping function,
+ * which manages the state of the CheckpointMark per partition.</li>
+ * <li>Since the stateful operation is a map operation, the read iterator needs to be flattened,
+ * while reporting the properties of the read (such as number of records) to the tracker.</li>
+ * </ul>
+ */
+public class SparkUnboundedSource {
+
+  public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+  JavaDStream<WindowedValue<T>> read(JavaStreamingContext jssc,
+                                     SparkRuntimeContext rc,
+                                     UnboundedSource<T, CheckpointMarkT> source) {
+    JavaPairInputDStream<Source<T>, CheckpointMarkT> inputDStream =
+        JavaPairInputDStream$.MODULE$.fromInputDStream(new SourceDStream<>(jssc.ssc(), source, rc),
+            JavaSparkContext$.MODULE$.<Source<T>>fakeClassTag(),
+                JavaSparkContext$.MODULE$.<CheckpointMarkT>fakeClassTag());
+
+    // call mapWithState to read from a checkpointable sources.
+    //TODO: consider broadcasting the rc instead of re-sending every batch.
+    JavaMapWithStateDStream<Source<T>, CheckpointMarkT, byte[],
+        Iterator<WindowedValue<T>>> mapWithStateDStream = inputDStream.mapWithState(
+            StateSpec.function(StateSpecFunctions.<T, CheckpointMarkT>mapSourceFunction(rc)));
+
+    // set checkpoint duration for read stream, if set.
+    checkpointStream(mapWithStateDStream, rc);
+    // flatmap and report read elements. Use the inputDStream's id to tie between the reported
+    // info and the inputDStream it originated from.
+    int id = inputDStream.inputDStream().id();
+    ReportingFlatMappedDStream<WindowedValue<T>> reportingFlatMappedDStream =
+        new ReportingFlatMappedDStream<>(mapWithStateDStream.dstream(), id,
+            getSourceName(source, id));
+
+    return JavaDStream.fromDStream(reportingFlatMappedDStream,
+        JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
+  }
+
+  private static <T> String getSourceName(Source<T> source, int id) {
+    StringBuilder sb = new StringBuilder();
+    for (String s: source.getClass().getSimpleName().replace("$", "").split("(?=[A-Z])")) {
+      String trimmed = s.trim();
+      if (!trimmed.isEmpty()) {
+        sb.append(trimmed).append(" ");
+      }
+    }
+    return sb.append("[").append(id).append("]").toString();
+  }
+
+  private static void checkpointStream(JavaDStream<?> dStream,
+                                       SparkRuntimeContext rc) {
+    long checkpointDurationMillis = rc.getPipelineOptions().as(SparkPipelineOptions.class)
+        .getCheckpointDurationMillis();
+    if (checkpointDurationMillis > 0) {
+      dStream.checkpoint(new Duration(checkpointDurationMillis));
+    }
+  }
+
+  /**
+   * A flatMap DStream function that "flattens" the Iterators read by the
+   * {@link MicrobatchSource.Reader}s, while reporting the properties of the read to the
+   * {@link org.apache.spark.streaming.scheduler.InputInfoTracker} for RateControl purposes
+   * and visibility.
+   */
+  private static class ReportingFlatMappedDStream<T> extends DStream<T> {
+    private final DStream<Iterator<T>> parent;
+    private final int inputDStreamId;
+    private final String sourceName;
+
+    ReportingFlatMappedDStream(DStream<Iterator<T>> parent,
+                               int inputDStreamId,
+                               String sourceName) {
+      super(parent.ssc(), JavaSparkContext$.MODULE$.<T>fakeClassTag());
+      this.parent = parent;
+      this.inputDStreamId = inputDStreamId;
+      this.sourceName = sourceName;
+    }
+
+    @Override
+    public Duration slideDuration() {
+      return parent.slideDuration();
+    }
+
+    @Override
+    public scala.collection.immutable.List<DStream<?>> dependencies() {
+      return scala.collection.JavaConversions.asScalaBuffer(
+          Collections.<DStream<?>>singletonList(parent)).toList();
+    }
+
+    @Override
+    public scala.Option<RDD<T>> compute(Time validTime) {
+      // compute parent.
+      scala.Option<RDD<Iterator<T>>> computedParentRDD = parent.getOrCompute(validTime);
+      // compute this DStream - take single-iterator partitions an flatMap them.
+      if (computedParentRDD.isDefined()) {
+        RDD<T> computedRDD = computedParentRDD.get().toJavaRDD()
+            .flatMap(TranslationUtils.<T>flattenIter()).rdd().cache();
+        // report - for RateEstimator and visibility.
+        report(validTime, computedRDD.count());
+        return scala.Option.apply(computedRDD);
+      } else {
+        report(validTime, 0);
+        return scala.Option.empty();
+      }
+    }
+
+    private void report(Time batchTime, long count) {
+      // metadata - #records read and a description.
+      scala.collection.immutable.Map<String, Object> metadata =
+          new scala.collection.immutable.Map.Map1<String, Object>(
+              StreamInputInfo.METADATA_KEY_DESCRIPTION(),
+                  String.format("Read %d records from %s for batch time: %s", count, sourceName,
+                      batchTime));
+      StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
+      ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
new file mode 100644
index 0000000..48849c2
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.stateful;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Iterators;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.runners.spark.coders.CoderHelpers;
+import org.apache.beam.runners.spark.io.EmptyCheckpointMark;
+import org.apache.beam.runners.spark.io.MicrobatchSource;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.spark.streaming.State;
+import org.apache.spark.streaming.StateSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.runtime.AbstractFunction3;
+
+/**
+ * A class containing {@link org.apache.spark.streaming.StateSpec} mappingFunctions.
+ */
+public class StateSpecFunctions {
+  private static final Logger LOG = LoggerFactory.getLogger(StateSpecFunctions.class);
+
+  /**
+   * A helper class that is essentially a {@link Serializable} {@link AbstractFunction3}.
+   */
+  private abstract static class SerializableFunction3<T1, T2, T3, T4>
+      extends AbstractFunction3<T1, T2, T3, T4> implements Serializable {
+  }
+
+  /**
+   * A {@link org.apache.spark.streaming.StateSpec} function to support reading from
+   * an {@link UnboundedSource}.
+   *
+   * <p>This StateSpec function expects the following:
+   * <ul>
+   * <li>Key: The (partitioned) Source to read from.</li>
+   * <li>Value: An optional {@link UnboundedSource.CheckpointMark} to start from.</li>
+   * <li>State: A byte representation of the (previously) persisted CheckpointMark.</li>
+   * </ul>
+   * And returns an iterator over all read values (for the micro-batch).
+   *
+   * <p>This stateful operation could be described as a flatMap over a single-element stream, which
+   * outputs all the elements read from the {@link UnboundedSource} for this micro-batch.
+   * Since micro-batches are bounded, the provided UnboundedSource is wrapped by a
+   * {@link MicrobatchSource} that applies bounds in the form of duration and max records
+   * (per micro-batch).
+   *
+   *
+   * <p>In order to avoid using Spark Guava's classes which pollute the
+   * classpath, we use the {@link StateSpec#function(scala.Function3)} signature which employs
+   * scala's native {@link scala.Option}, instead of the
+   * {@link StateSpec#function(org.apache.spark.api.java.function.Function3)} signature,
+   * which employs Guava's {@link com.google.common.base.Optional}.
+   *
+   * <p>See also <a href="https://issues.apache.org/jira/browse/SPARK-4819">SPARK-4819</a>.</p>
+   *
+   * @param runtimeContext    A serializable {@link SparkRuntimeContext}.
+   * @param <T>               The type of the input stream elements.
+   * @param <CheckpointMarkT> The type of the {@link UnboundedSource.CheckpointMark}.
+   * @return The appropriate {@link org.apache.spark.streaming.StateSpec} function.
+   */
+  public static <T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+  scala.Function3<Source<T>, scala.Option<CheckpointMarkT>, /* CheckpointMarkT */State<byte[]>,
+      Iterator<WindowedValue<T>>> mapSourceFunction(final SparkRuntimeContext runtimeContext) {
+
+    return new SerializableFunction3<Source<T>, Option<CheckpointMarkT>, State<byte[]>,
+        Iterator<WindowedValue<T>>>() {
+
+      @Override
+      public Iterator<WindowedValue<T>> apply(Source<T> source, scala.Option<CheckpointMarkT>
+          startCheckpointMark, State<byte[]> state) {
+        // source as MicrobatchSource
+        MicrobatchSource<T, CheckpointMarkT> microbatchSource =
+            (MicrobatchSource<T, CheckpointMarkT>) source;
+
+        // if state exists, use it, otherwise it's first time so use the startCheckpointMark.
+        // startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
+        // Optional(null)), which is handled by the UnboundedSource implementation.
+        Coder<CheckpointMarkT> checkpointCoder = microbatchSource.getCheckpointMarkCoder();
+        CheckpointMarkT checkpointMark;
+        if (state.exists()) {
+          checkpointMark = CoderHelpers.fromByteArray(state.get(), checkpointCoder);
+          LOG.info("Continue reading from an existing CheckpointMark.");
+        } else if (startCheckpointMark.isDefined()
+            && !startCheckpointMark.get().equals(EmptyCheckpointMark.get())) {
+          checkpointMark = startCheckpointMark.get();
+          LOG.info("Start reading from a provided CheckpointMark.");
+        } else {
+          checkpointMark = null;
+          LOG.info("No CheckpointMark provided, start reading from default.");
+        }
+
+        // create reader.
+        BoundedSource.BoundedReader<T> reader;
+        try {
+          reader =
+              microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+
+        // read microbatch.
+        final List<WindowedValue<T>> readValues = new ArrayList<>();
+        try {
+          // measure how long a read takes per-partition.
+          Stopwatch stopwatch = Stopwatch.createStarted();
+          boolean finished = !reader.start();
+          while (!finished) {
+            readValues.add(WindowedValue.of(reader.getCurrent(), reader.getCurrentTimestamp(),
+                GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+            finished = !reader.advance();
+          }
+
+          // close and checkpoint reader.
+          reader.close();
+          LOG.info("Source id {} spent {} msec on reading.", microbatchSource.getSplitId(),
+              stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+
+          // if the Source does not supply a CheckpointMark skip updating the state.
+          @SuppressWarnings("unchecked")
+          CheckpointMarkT finishedReadCheckpointMark =
+              (CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
+          if (finishedReadCheckpointMark != null) {
+            state.update(CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder));
+          } else {
+            LOG.info("Skipping checkpoint marking because the reader failed to supply one.");
+          }
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to read from reader.", e);
+        }
+
+        return Iterators.unmodifiableIterator(readValues.iterator());
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/package-info.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/package-info.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/package-info.java
new file mode 100644
index 0000000..e6677cc
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark-specific stateful operators.
+ */
+package org.apache.beam.runners.spark.stateful;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 51937ca..647f8c3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.translation;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
@@ -37,6 +38,7 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -143,6 +145,21 @@ public final class TranslationUtils {
     };
   }
 
+  /** A Flatmap iterator function, flattening iterators into their elements. */
+  public static <T> FlatMapFunction<Iterator<T>, T> flattenIter() {
+    return new FlatMapFunction<Iterator<T>, T>() {
+      @Override
+      public Iterable<T> call(final Iterator<T> t) throws Exception {
+        return new Iterable<T>() {
+          @Override
+          public Iterator<T> iterator() {
+            return t;
+          }
+        };
+      }
+    };
+  }
+
   /**
    * A utility class to filter {@link TupleTag}s.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 79c87fb..2378788 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.spark.translation.streaming;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import java.net.MalformedURLException;
@@ -61,6 +63,11 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
   @Override
   public JavaStreamingContext create() {
     LOG.info("Creating a new Spark Streaming Context");
+    // validate unbounded read properties.
+    checkArgument(options.getMinReadTimeMillis() < options.getBatchIntervalMillis(),
+        "Minimum read time has to be less than batch time.");
+    checkArgument(options.getReadTimePercentage() > 0 && options.getReadTimePercentage() < 1,
+        "Read time percentage is bound to (0, 1).");
 
     SparkPipelineTranslator translator = new StreamingTransformTranslator.Translator(
         new TransformTranslator.Translator());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 1af5e07..71c27df 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -23,14 +23,12 @@ import com.google.common.collect.Maps;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import kafka.serializer.Decoder;
 import org.apache.beam.runners.core.AssignWindowsDoFn;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
 import org.apache.beam.runners.spark.io.ConsoleIO;
 import org.apache.beam.runners.spark.io.CreateStream;
-import org.apache.beam.runners.spark.io.KafkaIO;
+import org.apache.beam.runners.spark.io.SparkUnboundedSource;
 import org.apache.beam.runners.spark.translation.DoFnFunction;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.GroupCombineFunctions;
@@ -44,6 +42,7 @@ import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.runners.spark.util.BroadcastHelper;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.Flatten;
@@ -74,11 +73,6 @@ import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaDStreamLike;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaPairInputDStream;
-import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.KafkaUtils;
-
-import scala.Tuple2;
 
 
 
@@ -103,28 +97,13 @@ public final class StreamingTransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<KafkaIO.Read.Unbound<K, V>> kafka() {
-    return new TransformEvaluator<KafkaIO.Read.Unbound<K, V>>() {
+  private static <T> TransformEvaluator<Read.Unbounded<T>> readUnbounded() {
+    return new TransformEvaluator<Read.Unbounded<T>>() {
       @Override
-      public void evaluate(KafkaIO.Read.Unbound<K, V> transform, EvaluationContext context) {
+      public void evaluate(Read.Unbounded<T> transform, EvaluationContext context) {
         StreamingEvaluationContext sec = (StreamingEvaluationContext) context;
-        JavaStreamingContext jssc = sec.getStreamingContext();
-        Class<K> keyClazz = transform.getKeyClass();
-        Class<V> valueClazz = transform.getValueClass();
-        Class<? extends Decoder<K>> keyDecoderClazz = transform.getKeyDecoderClass();
-        Class<? extends Decoder<V>> valueDecoderClazz = transform.getValueDecoderClass();
-        Map<String, String> kafkaParams = transform.getKafkaParams();
-        Set<String> topics = transform.getTopics();
-        JavaPairInputDStream<K, V> inputPairStream = KafkaUtils.createDirectStream(jssc, keyClazz,
-                valueClazz, keyDecoderClazz, valueDecoderClazz, kafkaParams, topics);
-        JavaDStream<WindowedValue<KV<K, V>>> inputStream =
-            inputPairStream.map(new Function<Tuple2<K, V>, KV<K, V>>() {
-          @Override
-          public KV<K, V> call(Tuple2<K, V> t2) throws Exception {
-            return KV.of(t2._1(), t2._2());
-          }
-        }).map(WindowingHelpers.<KV<K, V>>windowFunction());
-        sec.setStream(transform, inputStream);
+        sec.setStream(transform, SparkUnboundedSource.read(sec.getStreamingContext(),
+            sec.getRuntimeContext(), transform.getSource()));
       }
     };
   }
@@ -455,6 +434,7 @@ public final class StreamingTransformTranslator {
       .newHashMap();
 
   static {
+    EVALUATORS.put(Read.Unbounded.class, readUnbounded());
     EVALUATORS.put(GroupByKey.class, groupByKey());
     EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
@@ -463,7 +443,6 @@ public final class StreamingTransformTranslator {
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.QueuedValues.class, createFromQueue());
-    EVALUATORS.put(KafkaIO.Read.Unbound.class, kafka());
     EVALUATORS.put(Window.Bound.class, window());
     EVALUATORS.put(Flatten.FlattenPCollectionList.class, flattenPColl());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index d72e70e..8728948 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -19,20 +19,22 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
-import kafka.serializer.StringDecoder;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
+import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBatchCompleted;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.RemoveDuplicates;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -41,6 +43,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -53,32 +56,14 @@ import org.junit.rules.TemporaryFolder;
  */
 public class KafkaStreamingTest {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER =
-          new EmbeddedKafkaCluster.EmbeddedZookeeper();
+      new EmbeddedKafkaCluster.EmbeddedZookeeper();
   private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
-          new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
-  private static final String TOPIC = "kafka_beam_test_topic";
-  private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
-      "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
-  );
-  private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
+      new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection());
 
   @BeforeClass
   public static void init() throws IOException {
     EMBEDDED_ZOOKEEPER.startup();
     EMBEDDED_KAFKA_CLUSTER.startup();
-
-    // write to Kafka
-    Properties producerProps = new Properties();
-    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
-    producerProps.put("request.required.acks", 1);
-    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
-    Serializer<String> stringSerializer = new StringSerializer();
-    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
-        new KafkaProducer(producerProps, stringSerializer, stringSerializer)) {
-      for (Map.Entry<String, String> en : KAFKA_MESSAGES.entrySet()) {
-        kafkaProducer.send(new ProducerRecord<>(TOPIC, en.getKey(), en.getValue()));
-      }
-    }
   }
 
   @Rule
@@ -88,25 +73,122 @@ public class KafkaStreamingTest {
   public TestOptionsForStreaming commonOptions = new TestOptionsForStreaming();
 
   @Test
-  public void testRun() throws Exception {
+  public void testEarliest2Topics() throws Exception {
+    SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
+        checkpointParentDir.newFolder(getClass().getSimpleName()));
+    // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
+    // so to be on the safe side we'll set to 750 msec.
+    options.setMinReadTimeMillis(750L);
+    //--- setup
+    // two topics.
+    final String topic1 = "topic1";
+    final String topic2 = "topic2";
+    // messages.
+    final Map<String, String> messages = ImmutableMap.of(
+        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
+    );
+    // expected.
+    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
+    // batch and window duration.
+    final Duration batchAndWindowDuration = Duration.standardSeconds(1);
+
+    // write to both topics ahead.
+    produce(topic1, messages);
+    produce(topic2, messages);
+
+    //------- test: read and dedup.
+    Pipeline p = Pipeline.create(options);
+
+    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
+        "auto.offset.reset", "earliest"
+    );
+
+    KafkaIO.Read<String, String> read = KafkaIO.read()
+        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
+        .withTopics(Arrays.asList(topic1, topic2))
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of())
+        .updateConsumerProperties(consumerProps);
+
+    PCollection<String> deduped =
+        p.apply(read.withoutMetadata()).setCoder(
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
+        .apply(ParDo.of(new FormatKVFn()))
+        .apply(RemoveDuplicates.<String>create());
+
+    PAssertStreaming.runAndAssertContents(p, deduped, expected);
+  }
+
+  @Test
+  public void testLatest() throws Exception {
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
         checkpointParentDir.newFolder(getClass().getSimpleName()));
-    Map<String, String> kafkaParams = ImmutableMap.of(
-        "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
-        "auto.offset.reset", "smallest"
+    //--- setup
+    final String topic = "topic";
+    // messages.
+    final Map<String, String> messages = ImmutableMap.of(
+        "k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
     );
+    // expected.
+    final String[] expected = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};
+    // batch and window duration.
+    final Duration batchAndWindowDuration = Duration.standardSeconds(1);
+
+    // write once first batch completes, this will guarantee latest-like behaviour.
+    options.setListeners(Collections.<JavaStreamingListener>singletonList(
+        KafkaWriteOnBatchCompleted.once(messages, Collections.singletonList(topic),
+            EMBEDDED_KAFKA_CLUSTER.getProps(), EMBEDDED_KAFKA_CLUSTER.getBrokerList())));
+    // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
+    // so to be on the safe side we'll set to 750 msec.
+    options.setMinReadTimeMillis(750L);
+    // run for more than 1 batch interval, so that reading of latest is attempted in the
+    // first batch with no luck, while the OnBatchCompleted injected-input afterwards will be read
+    // in the second interval.
+    options.setTimeout(Duration.standardSeconds(3).getMillis());
 
+    //------- test: read and format.
     Pipeline p = Pipeline.create(options);
-    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(StringDecoder.class,
-        StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC),
-        kafkaParams))
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    PCollection<KV<String, String>> windowedWords = kafkaInput
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
 
-    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn()));
+    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
+        "auto.offset.reset", "latest"
+    );
+
+    KafkaIO.Read<String, String> read = KafkaIO.read()
+        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
+        .withTopics(Collections.singletonList(topic))
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of())
+        .updateConsumerProperties(consumerProps);
 
-    PAssertStreaming.runAndAssertContents(p, formattedKV, EXPECTED);
+    PCollection<String> formatted =
+        p.apply(read.withoutMetadata()).setCoder(
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(batchAndWindowDuration)))
+        .apply(ParDo.of(new FormatKVFn()));
+
+    PAssertStreaming.runAndAssertContents(p, formatted, expected);
+  }
+
+  private static void produce(String topic, Map<String, String> messages) {
+    Serializer<String> stringSerializer = new StringSerializer();
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
+        new KafkaProducer(defaultProducerProps(), stringSerializer, stringSerializer)) {
+          // feed topic.
+          for (Map.Entry<String, String> en : messages.entrySet()) {
+            kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
+          }
+          // await send completion.
+          kafkaProducer.flush();
+        }
+  }
+
+  private static Properties defaultProducerProps() {
+    Properties producerProps = new Properties();
+    producerProps.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
+    producerProps.put("acks", "1");
+    producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
+    return producerProps;
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index fd1d11a..ca0b668 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -27,17 +27,15 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import kafka.serializer.StringDecoder;
 import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
-import org.apache.beam.runners.spark.io.KafkaIO;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.runners.spark.translation.streaming.utils.TestOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -112,6 +110,9 @@ public class ResumeFromCheckpointStreamingTest {
   public void testRun() throws Exception {
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(
         checkpointParentDir.newFolder(getClass().getSimpleName()));
+    // It seems that the consumer's first "position" lookup (in unit test) takes +200 msec,
+    // so to be on the safe side we'll set to 750 msec.
+    options.setMinReadTimeMillis(750L);
 
     // checkpoint after first (and only) interval.
     options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
@@ -141,19 +142,24 @@ public class ResumeFromCheckpointStreamingTest {
   private static EvaluationResult run(SparkPipelineOptions options) {
     // write to Kafka
     produce();
-    Map<String, String> kafkaParams = ImmutableMap.of(
-            "metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(),
-            "auto.offset.reset", "smallest"
+    Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(
+        "auto.offset.reset", "earliest"
     );
+
+    KafkaIO.Read<String, String> read = KafkaIO.read()
+        .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
+        .withTopics(Collections.singletonList(TOPIC))
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of())
+        .updateConsumerProperties(consumerProps);
+
+    Duration windowDuration = new Duration(options.getBatchIntervalMillis());
+
     Pipeline p = Pipeline.create(options);
-    PCollection<KV<String, String>> kafkaInput = p.apply(KafkaIO.Read.from(
-        StringDecoder.class, StringDecoder.class, String.class, String.class,
-            Collections.singleton(TOPIC), kafkaParams)).setCoder(KvCoder.of(StringUtf8Coder.of(),
-                StringUtf8Coder.of()));
-    PCollection<KV<String, String>> windowedWords = kafkaInput
-        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> formattedKV = windowedWords.apply(ParDo.of(
-        new FormatAsText()));
+    PCollection<String> formattedKV =
+        p.apply(read.withoutMetadata())
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(windowDuration)))
+        .apply(ParDo.of(new FormatAsText()));
 
     // requires a graceful stop so that checkpointing of the first run would finish successfully
     // before stopping and attempting to resume.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
index d1729a4..e5e3c56 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.java
@@ -28,8 +28,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.Time;
+import kafka.server.KafkaServerStartable;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -50,7 +49,7 @@ public class EmbeddedKafkaCluster {
 
   private final String brokerList;
 
-  private final List<KafkaServer> brokers;
+  private final List<KafkaServerStartable> brokers;
   private final List<File> logDirs;
 
   public EmbeddedKafkaCluster(String zkConnection) {
@@ -93,7 +92,7 @@ public class EmbeddedKafkaCluster {
       if (sb.length() > 0) {
         sb.append(",");
       }
-      sb.append("localhost:").append(port);
+      sb.append("127.0.0.1:").append(port);
     }
     return sb.toString();
   }
@@ -107,12 +106,16 @@ public class EmbeddedKafkaCluster {
       properties.putAll(baseProperties);
       properties.setProperty("zookeeper.connect", zkConnection);
       properties.setProperty("broker.id", String.valueOf(i + 1));
-      properties.setProperty("host.name", "localhost");
+      properties.setProperty("advertised.host.name", "127.0.0.1");
+      properties.setProperty("host.name", "127.0.0.1");
+      properties.setProperty("advertised.port", Integer.toString(port));
       properties.setProperty("port", Integer.toString(port));
-      properties.setProperty("log.dir", logDir.getAbsolutePath());
+      properties.setProperty("log.dirs", logDir.getAbsolutePath());
+      properties.setProperty("offsets.topic.num.partitions", "1");
+      properties.setProperty("offsets.topic.replication.factor", "1");
       properties.setProperty("log.flush.interval.messages", String.valueOf(1));
 
-      KafkaServer broker = startBroker(properties);
+      KafkaServerStartable broker = startBroker(properties);
 
       brokers.add(broker);
       logDirs.add(logDir);
@@ -120,8 +123,8 @@ public class EmbeddedKafkaCluster {
   }
 
 
-  private static KafkaServer startBroker(Properties props) {
-    KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime());
+  private static KafkaServerStartable startBroker(Properties props) {
+    KafkaServerStartable server = new KafkaServerStartable(new KafkaConfig(props));
     server.startup();
     return server;
   }
@@ -129,8 +132,7 @@ public class EmbeddedKafkaCluster {
   public Properties getProps() {
     Properties props = new Properties();
     props.putAll(baseProperties);
-    props.put("metadata.broker.list", brokerList);
-    props.put("zookeeper.connect", zkConnection);
+    props.put("bootstrap.servers", brokerList);
     return props;
   }
 
@@ -147,7 +149,7 @@ public class EmbeddedKafkaCluster {
   }
 
   public void shutdown() {
-    for (KafkaServer broker : brokers) {
+    for (KafkaServerStartable broker : brokers) {
       try {
         broker.shutdown();
       } catch (Exception e) {
@@ -203,7 +205,7 @@ public class EmbeddedKafkaCluster {
       if (this.port == -1) {
         this.port = TestUtils.getAvailablePort();
       }
-      this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("localhost", port),
+      this.factory = NIOServerCnxnFactory.createFactory(new InetSocketAddress("127.0.0.1", port),
               1024);
       this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
       this.logDir = TestUtils.constructTempDir("embedded-zk/log");
@@ -233,7 +235,7 @@ public class EmbeddedKafkaCluster {
     }
 
     public String getConnection() {
-      return "localhost:" + port;
+      return "127.0.0.1:" + port;
     }
 
     public void setPort(int port) {
@@ -258,27 +260,6 @@ public class EmbeddedKafkaCluster {
     }
   }
 
-  static class SystemTime implements Time {
-    @Override
-    public long milliseconds() {
-      return System.currentTimeMillis();
-    }
-
-    @Override
-    public long nanoseconds() {
-      return System.nanoTime();
-    }
-
-    @Override
-    public void sleep(long ms) {
-      try {
-        Thread.sleep(ms);
-      } catch (InterruptedException e) {
-        // Ignore
-      }
-    }
-  }
-
   static final class TestUtils {
     private static final Random RANDOM = new Random();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b72e7e34/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
new file mode 100644
index 0000000..38a5bff
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/KafkaWriteOnBatchCompleted.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation.streaming.utils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Write to Kafka once the OnBatchCompleted hook is activated.
+ */
+public class KafkaWriteOnBatchCompleted extends JavaStreamingListener{
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaWriteOnBatchCompleted.class);
+
+  private final Map<String, String> messages;
+  private final List<String> topics;
+  private final Properties producerProps;
+  private final String brokerList;
+  private final boolean once;
+
+  // A flag to state that no more writes should happen.
+  private boolean done = false;
+
+  private KafkaWriteOnBatchCompleted(Map<String, String> messages,
+                                     List<String> topics,
+                                     Properties producerProps,
+                                     String brokerList,
+                                     boolean once) {
+    this.messages = messages;
+    this.topics = topics;
+    this.producerProps = producerProps;
+    this.brokerList = brokerList;
+    this.once = once;
+  }
+
+  public static KafkaWriteOnBatchCompleted once(Map<String, String> messages,
+                                                List<String> topics,
+                                                Properties producerProps,
+                                                String brokerList) {
+    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, true);
+  }
+
+  public static KafkaWriteOnBatchCompleted always(Map<String, String> messages,
+                                                  List<String> topics,
+                                                  Properties producerProps,
+                                                  String brokerList) {
+    return new KafkaWriteOnBatchCompleted(messages, topics, producerProps, brokerList, false);
+  }
+
+  @Override
+  public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
+    super.onBatchCompleted(batchCompleted);
+    if (!done) {
+      LOG.info("Writing to Kafka after batchTime {} has completed.",
+          batchCompleted.batchInfo().batchTime());
+      write();
+      // once runs once.
+      if (once) {
+        done = true;
+      }
+    }
+  }
+
+  private void write() {
+    Properties props = new Properties();
+    props.putAll(producerProps);
+    props.put("acks", "1");
+    props.put("bootstrap.servers", brokerList);
+    Serializer<String> stringSerializer = new StringSerializer();
+    try (@SuppressWarnings("unchecked") KafkaProducer<String, String> kafkaProducer =
+        new KafkaProducer(props, stringSerializer, stringSerializer)) {
+          for (String topic: topics) {
+            for (Map.Entry<String, String> en : messages.entrySet()) {
+              kafkaProducer.send(new ProducerRecord<>(topic, en.getKey(), en.getValue()));
+            }
+            // await send completion.
+            kafkaProducer.flush();
+          }
+        }
+  }
+}