You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/31 23:03:12 UTC

[1/2] beam git commit: [BEAM-2468] Reading Kinesis records in the background

Repository: beam
Updated Branches:
  refs/heads/master c53a121f5 -> 612af0a29


[BEAM-2468] Reading Kinesis records in the background


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec394465
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec394465
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec394465

Branch: refs/heads/master
Commit: ec3944659d16d696bcd73589aa035dbaa9aede2c
Parents: c53a121
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Oct 2 17:24:11 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Oct 31 16:02:43 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/kinesis/KinesisReader.java      |  43 ++---
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  18 --
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  54 ------
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    |   8 +-
 .../beam/sdk/io/kinesis/ShardReadersPool.java   | 162 ++++++++++++++++
 .../sdk/io/kinesis/ShardRecordsIterator.java    |  90 ++++-----
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  |  66 +++----
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  59 ------
 .../sdk/io/kinesis/ShardReadersPoolTest.java    | 185 +++++++++++++++++++
 .../io/kinesis/ShardRecordsIteratorTest.java    |  35 ++--
 10 files changed, 454 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 8095150..665b897 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -18,10 +18,8 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.NoSuchElementException;
 
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -64,7 +62,6 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
   private final SimplifiedKinesisClient kinesis;
   private final KinesisSource source;
   private final CheckpointGenerator initialCheckpointGenerator;
-  private RoundRobin<ShardRecordsIterator> shardIterators;
   private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
   private MovingFunction minReadTimestampMsSinceEpoch;
   private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -72,6 +69,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
   private Instant backlogBytesLastCheckTime = new Instant(0L);
   private Duration upToDateThreshold;
   private Duration backlogBytesCheckThreshold;
+  private ShardReadersPool shardReadersPool;
 
   KinesisReader(SimplifiedKinesisClient kinesis,
       CheckpointGenerator initialCheckpointGenerator,
@@ -107,13 +105,8 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
     LOG.info("Starting reader using {}", initialCheckpointGenerator);
 
     try {
-      KinesisReaderCheckpoint initialCheckpoint =
-          initialCheckpointGenerator.generate(kinesis);
-      List<ShardRecordsIterator> iterators = newArrayList();
-      for (ShardCheckpoint checkpoint : initialCheckpoint) {
-        iterators.add(checkpoint.getShardRecordsIterator(kinesis));
-      }
-      shardIterators = new RoundRobin<>(iterators);
+      shardReadersPool = createShardReadersPool();
+      shardReadersPool.start();
     } catch (TransientKinesisException e) {
       throw new IOException(e);
     }
@@ -128,21 +121,12 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
    */
   @Override
   public boolean advance() throws IOException {
-    try {
-      for (int i = 0; i < shardIterators.size(); ++i) {
-        currentRecord = shardIterators.getCurrent().next();
-        if (currentRecord.isPresent()) {
-          Instant approximateArrivalTimestamp = currentRecord.get()
-              .getApproximateArrivalTimestamp();
-          minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(),
-              approximateArrivalTimestamp.getMillis());
-          return true;
-        } else {
-          shardIterators.moveForward();
-        }
-      }
-    } catch (TransientKinesisException e) {
-      LOG.warn("Transient exception occurred", e);
+    currentRecord = shardReadersPool.nextRecord();
+    if (currentRecord.isPresent()) {
+      Instant approximateArrivalTimestamp = currentRecord.get().getApproximateArrivalTimestamp();
+      minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(),
+          approximateArrivalTimestamp.getMillis());
+      return true;
     }
     return false;
   }
@@ -170,13 +154,17 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
 
   @Override
   public void close() throws IOException {
+    // The pool is null if start() was never called or failed while creating it.
+    if (shardReadersPool != null) {
+      shardReadersPool.stop();
+    }
   }
 
   @Override
   public Instant getWatermark() {
     Instant now = Instant.now();
     long readMin = minReadTimestampMsSinceEpoch.get(now.getMillis());
-    if (readMin == Long.MAX_VALUE) {
+    if (readMin == Long.MAX_VALUE && shardReadersPool.allShardsUpToDate()) {
       lastWatermark = now;
     } else if (minReadTimestampMsSinceEpoch.isSignificant()) {
       Instant minReadTime = new Instant(readMin);
@@ -189,7 +177,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-    return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+    return shardReadersPool.getCheckpointMark();
   }
 
   @Override
@@ -221,4 +209,12 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
         watermark, lastBacklogBytes);
     return lastBacklogBytes;
   }
+
+  /**
+   * Creates the pool that reads the shards of the initial checkpoint in the background.
+   * Tests override it (through a Mockito spy) to substitute a mock pool.
+   */
+  ShardReadersPool createShardReadersPool() throws TransientKinesisException {
+    return new ShardReadersPool(kinesis, initialCheckpointGenerator.generate(kinesis));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index d995e75..eca8791 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -17,18 +17,15 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.partition;
 
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
-import javax.annotation.Nullable;
 
 import org.apache.beam.sdk.io.UnboundedSource;
 
@@ -47,21 +44,6 @@ class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSou
     this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
   }
 
-  public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
-      iterators) {
-    return new KinesisReaderCheckpoint(transform(iterators,
-        new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
-          @Nullable
-          @Override
-          public ShardCheckpoint apply(@Nullable
-              ShardRecordsIterator shardRecordsIterator) {
-            assert shardRecordsIterator != null;
-            return shardRecordsIterator.getCheckpoint();
-          }
-        }));
-  }
-
   /**
    * Splits given multi-shard checkpoint into partitions of approximately equal size.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
deleted file mode 100644
index 806d982..0000000
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.Queues.newArrayDeque;
-
-import java.util.Deque;
-import java.util.Iterator;
-
-/**
- * Very simple implementation of round robin algorithm.
- */
-class RoundRobin<T> implements Iterable<T> {
-
-  private final Deque<T> deque;
-
-  public RoundRobin(Iterable<T> collection) {
-    this.deque = newArrayDeque(collection);
-    checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
-  }
-
-  public T getCurrent() {
-    return deque.getFirst();
-  }
-
-  public void moveForward() {
-    deque.addLast(deque.removeFirst());
-  }
-
-  public int size() {
-    return deque.size();
-  }
-
-  @Override
-  public Iterator<T> iterator() {
-    return deque.iterator();
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 95f97b8..94e3b96 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -85,8 +85,7 @@ class ShardCheckpoint implements Serializable {
     }
     if (shardIteratorType == AT_TIMESTAMP) {
       checkNotNull(timestamp,
-          "You must provide timestamp for AT_SEQUENCE_NUMBER"
-              + " or AFTER_SEQUENCE_NUMBER");
+          "You must provide timestamp for AT_TIMESTAMP");
     } else {
       checkArgument(timestamp == null,
           "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
@@ -131,11 +130,6 @@ class ShardCheckpoint implements Serializable {
         sequenceNumber);
   }
 
-  public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
-      throws TransientKinesisException {
-    return new ShardRecordsIterator(this, kinesis);
-  }
-
   public String getShardIterator(SimplifiedKinesisClient kinesisClient)
       throws TransientKinesisException {
     if (checkpointIsInTheMiddleOfAUserRecord()) {

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
new file mode 100644
index 0000000..83e3081
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.transform;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal shard iterators pool.
+ * It maintains the thread pool for reading Kinesis shards in separate threads.
+ * Read records are stored in a blocking queue of limited capacity.
+ *
+ * <p>Each shard is read by its own background task submitted in {@link #start()}. Records are
+ * consumed with {@link #nextRecord()}, which also acknowledges them, so that
+ * {@link #getCheckpointMark()} only covers records already handed out to the caller.
+ */
+class ShardReadersPool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
+  private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
+  /**
+   * How long a shard reader pauses after a read that produced no records or failed. Kinesis
+   * limits GetRecords to 5 calls per second per shard, so re-polling an idle or failing shard
+   * in a tight loop only burns CPU and provokes throttling errors.
+   */
+  private static final long IDLE_BACKOFF_MILLIS = 200L;
+  private ExecutorService executorService;
+  private BlockingQueue<KinesisRecord> recordsQueue;
+  private Map<String, ShardRecordsIterator> shardIteratorsMap;
+  private final SimplifiedKinesisClient kinesis;
+  private final KinesisReaderCheckpoint initialCheckpoint;
+  private final int queueCapacityPerShard;
+  private final AtomicBoolean poolOpened = new AtomicBoolean(true);
+
+  ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint) {
+    this(kinesis, initialCheckpoint, DEFAULT_CAPACITY_PER_SHARD);
+  }
+
+  ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint,
+      int queueCapacityPerShard) {
+    this.kinesis = kinesis;
+    this.initialCheckpoint = initialCheckpoint;
+    this.queueCapacityPerShard = queueCapacityPerShard;
+  }
+
+  /**
+   * Creates one {@link ShardRecordsIterator} per shard of the initial checkpoint and submits a
+   * background task for each of them that keeps reading records into the shared queue.
+   *
+   * @throws TransientKinesisException if a shard iterator cannot be obtained from Kinesis
+   * @throws IllegalArgumentException if the initial checkpoint contains no shards
+   */
+  void start() throws TransientKinesisException {
+    ImmutableMap.Builder<String, ShardRecordsIterator> shardsMap = ImmutableMap.builder();
+    for (ShardCheckpoint checkpoint : initialCheckpoint) {
+      shardsMap.put(checkpoint.getShardId(), createShardIterator(kinesis, checkpoint));
+    }
+    shardIteratorsMap = shardsMap.build();
+    // Fail with a clear message; the thread pool and the queue below reject a size of zero
+    // with an IllegalArgumentException that carries no message.
+    checkArgument(!shardIteratorsMap.isEmpty(),
+        "Initial checkpoint must contain at least one shard");
+    executorService = Executors.newFixedThreadPool(shardIteratorsMap.size());
+    recordsQueue = new LinkedBlockingQueue<>(queueCapacityPerShard * shardIteratorsMap.size());
+    for (final ShardRecordsIterator shardRecordsIterator : shardIteratorsMap.values()) {
+      executorService.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          readLoop(shardRecordsIterator);
+        }
+      });
+    }
+  }
+
+  /**
+   * Keeps reading batches from the given shard into the records queue until the pool is stopped
+   * or the reading thread is interrupted.
+   */
+  private void readLoop(ShardRecordsIterator shardRecordsIterator) {
+    try {
+      while (poolOpened.get()) {
+        if (!readNextBatchIntoQueue(shardRecordsIterator)) {
+          // Nothing new, or the read failed: pause instead of re-polling Kinesis immediately.
+          TimeUnit.MILLISECONDS.sleep(IDLE_BACKOFF_MILLIS);
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Thread was interrupted, finishing the read loop", e);
+      // Restore the interrupt status for the executor thread running this loop.
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("Kinesis Shard read loop has finished");
+  }
+
+  /**
+   * Reads a single batch from the shard and puts all of its records into the records queue,
+   * blocking while the queue is full.
+   *
+   * @return {@code true} if at least one record was enqueued, {@code false} if the batch was
+   *     empty or the read failed
+   * @throws InterruptedException if interrupted while waiting for space in the queue
+   */
+  private boolean readNextBatchIntoQueue(ShardRecordsIterator shardRecordsIterator)
+      throws InterruptedException {
+    try {
+      List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
+      for (KinesisRecord kinesisRecord : kinesisRecords) {
+        recordsQueue.put(kinesisRecord);
+      }
+      return !kinesisRecords.isEmpty();
+    } catch (TransientKinesisException e) {
+      LOG.warn("Transient exception occurred.", e);
+    } catch (InterruptedException e) {
+      throw e;
+    } catch (Throwable e) {
+      // Keep the reader alive: an exception escaping the task would be captured in its
+      // discarded Future and this shard would silently stop being read.
+      LOG.error("Unexpected exception occurred", e);
+    }
+    return false;
+  }
+
+  /**
+   * Takes the next record from the queue, waiting at most one second for one to arrive, and
+   * acknowledges it with the iterator of its shard so that later checkpoints include it.
+   *
+   * @return the record, or absent if none arrived in time or the wait was interrupted
+   */
+  CustomOptional<KinesisRecord> nextRecord() {
+    try {
+      KinesisRecord record = recordsQueue.poll(1, TimeUnit.SECONDS);
+      if (record == null) {
+        return CustomOptional.absent();
+      }
+      shardIteratorsMap.get(record.getShardId()).ackRecord(record);
+      return CustomOptional.of(record);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
+      // Restore the interrupt status so the caller can still observe it.
+      Thread.currentThread().interrupt();
+      return CustomOptional.absent();
+    }
+  }
+
+  /**
+   * Stops all shard reader threads and waits up to 3 x 10 seconds for them to terminate.
+   * If {@link #start()} has not created the thread pool yet, this only marks the pool as
+   * closed.
+   */
+  void stop() {
+    LOG.info("Closing shard iterators pool");
+    poolOpened.set(false);
+    if (executorService == null) {
+      return;
+    }
+    executorService.shutdownNow();
+    boolean isShutdown = false;
+    int attemptsLeft = 3;
+    while (!isShutdown && attemptsLeft-- > 0) {
+      try {
+        isShutdown = executorService.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted while waiting for the executor service to shutdown");
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      if (!isShutdown && attemptsLeft > 0) {
+        LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left",
+            attemptsLeft);
+      }
+    }
+    if (!isShutdown) {
+      LOG.error("Executor service did not shut down, some shard reader threads may still run");
+    }
+  }
+
+  /** Returns {@code true} only if every shard iterator reports that it is up to date. */
+  boolean allShardsUpToDate() {
+    for (ShardRecordsIterator shardRecordsIterator : shardIteratorsMap.values()) {
+      if (!shardRecordsIterator.isUpToDate()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Builds a checkpoint from the acknowledged position of every shard iterator. */
+  KinesisReaderCheckpoint getCheckpointMark() {
+    return new KinesisReaderCheckpoint(transform(shardIteratorsMap.values(),
+        new Function<ShardRecordsIterator, ShardCheckpoint>() {
+          @Override
+          public ShardCheckpoint apply(ShardRecordsIterator shardRecordsIterator) {
+            checkArgument(shardRecordsIterator != null, "shardRecordsIterator can not be null");
+            return shardRecordsIterator.getCheckpoint();
+          }
+        }));
+  }
+
+  /**
+   * Creates the iterator for a single shard. Kept package-private and overridable, presumably so
+   * tests can substitute their own iterators.
+   */
+  ShardRecordsIterator createShardIterator(SimplifiedKinesisClient kinesis,
+      ShardCheckpoint checkpoint) throws TransientKinesisException {
+    return new ShardRecordsIterator(checkpoint, kinesis);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index d4e8038..c1450be 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -18,19 +18,21 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Queues.newArrayDeque;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 
-import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Iterates over records in a single shard.
- * Under the hood records are retrieved from Kinesis in batches and stored in the in-memory queue.
- * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
+ * Records are retrieved in batches via calls to {@link ShardRecordsIterator#readNextBatch()}.
+ * Client has to confirm processed records by calling
+ * {@link ShardRecordsIterator#ackRecord(KinesisRecord)} method.
  */
 class ShardRecordsIterator {
 
@@ -38,71 +40,66 @@ class ShardRecordsIterator {
 
   private final SimplifiedKinesisClient kinesis;
   private final RecordFilter filter;
-  private ShardCheckpoint checkpoint;
+  private final String streamName;
+  private final String shardId;
+  private final AtomicReference<ShardCheckpoint> checkpoint;
   private String shardIterator;
-  private Deque<KinesisRecord> data = newArrayDeque();
+  private final AtomicLong millisBehindLatest = new AtomicLong(Long.MAX_VALUE);
 
-  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-      SimplifiedKinesisClient simplifiedKinesisClient) throws
-      TransientKinesisException {
+  ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+      SimplifiedKinesisClient simplifiedKinesisClient) throws TransientKinesisException {
     this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
   }
 
-  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+  ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
       SimplifiedKinesisClient simplifiedKinesisClient,
-      RecordFilter filter) throws
-      TransientKinesisException {
-
-    this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+      RecordFilter filter) throws TransientKinesisException {
+    this.checkpoint = new AtomicReference<>(checkNotNull(initialCheckpoint, "initialCheckpoint"));
     this.filter = checkNotNull(filter, "filter");
     this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
-    shardIterator = checkpoint.getShardIterator(kinesis);
+    this.streamName = initialCheckpoint.getStreamName();
+    this.shardId = initialCheckpoint.getShardId();
+    this.shardIterator = initialCheckpoint.getShardIterator(kinesis);
   }
 
-  /**
-   * Returns record if there's any present.
-   * Returns absent() if there are no new records at this time in the shard.
-   */
-  public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
-    readMoreIfNecessary();
-
-    if (data.isEmpty()) {
-      return CustomOptional.absent();
-    } else {
-      KinesisRecord record = data.removeFirst();
-      checkpoint = checkpoint.moveAfter(record);
-      return CustomOptional.of(record);
-    }
+  /**
+   * Fetches the next batch of records from Kinesis, filters it against the current checkpoint
+   * and remembers how far behind the tip of the stream the batch was.
+   */
+  List<KinesisRecord> readNextBatch() throws TransientKinesisException {
+    GetKinesisRecordsResult response = fetchRecords();
+    LOG.debug("Fetched {} new records", response.getRecords().size());
+
+    List<KinesisRecord> filteredRecords = filter.apply(response.getRecords(), checkpoint.get());
+    millisBehindLatest.set(response.getMillisBehindLatest());
+    return filteredRecords;
   }
 
-  private void readMoreIfNecessary() throws TransientKinesisException {
-    if (data.isEmpty()) {
-      GetKinesisRecordsResult response = fetchRecords();
-      data.addAll(filter.apply(response.getRecords(), checkpoint));
+  private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
+    try {
+      GetKinesisRecordsResult response = kinesis.getRecords(shardIterator, streamName, shardId);
+      shardIterator = response.getNextShardIterator();
+      return response;
+    } catch (ExpiredIteratorException e) {
+      LOG.info("Refreshing expired iterator", e);
+      shardIterator = checkpoint.get().getShardIterator(kinesis);
+      return fetchRecords();
     }
   }
 
-  private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
-    GetKinesisRecordsResult response = null;
-    do {
-      try {
-        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-            checkpoint.getShardId());
-        shardIterator = response.getNextShardIterator();
-      } catch (ExpiredIteratorException e) {
-        LOG.info("Refreshing expired iterator", e);
-        shardIterator = checkpoint.getShardIterator(kinesis);
-      }
-    } while (response == null || gotEmptyResponseButIsBeforeEndOfTheStream(response));
-    return response;
+  /** Returns the current checkpoint, advanced by every {@link #ackRecord} call. */
+  ShardCheckpoint getCheckpoint() {
+    return checkpoint.get();
   }
 
-  private boolean gotEmptyResponseButIsBeforeEndOfTheStream(GetKinesisRecordsResult response) {
-    return response.getRecords().isEmpty() && response.getMillisBehindLatest() > 0;
+  /** Returns {@code true} if the last fetched batch was 0 ms behind the tip of the stream. */
+  boolean isUpToDate() {
+    return millisBehindLatest.get() == 0L;
   }
 
-  public ShardCheckpoint getCheckpoint() {
-    return checkpoint;
+  /** Moves the checkpoint to just after the given record, marking it as processed. */
+  void ackRecord(KinesisRecord record) {
+    checkpoint.set(checkpoint.get().moveAfter(record));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 22d8bce..11ae011 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -21,7 +21,9 @@ import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -50,11 +52,11 @@ public class KinesisReaderTest {
   @Mock
   private ShardCheckpoint firstCheckpoint, secondCheckpoint;
   @Mock
-  private ShardRecordsIterator firstIterator, secondIterator;
-  @Mock
   private KinesisRecord a, b, c, d;
   @Mock
   private KinesisSource kinesisSource;
+  @Mock
+  private ShardReadersPool shardReadersPool;
 
   private KinesisReader reader;
 
@@ -63,16 +65,22 @@ public class KinesisReaderTest {
     when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
         asList(firstCheckpoint, secondCheckpoint)
     ));
-    when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
-    when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
-    when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+    when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.<KinesisRecord>absent());
     when(a.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
     when(b.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
     when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
     when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
 
-    reader = new KinesisReader(kinesis, generator, kinesisSource, Duration.ZERO, Duration.ZERO);
+    reader = createReader(Duration.ZERO);
+  }
+
+  private KinesisReader createReader(Duration backlogBytesCheckThreshold)
+      throws TransientKinesisException {
+    KinesisReader kinesisReader = spy(new KinesisReader(kinesis, generator, kinesisSource,
+        Duration.ZERO, backlogBytesCheckThreshold));
+    doReturn(shardReadersPool).when(kinesisReader)
+        .createShardReadersPool();
+    return kinesisReader;
   }
 
   @Test
@@ -89,7 +97,7 @@ public class KinesisReaderTest {
   @Test
   public void startReturnsTrueIfSomeDataAvailable() throws IOException,
       TransientKinesisException {
-    when(firstIterator.next()).
+    when(shardReadersPool.nextRecord()).
         thenReturn(CustomOptional.of(a)).
         thenReturn(CustomOptional.<KinesisRecord>absent());
 
@@ -97,34 +105,22 @@ public class KinesisReaderTest {
   }
 
   @Test
-  public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
-      throws IOException, TransientKinesisException {
-    reader.start();
-
-    when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
-    assertThat(reader.advance()).isFalse();
-  }
-
-  @Test
   public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
-    when(firstIterator.next()).
+    when(shardReadersPool.nextRecord()).
+        thenReturn(CustomOptional.of(c)).
         thenReturn(CustomOptional.<KinesisRecord>absent()).
         thenReturn(CustomOptional.of(a)).
         thenReturn(CustomOptional.<KinesisRecord>absent()).
-        thenReturn(CustomOptional.of(b)).
-        thenReturn(CustomOptional.<KinesisRecord>absent());
-
-    when(secondIterator.next()).
-        thenReturn(CustomOptional.of(c)).
-        thenReturn(CustomOptional.<KinesisRecord>absent()).
         thenReturn(CustomOptional.of(d)).
+        thenReturn(CustomOptional.of(b)).
         thenReturn(CustomOptional.<KinesisRecord>absent());
 
     assertThat(reader.start()).isTrue();
     assertThat(reader.getCurrent()).isEqualTo(c);
+    assertThat(reader.advance()).isFalse();
     assertThat(reader.advance()).isTrue();
     assertThat(reader.getCurrent()).isEqualTo(a);
+    assertThat(reader.advance()).isFalse();
     assertThat(reader.advance()).isTrue();
     assertThat(reader.getCurrent()).isEqualTo(d);
     assertThat(reader.advance()).isTrue();
@@ -138,7 +134,6 @@ public class KinesisReaderTest {
     final long timestampMs = 1000L;
 
     prepareRecordsWithArrivalTimestamps(timestampMs, 1, KinesisReader.MIN_WATERMARK_MESSAGES / 2);
-    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
 
     for (boolean more = reader.start(); more; more = reader.advance()) {
       assertThat(reader.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -151,7 +146,6 @@ public class KinesisReaderTest {
     long timestampMs = 1000L;
 
     prepareRecordsWithArrivalTimestamps(timestampMs, 1, KinesisReader.MIN_WATERMARK_MESSAGES);
-    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
 
     int recordsNeededForWatermarkAdvancing = KinesisReader.MIN_WATERMARK_MESSAGES;
     for (boolean more = reader.start(); more; more = reader.advance()) {
@@ -169,7 +163,6 @@ public class KinesisReaderTest {
     long timestampMs = 1000L;
 
     prepareRecordsWithArrivalTimestamps(timestampMs, -1, KinesisReader.MIN_WATERMARK_MESSAGES * 2);
-    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
 
     Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
     for (boolean more = reader.start(); more; more = reader.advance()) {
@@ -184,14 +177,14 @@ public class KinesisReaderTest {
       int count) throws TransientKinesisException {
     long timestampMs = initialTimestampMs;
     KinesisRecord firstRecord = prepareRecordMockWithArrivalTimestamp(timestampMs);
-    OngoingStubbing<CustomOptional<KinesisRecord>> firstIteratorStubbing =
-        when(firstIterator.next()).thenReturn(CustomOptional.of(firstRecord));
+    OngoingStubbing<CustomOptional<KinesisRecord>> shardReadersPoolStubbing =
+        when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.of(firstRecord));
     for (int i = 0; i < count; i++) {
       timestampMs += increment;
       KinesisRecord record = prepareRecordMockWithArrivalTimestamp(timestampMs);
-      firstIteratorStubbing = firstIteratorStubbing.thenReturn(CustomOptional.of(record));
+      shardReadersPoolStubbing = shardReadersPoolStubbing.thenReturn(CustomOptional.of(record));
     }
-    firstIteratorStubbing.thenReturn(CustomOptional.<KinesisRecord>absent());
+    shardReadersPoolStubbing.thenReturn(CustomOptional.<KinesisRecord>absent());
   }
 
   private KinesisRecord prepareRecordMockWithArrivalTimestamp(long timestampMs) {
@@ -202,7 +195,8 @@ public class KinesisReaderTest {
 
   @Test
   public void getTotalBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur()
-      throws TransientKinesisException {
+      throws TransientKinesisException, IOException {
+    reader.start();
     when(kinesisSource.getStreamName()).thenReturn("stream1");
     when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
         .thenReturn(10L)
@@ -216,9 +210,9 @@ public class KinesisReaderTest {
 
   @Test
   public void getTotalBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
-      throws TransientKinesisException {
-    KinesisReader backlogCachingReader = new KinesisReader(kinesis, generator, kinesisSource,
-        Duration.ZERO, Duration.standardSeconds(30));
+      throws TransientKinesisException, IOException {
+    KinesisReader backlogCachingReader = createReader(Duration.standardSeconds(30));
+    backlogCachingReader.start();
     when(kinesisSource.getStreamName()).thenReturn("stream1");
     when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
         .thenReturn(10L)

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
deleted file mode 100644
index e4abce4..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Test;
-
-/**
- * Tests {@link RoundRobin}.
- */
-public class RoundRobinTest {
-
-  @Test(expected = IllegalArgumentException.class)
-  public void doesNotAllowCreationWithEmptyCollection() {
-    new RoundRobin<>(Collections.emptyList());
-  }
-
-  @Test
-  public void goesThroughElementsInCycle() {
-    List<String> input = newArrayList("a", "b", "c");
-
-    RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
-
-    input.addAll(input);  // duplicate the input
-    for (String element : input) {
-      assertThat(roundRobin.getCurrent()).isEqualTo(element);
-      assertThat(roundRobin.getCurrent()).isEqualTo(element);
-      roundRobin.moveForward();
-    }
-  }
-
-  @Test
-  public void usualIteratorGoesThroughElementsOnce() {
-    List<String> input = newArrayList("a", "b", "c");
-
-    RoundRobin<String> roundRobin = new RoundRobin<>(input);
-    assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
new file mode 100644
index 0000000..03cc428
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests {@link ShardReadersPool}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardReadersPoolTest {
+
+  @Mock
+  private ShardRecordsIterator firstIterator, secondIterator;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+  @Mock
+  private SimplifiedKinesisClient kinesis;
+  @Mock
+  private KinesisRecord a, b, c, d;
+
+  private ShardReadersPool shardReadersPool;
+
+  @Before
+  public void setUp() throws TransientKinesisException {
+    when(a.getShardId()).thenReturn("shard1");
+    when(b.getShardId()).thenReturn("shard1");
+    when(c.getShardId()).thenReturn("shard2");
+    when(d.getShardId()).thenReturn("shard2");
+    when(firstCheckpoint.getShardId()).thenReturn("shard1");
+    when(secondCheckpoint.getShardId()).thenReturn("shard2");
+    KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(
+        Arrays.asList(firstCheckpoint, secondCheckpoint));
+    shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint));
+    doReturn(firstIterator).when(shardReadersPool).createShardIterator(kinesis, firstCheckpoint);
+    doReturn(secondIterator).when(shardReadersPool).createShardIterator(kinesis, secondCheckpoint);
+  }
+
+  @Test
+  public void shouldReturnAllRecords() throws TransientKinesisException {
+    when(firstIterator.readNextBatch())
+        .thenReturn(Collections.<KinesisRecord>emptyList())
+        .thenReturn(asList(a, b))
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondIterator.readNextBatch())
+        .thenReturn(singletonList(c))
+        .thenReturn(singletonList(d))
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+
+    shardReadersPool.start();
+    List<KinesisRecord> fetchedRecords = new ArrayList<>();
+    while (fetchedRecords.size() < 4) {
+      CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+      if (nextRecord.isPresent()) {
+        fetchedRecords.add(nextRecord.get());
+      }
+    }
+    assertThat(fetchedRecords).containsExactlyInAnyOrder(a, b, c, d);
+  }
+
+  @Test
+  public void shouldReturnAbsentOptionalWhenNoRecords() throws TransientKinesisException {
+    when(firstIterator.readNextBatch())
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondIterator.readNextBatch())
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+
+    shardReadersPool.start();
+    CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+    assertThat(nextRecord.isPresent()).isFalse();
+  }
+
+  @Test
+  public void shouldCheckpointReadRecords() throws TransientKinesisException {
+    when(firstIterator.readNextBatch())
+        .thenReturn(asList(a, b))
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+    when(secondIterator.readNextBatch())
+        .thenReturn(singletonList(c))
+        .thenReturn(singletonList(d))
+        .thenReturn(Collections.<KinesisRecord>emptyList());
+
+    shardReadersPool.start();
+    int recordsFound = 0;
+    while (recordsFound < 4) {
+      CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+      if (nextRecord.isPresent()) {
+        recordsFound++;
+        KinesisRecord kinesisRecord = nextRecord.get();
+        if (kinesisRecord.getShardId().equals("shard1")) {
+          verify(firstIterator).ackRecord(kinesisRecord);
+        } else {
+          verify(secondIterator).ackRecord(kinesisRecord);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void shouldInterruptKinesisReadingAndStopShortly() throws TransientKinesisException {
+    when(firstIterator.readNextBatch()).thenAnswer(new Answer<List<KinesisRecord>>() {
+
+      @Override
+      public List<KinesisRecord> answer(InvocationOnMock invocation) throws Throwable {
+        Thread.sleep(TimeUnit.MINUTES.toMillis(1));
+        return Collections.emptyList();
+      }
+    });
+    shardReadersPool.start();
+
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    shardReadersPool.stop();
+    assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TimeUnit.SECONDS.toMillis(1));
+  }
+
+  @Test
+  public void shouldInterruptPuttingRecordsToQueueAndStopShortly()
+      throws TransientKinesisException {
+    when(firstIterator.readNextBatch()).thenReturn(asList(a, b, c));
+    KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(
+        Arrays.asList(firstCheckpoint, secondCheckpoint));
+    ShardReadersPool shardReadersPool = new ShardReadersPool(kinesis, checkpoint, 2);
+    shardReadersPool.start();
+
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    shardReadersPool.stop();
+    assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TimeUnit.SECONDS.toMillis(1));
+
+  }
+
+  @Test
+  public void shouldDetectThatNotAllShardsAreUpToDate() throws TransientKinesisException {
+    when(firstIterator.isUpToDate()).thenReturn(true);
+    when(secondIterator.isUpToDate()).thenReturn(false);
+    shardReadersPool.start();
+
+    assertThat(shardReadersPool.allShardsUpToDate()).isFalse();
+  }
+
+  @Test
+  public void shouldDetectThatAllShardsAreUpToDate() throws TransientKinesisException {
+    when(firstIterator.isUpToDate()).thenReturn(true);
+    when(secondIterator.isUpToDate()).thenReturn(true);
+    shardReadersPool.start();
+
+    assertThat(shardReadersPool.allShardsUpToDate()).isTrue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 4b2190f..a77eafa 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -104,27 +104,31 @@ public class ShardRecordsIteratorTest {
   }
 
   @Test
-  public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+    when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+    when(secondResult.getRecords()).thenReturn(singletonList(d));
+    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+    assertThat(iterator.readNextBatch()).isEqualTo(asList(a, b, c));
+    assertThat(iterator.readNextBatch()).isEqualTo(singletonList(d));
+    assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
+
   }
 
   @Test
-  public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+  public void conformingRecordsMovesCheckpoint() throws IOException, TransientKinesisException {
     when(firstResult.getRecords()).thenReturn(asList(a, b, c));
     when(secondResult.getRecords()).thenReturn(singletonList(d));
+    when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
 
-    assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+    iterator.ackRecord(a);
     assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+    iterator.ackRecord(b);
     assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+    iterator.ackRecord(c);
     assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
-    assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    iterator.ackRecord(d);
     assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
   }
 
@@ -140,9 +144,10 @@ public class ShardRecordsIteratorTest {
     when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
         .thenReturn(secondResult);
 
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
-    assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
-    assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    assertThat(iterator.readNextBatch()).isEqualTo(singletonList(a));
+    iterator.ackRecord(a);
+    assertThat(iterator.readNextBatch()).isEqualTo(singletonList(b));
+    assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
   }
 
   private static class IdentityAnswer implements Answer<Object> {


[2/2] beam git commit: This closes #3930: [BEAM-2468] Reading Kinesis records in the background

Posted by jk...@apache.org.
This closes #3930: [BEAM-2468] Reading Kinesis records in the background


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/612af0a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/612af0a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/612af0a2

Branch: refs/heads/master
Commit: 612af0a296550e6ae12033aa75ed5ffac7650382
Parents: c53a121 ec39446
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Oct 31 16:02:57 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Oct 31 16:02:57 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/kinesis/KinesisReader.java      |  43 ++---
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  18 --
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  54 ------
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    |   8 +-
 .../beam/sdk/io/kinesis/ShardReadersPool.java   | 162 ++++++++++++++++
 .../sdk/io/kinesis/ShardRecordsIterator.java    |  90 ++++-----
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  |  66 +++----
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  59 ------
 .../sdk/io/kinesis/ShardReadersPoolTest.java    | 185 +++++++++++++++++++
 .../io/kinesis/ShardRecordsIteratorTest.java    |  35 ++--
 10 files changed, 454 insertions(+), 266 deletions(-)
----------------------------------------------------------------------