You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:07:00 UTC

[48/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply with official code style

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 3e3984a..80c950f 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -31,9 +30,11 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.collect.Lists;
+
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
+
 import org.joda.time.Instant;
 
 /**
@@ -41,117 +42,121 @@ import org.joda.time.Instant;
  * proper error handling.
  */
 class SimplifiedKinesisClient {
-    private final AmazonKinesis kinesis;
 
-    public SimplifiedKinesisClient(AmazonKinesis kinesis) {
-        this.kinesis = kinesis;
-    }
+  private final AmazonKinesis kinesis;
 
-    public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
-        return new SimplifiedKinesisClient(provider.get());
-    }
+  public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+    this.kinesis = kinesis;
+  }
 
-    public String getShardIterator(final String streamName, final String shardId,
-                                   final ShardIteratorType shardIteratorType,
-                                   final String startingSequenceNumber, final Instant timestamp)
-            throws TransientKinesisException {
-        final Date date = timestamp != null ? timestamp.toDate() : null;
-        return wrapExceptions(new Callable<String>() {
-            @Override
-            public String call() throws Exception {
-                return kinesis.getShardIterator(new GetShardIteratorRequest()
-                        .withStreamName(streamName)
-                        .withShardId(shardId)
-                        .withShardIteratorType(shardIteratorType)
-                        .withStartingSequenceNumber(startingSequenceNumber)
-                        .withTimestamp(date)
-                ).getShardIterator();
-            }
-        });
-    }
+  public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+    return new SimplifiedKinesisClient(provider.get());
+  }
 
-    public List<Shard> listShards(final String streamName) throws TransientKinesisException {
-        return wrapExceptions(new Callable<List<Shard>>() {
-            @Override
-            public List<Shard> call() throws Exception {
-                List<Shard> shards = Lists.newArrayList();
-                String lastShardId = null;
-
-                StreamDescription description;
-                do {
-                    description = kinesis.describeStream(streamName, lastShardId)
-                            .getStreamDescription();
-
-                    shards.addAll(description.getShards());
-                    lastShardId = shards.get(shards.size() - 1).getShardId();
-                } while (description.getHasMoreShards());
-
-                return shards;
-            }
-        });
-    }
+  public String getShardIterator(final String streamName, final String shardId,
+      final ShardIteratorType shardIteratorType,
+      final String startingSequenceNumber, final Instant timestamp)
+      throws TransientKinesisException {
+    final Date date = timestamp != null ? timestamp.toDate() : null;
+    return wrapExceptions(new Callable<String>() {
 
-    /**
-     * Gets records from Kinesis and deaggregates them if needed.
-     *
-     * @return list of deaggregated records
-     * @throws TransientKinesisException - in case of recoverable situation
-     */
-    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
-                                              String shardId) throws TransientKinesisException {
-        return getRecords(shardIterator, streamName, shardId, null);
-    }
+      @Override
+      public String call() throws Exception {
+        return kinesis.getShardIterator(new GetShardIteratorRequest()
+            .withStreamName(streamName)
+            .withShardId(shardId)
+            .withShardIteratorType(shardIteratorType)
+            .withStartingSequenceNumber(startingSequenceNumber)
+            .withTimestamp(date)
+        ).getShardIterator();
+      }
+    });
+  }
 
-    /**
-     * Gets records from Kinesis and deaggregates them if needed.
-     *
-     * @return list of deaggregated records
-     * @throws TransientKinesisException - in case of recoverable situation
-     */
-    public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
-                                              final String shardId, final Integer limit)
-            throws
-            TransientKinesisException {
-        return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
-            @Override
-            public GetKinesisRecordsResult call() throws Exception {
-                GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
-                        .withShardIterator(shardIterator)
-                        .withLimit(limit));
-                return new GetKinesisRecordsResult(
-                        UserRecord.deaggregate(response.getRecords()),
-                        response.getNextShardIterator(),
-                        streamName, shardId);
-            }
-        });
-    }
+  public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+    return wrapExceptions(new Callable<List<Shard>>() {
+
+      @Override
+      public List<Shard> call() throws Exception {
+        List<Shard> shards = Lists.newArrayList();
+        String lastShardId = null;
+
+        StreamDescription description;
+        do {
+          description = kinesis.describeStream(streamName, lastShardId)
+              .getStreamDescription();
+
+          shards.addAll(description.getShards());
+          lastShardId = shards.get(shards.size() - 1).getShardId();
+        } while (description.getHasMoreShards());
+
+        return shards;
+      }
+    });
+  }
+
+  /**
+   * Gets records from Kinesis and deaggregates them if needed.
+   *
+   * @return list of deaggregated records
+   * @throws TransientKinesisException - in case of recoverable situation
+   */
+  public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+      String shardId) throws TransientKinesisException {
+    return getRecords(shardIterator, streamName, shardId, null);
+  }
+
+  /**
+   * Gets records from Kinesis and deaggregates them if needed.
+   *
+   * @return list of deaggregated records
+   * @throws TransientKinesisException - in case of recoverable situation
+   */
+  public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+      final String shardId, final Integer limit)
+      throws
+      TransientKinesisException {
+    return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+
+      @Override
+      public GetKinesisRecordsResult call() throws Exception {
+        GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+            .withShardIterator(shardIterator)
+            .withLimit(limit));
+        return new GetKinesisRecordsResult(
+            UserRecord.deaggregate(response.getRecords()),
+            response.getNextShardIterator(),
+            streamName, shardId);
+      }
+    });
+  }
 
-    /**
-     * Wraps Amazon specific exceptions into more friendly format.
-     *
-     * @throws TransientKinesisException              - in case of recoverable situation, i.e.
-     *                                  the request rate is too high, Kinesis remote service
-     *                                  failed, network issue, etc.
-     * @throws ExpiredIteratorException - if iterator needs to be refreshed
-     * @throws RuntimeException         - in all other cases
-     */
-    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
-        try {
-            return callable.call();
-        } catch (ExpiredIteratorException e) {
-            throw e;
-        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
-            throw new TransientKinesisException(
-                    "Too many requests to Kinesis. Wait some time and retry.", e);
-        } catch (AmazonServiceException e) {
-            if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
-                throw new TransientKinesisException(
-                        "Kinesis backend failed. Wait some time and retry.", e);
-            }
-            throw new RuntimeException("Kinesis client side failure", e);
-        } catch (Exception e) {
-            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
-        }
+  /**
+   * Wraps Amazon specific exceptions into more friendly format.
+   *
+   * @throws TransientKinesisException              - in case of recoverable situation, i.e.
+   *                                  the request rate is too high, Kinesis remote service
+   *                                  failed, network issue, etc.
+   * @throws ExpiredIteratorException - if iterator needs to be refreshed
+   * @throws RuntimeException         - in all other cases
+   */
+  private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+    try {
+      return callable.call();
+    } catch (ExpiredIteratorException e) {
+      throw e;
+    } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+      throw new TransientKinesisException(
+          "Too many requests to Kinesis. Wait some time and retry.", e);
+    } catch (AmazonServiceException e) {
+      if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+        throw new TransientKinesisException(
+            "Kinesis backend failed. Wait some time and retry.", e);
+      }
+      throw new RuntimeException("Kinesis client side failure", e);
+    } catch (Exception e) {
+      throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index d8842c4..f9298fa 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.Serializable;
 import java.util.Objects;
+
 import org.joda.time.Instant;
 
 /**
@@ -32,54 +33,55 @@ import org.joda.time.Instant;
  * in which case the reader will start reading at the specified point in time.
  */
 class StartingPoint implements Serializable {
-    private final InitialPositionInStream position;
-    private final Instant timestamp;
 
-    public StartingPoint(InitialPositionInStream position) {
-        this.position = checkNotNull(position, "position");
-        this.timestamp = null;
-    }
+  private final InitialPositionInStream position;
+  private final Instant timestamp;
 
-    public StartingPoint(Instant timestamp) {
-        this.timestamp = checkNotNull(timestamp, "timestamp");
-        this.position = null;
-    }
+  public StartingPoint(InitialPositionInStream position) {
+    this.position = checkNotNull(position, "position");
+    this.timestamp = null;
+  }
 
-    public InitialPositionInStream getPosition() {
-        return position;
-    }
+  public StartingPoint(Instant timestamp) {
+    this.timestamp = checkNotNull(timestamp, "timestamp");
+    this.position = null;
+  }
 
-    public String getPositionName() {
-        return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
-    }
+  public InitialPositionInStream getPosition() {
+    return position;
+  }
 
-    public Instant getTimestamp() {
-        return timestamp != null ? timestamp : null;
-    }
+  public String getPositionName() {
+    return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+  }
 
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        StartingPoint that = (StartingPoint) o;
-        return position == that.position && Objects.equals(timestamp, that.timestamp);
-    }
+  public Instant getTimestamp() {
+    return timestamp != null ? timestamp : null;
+  }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(position, timestamp);
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
     }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StartingPoint that = (StartingPoint) o;
+    return position == that.position && Objects.equals(timestamp, that.timestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(position, timestamp);
+  }
 
-    @Override
-    public String toString() {
-        if (timestamp == null) {
-            return position.toString();
-        } else {
-            return "Starting at timestamp " + timestamp;
-        }
+  @Override
+  public String toString() {
+    if (timestamp == null) {
+      return position.toString();
+    } else {
+      return "Starting at timestamp " + timestamp;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
index 22dc973..1ec865d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -23,20 +23,21 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * Always returns the same instance of checkpoint.
  */
 class StaticCheckpointGenerator implements CheckpointGenerator {
-    private final KinesisReaderCheckpoint checkpoint;
 
-    public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
-        checkNotNull(checkpoint, "checkpoint");
-        this.checkpoint = checkpoint;
-    }
+  private final KinesisReaderCheckpoint checkpoint;
 
-    @Override
-    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
-        return checkpoint;
-    }
+  public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+    checkNotNull(checkpoint, "checkpoint");
+    this.checkpoint = checkpoint;
+  }
 
-    @Override
-    public String toString() {
-        return checkpoint.toString();
-    }
+  @Override
+  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+    return checkpoint;
+  }
+
+  @Override
+  public String toString() {
+    return checkpoint.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 57ad8a8..68ca0d7 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -23,7 +23,8 @@ import com.amazonaws.AmazonServiceException;
  * A transient exception thrown by Kinesis.
  */
 class TransientKinesisException extends Exception {
-    public TransientKinesisException(String s, AmazonServiceException e) {
-        super(s, e);
-    }
+
+  public TransientKinesisException(String s, AmazonServiceException e) {
+    super(s, e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 046c9d9..994d6e3 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -66,10 +66,12 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest;
 import com.amazonaws.services.kinesis.model.SplitShardResult;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.base.Function;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -78,298 +80,301 @@ import org.joda.time.Instant;
  */
 class AmazonKinesisMock implements AmazonKinesis {
 
-    static class TestData implements Serializable {
-        private final String data;
-        private final Instant arrivalTimestamp;
-        private final String sequenceNumber;
-
-        public TestData(KinesisRecord record) {
-            this(new String(record.getData().array()),
-                    record.getApproximateArrivalTimestamp(),
-                    record.getSequenceNumber());
-        }
-
-        public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
-            this.data = data;
-            this.arrivalTimestamp = arrivalTimestamp;
-            this.sequenceNumber = sequenceNumber;
-        }
-
-        public Record convertToRecord() {
-            return new Record().
-                    withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
-                    withData(ByteBuffer.wrap(data.getBytes())).
-                    withSequenceNumber(sequenceNumber).
-                    withPartitionKey("");
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return EqualsBuilder.reflectionEquals(this, obj);
-        }
-
-        @Override
-        public int hashCode() {
-            return reflectionHashCode(this);
-        }
-    }
-
-    static class Provider implements KinesisClientProvider {
-
-        private final List<List<TestData>> shardedData;
-        private final int numberOfRecordsPerGet;
-
-        public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
-            this.shardedData = shardedData;
-            this.numberOfRecordsPerGet = numberOfRecordsPerGet;
-        }
-
-        @Override
-        public AmazonKinesis get() {
-            return new AmazonKinesisMock(transform(shardedData,
-                    new Function<List<TestData>, List<Record>>() {
-                        @Override
-                        public List<Record> apply(@Nullable List<TestData> testDatas) {
-                            return transform(testDatas, new Function<TestData, Record>() {
-                                @Override
-                                public Record apply(@Nullable TestData testData) {
-                                    return testData.convertToRecord();
-                                }
-                            });
-                        }
-                    }), numberOfRecordsPerGet);
-        }
-    }
-
-    private final List<List<Record>> shardedData;
-    private final int numberOfRecordsPerGet;
-
-    public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
-        this.shardedData = shardedData;
-        this.numberOfRecordsPerGet = numberOfRecordsPerGet;
-    }
-
-    @Override
-    public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
-        String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
-        int shardId = parseInt(shardIteratorParts[0]);
-        int startingRecord = parseInt(shardIteratorParts[1]);
-        List<Record> shardData = shardedData.get(shardId);
-
-        int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
-        int fromIndex = min(startingRecord, toIndex);
-        return new GetRecordsResult().
-                withRecords(shardData.subList(fromIndex, toIndex)).
-                withNextShardIterator(String.format("%s:%s", shardId, toIndex));
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(
-            GetShardIteratorRequest getShardIteratorRequest) {
-        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
-                getShardIteratorRequest.getShardIteratorType());
-
-        String shardIterator;
-        if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
-            shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
-        } else {
-            throw new RuntimeException("Not implemented");
-        }
-
-        return new GetShardIteratorResult().withShardIterator(shardIterator);
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
-        int nextShardId = 0;
-        if (exclusiveStartShardId != null) {
-            nextShardId = parseInt(exclusiveStartShardId) + 1;
-        }
-        boolean hasMoreShards = nextShardId + 1 < shardedData.size();
-
-        List<Shard> shards = newArrayList();
-        if (nextShardId < shardedData.size()) {
-            shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
-        }
-
-        return new DescribeStreamResult().withStreamDescription(
-                new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
-        );
-    }
-
-    @Override
-    public void setEndpoint(String endpoint) {
-
-    }
-
-    @Override
-    public void setRegion(Region region) {
-
-    }
-
-    @Override
-    public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public CreateStreamResult createStream(String streamName, Integer shardCount) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
-            DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DeleteStreamResult deleteStream(String streamName) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName) {
-
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DescribeStreamResult describeStream(String streamName,
-                                               Integer limit, String exclusiveStartShardId) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
-            DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
-            EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(String streamName,
-                                                   String shardId,
-                                                   String shardIteratorType) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public GetShardIteratorResult getShardIterator(String streamName,
-                                                   String shardId,
-                                                   String shardIteratorType,
-                                                   String startingSequenceNumber) {
-        throw new RuntimeException("Not implemented");
-    }
-
-    @Override
-    public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
-            IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+  static class TestData implements Serializable {
 
-    @Override
-    public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+    private final String data;
+    private final Instant arrivalTimestamp;
+    private final String sequenceNumber;
 
-    @Override
-    public ListStreamsResult listStreams() {
-        throw new RuntimeException("Not implemented");
+    public TestData(KinesisRecord record) {
+      this(new String(record.getData().array()),
+          record.getApproximateArrivalTimestamp(),
+          record.getSequenceNumber());
     }
 
-    @Override
-    public ListStreamsResult listStreams(String exclusiveStartStreamName) {
-        throw new RuntimeException("Not implemented");
+    public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+      this.data = data;
+      this.arrivalTimestamp = arrivalTimestamp;
+      this.sequenceNumber = sequenceNumber;
     }
 
-    @Override
-    public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
-        throw new RuntimeException("Not implemented");
+    public Record convertToRecord() {
+      return new Record().
+          withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+          withData(ByteBuffer.wrap(data.getBytes())).
+          withSequenceNumber(sequenceNumber).
+          withPartitionKey("");
     }
 
     @Override
-    public ListTagsForStreamResult listTagsForStream(
-            ListTagsForStreamRequest listTagsForStreamRequest) {
-        throw new RuntimeException("Not implemented");
+    public boolean equals(Object obj) {
+      return EqualsBuilder.reflectionEquals(this, obj);
     }
 
     @Override
-    public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
-        throw new RuntimeException("Not implemented");
+    public int hashCode() {
+      return reflectionHashCode(this);
     }
+  }
 
-    @Override
-    public MergeShardsResult mergeShards(String streamName,
-                                         String shardToMerge, String adjacentShardToMerge) {
-        throw new RuntimeException("Not implemented");
-    }
+  static class Provider implements KinesisClientProvider {
 
-    @Override
-    public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+    private final List<List<TestData>> shardedData;
+    private final int numberOfRecordsPerGet;
 
-    @Override
-    public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
-        throw new RuntimeException("Not implemented");
+    public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+      this.shardedData = shardedData;
+      this.numberOfRecordsPerGet = numberOfRecordsPerGet;
     }
 
     @Override
-    public PutRecordResult putRecord(String streamName, ByteBuffer data,
-                                     String partitionKey, String sequenceNumberForOrdering) {
-        throw new RuntimeException("Not implemented");
-    }
+    public AmazonKinesis get() {
+      return new AmazonKinesisMock(transform(shardedData,
+          new Function<List<TestData>, List<Record>>() {
 
-    @Override
-    public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+            @Override
+            public List<Record> apply(@Nullable List<TestData> testDatas) {
+              return transform(testDatas, new Function<TestData, Record>() {
 
-    @Override
-    public RemoveTagsFromStreamResult removeTagsFromStream(
-            RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
-        throw new RuntimeException("Not implemented");
+                @Override
+                public Record apply(@Nullable TestData testData) {
+                  return testData.convertToRecord();
+                }
+              });
+            }
+          }), numberOfRecordsPerGet);
     }
+  }
 
-    @Override
-    public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
-        throw new RuntimeException("Not implemented");
-    }
+  private final List<List<Record>> shardedData;
+  private final int numberOfRecordsPerGet;
 
-    @Override
-    public SplitShardResult splitShard(String streamName,
-                                       String shardToSplit, String newStartingHashKey) {
-        throw new RuntimeException("Not implemented");
-    }
+  public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+    this.shardedData = shardedData;
+    this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+  }
 
-    @Override
-    public void shutdown() {
+  @Override
+  public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+    String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+    int shardId = parseInt(shardIteratorParts[0]);
+    int startingRecord = parseInt(shardIteratorParts[1]);
+    List<Record> shardData = shardedData.get(shardId);
 
-    }
+    int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+    int fromIndex = min(startingRecord, toIndex);
+    return new GetRecordsResult().
+        withRecords(shardData.subList(fromIndex, toIndex)).
+        withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+  }
 
-    @Override
-    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
-        throw new RuntimeException("Not implemented");
-    }
+  @Override
+  public GetShardIteratorResult getShardIterator(
+      GetShardIteratorRequest getShardIteratorRequest) {
+    ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+        getShardIteratorRequest.getShardIteratorType());
+
+    String shardIterator;
+    if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+      shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+    } else {
+      throw new RuntimeException("Not implemented");
+    }
+
+    return new GetShardIteratorResult().withShardIterator(shardIterator);
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+    int nextShardId = 0;
+    if (exclusiveStartShardId != null) {
+      nextShardId = parseInt(exclusiveStartShardId) + 1;
+    }
+    boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+    List<Shard> shards = newArrayList();
+    if (nextShardId < shardedData.size()) {
+      shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+    }
+
+    return new DescribeStreamResult().withStreamDescription(
+        new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+    );
+  }
+
+  @Override
+  public void setEndpoint(String endpoint) {
+
+  }
+
+  @Override
+  public void setRegion(Region region) {
+
+  }
+
+  @Override
+  public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public CreateStreamResult createStream(String streamName, Integer shardCount) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+      DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DeleteStreamResult deleteStream(String streamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName) {
+
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DescribeStreamResult describeStream(String streamName,
+      Integer limit, String exclusiveStartShardId) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+      DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+      EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetShardIteratorResult getShardIterator(String streamName,
+      String shardId,
+      String shardIteratorType) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public GetShardIteratorResult getShardIterator(String streamName,
+      String shardId,
+      String shardIteratorType,
+      String startingSequenceNumber) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+      IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams() {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public ListTagsForStreamResult listTagsForStream(
+      ListTagsForStreamRequest listTagsForStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public MergeShardsResult mergeShards(String streamName,
+      String shardToMerge, String adjacentShardToMerge) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordResult putRecord(String streamName, ByteBuffer data,
+      String partitionKey, String sequenceNumberForOrdering) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public RemoveTagsFromStreamResult removeTagsFromStream(
+      RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public SplitShardResult splitShard(String streamName,
+      String shardToSplit, String newStartingHashKey) {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+    throw new RuntimeException("Not implemented");
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 00acffe..0b16bb7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -18,24 +18,27 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.google.common.testing.EqualsTester;
+
 import java.util.NoSuchElementException;
+
 import org.junit.Test;
 
 /**
  * Tests {@link CustomOptional}.
  */
 public class CustomOptionalTest {
-    @Test(expected = NoSuchElementException.class)
-    public void absentThrowsNoSuchElementExceptionOnGet() {
-        CustomOptional.absent().get();
-    }
 
-    @Test
-    public void testEqualsAndHashCode() {
-        new EqualsTester()
-            .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
-            .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
-            .addEqualityGroup(CustomOptional.of(11))
-            .addEqualityGroup(CustomOptional.of("3")).testEquals();
-    }
+  @Test(expected = NoSuchElementException.class)
+  public void absentThrowsNoSuchElementExceptionOnGet() {
+    CustomOptional.absent().get();
+  }
+
+  @Test
+  public void testEqualsAndHashCode() {
+    new EqualsTester()
+        .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
+        .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
+        .addEqualityGroup(CustomOptional.of(11))
+        .addEqualityGroup(CustomOptional.of("3")).testEquals();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index c92ac9a..1bb9717 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -28,30 +28,29 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class DynamicCheckpointGeneratorTest {
 
-    @Mock
-    private SimplifiedKinesisClient kinesisClient;
-    @Mock
-    private Shard shard1, shard2, shard3;
+  @Mock
+  private SimplifiedKinesisClient kinesisClient;
+  @Mock
+  private Shard shard1, shard2, shard3;
 
-    @Test
-    public void shouldMapAllShardsToCheckpoints() throws Exception {
-        given(shard1.getShardId()).willReturn("shard-01");
-        given(shard2.getShardId()).willReturn("shard-02");
-        given(shard3.getShardId()).willReturn("shard-03");
-        given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+  @Test
+  public void shouldMapAllShardsToCheckpoints() throws Exception {
+    given(shard1.getShardId()).willReturn("shard-01");
+    given(shard2.getShardId()).willReturn("shard-02");
+    given(shard3.getShardId()).willReturn("shard-03");
+    given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
 
-        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
-        DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
-                startingPoint);
+    StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+    DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+        startingPoint);
 
-        KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+    KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
 
-        assertThat(checkpoint).hasSize(3);
-    }
+    assertThat(checkpoint).hasSize(3);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 567e25f..44ad67d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -21,7 +21,9 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
+
 import java.util.List;
+
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -36,59 +38,60 @@ import org.junit.Test;
  */
 public class KinesisMockReadTest {
 
-    @Rule
-    public final transient TestPipeline p = TestPipeline.create();
-
-    @Test
-    public void readsDataFromMockKinesis() {
-        int noOfShards = 3;
-        int noOfEventsPerShard = 100;
-        List<List<AmazonKinesisMock.TestData>> testData =
-                provideTestData(noOfShards, noOfEventsPerShard);
-
-        PCollection<AmazonKinesisMock.TestData> result = p
-                .apply(
-                        KinesisIO.read()
-                                .from("stream", InitialPositionInStream.TRIM_HORIZON)
-                                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
-                                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
-                .apply(ParDo.of(new KinesisRecordToTestData()));
-        PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
-        p.run();
-    }
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
 
-    private static class KinesisRecordToTestData extends
-            DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            c.output(new AmazonKinesisMock.TestData(c.element()));
-        }
-    }
+  @Test
+  public void readsDataFromMockKinesis() {
+    int noOfShards = 3;
+    int noOfEventsPerShard = 100;
+    List<List<AmazonKinesisMock.TestData>> testData =
+        provideTestData(noOfShards, noOfEventsPerShard);
 
-    private List<List<AmazonKinesisMock.TestData>> provideTestData(
-            int noOfShards,
-            int noOfEventsPerShard) {
+    PCollection<AmazonKinesisMock.TestData> result = p
+        .apply(
+            KinesisIO.read()
+                .from("stream", InitialPositionInStream.TRIM_HORIZON)
+                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
+                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
+        .apply(ParDo.of(new KinesisRecordToTestData()));
+    PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+    p.run();
+  }
 
-        int seqNumber = 0;
+  private static class KinesisRecordToTestData extends
+      DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
 
-        List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
-        for (int i = 0; i < noOfShards; ++i) {
-            List<AmazonKinesisMock.TestData> shardData = newArrayList();
-            shardedData.add(shardData);
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(new AmazonKinesisMock.TestData(c.element()));
+    }
+  }
+
+  private List<List<AmazonKinesisMock.TestData>> provideTestData(
+      int noOfShards,
+      int noOfEventsPerShard) {
 
-            DateTime arrival = DateTime.now();
-            for (int j = 0; j < noOfEventsPerShard; ++j) {
-                arrival = arrival.plusSeconds(1);
+    int seqNumber = 0;
 
-                seqNumber++;
-                shardData.add(new AmazonKinesisMock.TestData(
-                        Integer.toString(seqNumber),
-                        arrival.toInstant(),
-                        Integer.toString(seqNumber))
-                );
-            }
-        }
+    List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+    for (int i = 0; i < noOfShards; ++i) {
+      List<AmazonKinesisMock.TestData> shardData = newArrayList();
+      shardedData.add(shardData);
 
-        return shardedData;
+      DateTime arrival = DateTime.now();
+      for (int j = 0; j < noOfEventsPerShard; ++j) {
+        arrival = arrival.plusSeconds(1);
+
+        seqNumber++;
+        shardData.add(new AmazonKinesisMock.TestData(
+            Integer.toString(seqNumber),
+            arrival.toInstant(),
+            Integer.toString(seqNumber))
+        );
+      }
     }
+
+    return shardedData;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 8c8da64..1038a47 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.collect.Iterables;
+
 import java.util.Iterator;
 import java.util.List;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,33 +36,34 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderCheckpointTest {
-    @Mock
-    private ShardCheckpoint a, b, c;
 
-    private KinesisReaderCheckpoint checkpoint;
+  @Mock
+  private ShardCheckpoint a, b, c;
+
+  private KinesisReaderCheckpoint checkpoint;
 
-    @Before
-    public void setUp() {
-        checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
-    }
+  @Before
+  public void setUp() {
+    checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+  }
 
-    @Test
-    public void splitsCheckpointAccordingly() {
-        verifySplitInto(1);
-        verifySplitInto(2);
-        verifySplitInto(3);
-        verifySplitInto(4);
-    }
+  @Test
+  public void splitsCheckpointAccordingly() {
+    verifySplitInto(1);
+    verifySplitInto(2);
+    verifySplitInto(3);
+    verifySplitInto(4);
+  }
 
-    @Test(expected = UnsupportedOperationException.class)
-    public void isImmutable() {
-        Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
-        iterator.remove();
-    }
+  @Test(expected = UnsupportedOperationException.class)
+  public void isImmutable() {
+    Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+    iterator.remove();
+  }
 
-    private void verifySplitInto(int size) {
-        List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
-        assertThat(Iterables.concat(split)).containsOnly(a, b, c);
-        assertThat(split).hasSize(Math.min(size, 3));
-    }
+  private void verifySplitInto(int size) {
+    List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+    assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+    assertThat(split).hasSize(Math.min(size, 3));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 8eb6546..5781033 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -23,6 +23,7 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.amazonaws.regions.Regions;
+
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -50,72 +52,75 @@ import org.junit.Test;
  * You need to provide all {@link KinesisTestOptions} in order to run this.
  */
 public class KinesisReaderIT {
-    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
-    private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
-
-    @Rule
-    public final transient TestPipeline p = TestPipeline.create();
-
-    @Ignore
-    @Test
-    public void readsDataFromRealKinesisStream()
-            throws IOException, InterruptedException, ExecutionException {
-        KinesisTestOptions options = readKinesisOptions();
-        List<String> testData = prepareTestData(1000);
-
-        Future<?> future = startTestPipeline(testData, options);
-        KinesisUploader.uploadAll(testData, options);
-        future.get();
-    }
 
-    private List<String> prepareTestData(int count) {
-        List<String> data = newArrayList();
-        for (int i = 0; i < count; ++i) {
-            data.add(RandomStringUtils.randomAlphabetic(32));
-        }
-        return data;
-    }
+  private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+  private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
 
-    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
-            throws InterruptedException {
-
-        PCollection<String> result = p.
-                apply(KinesisIO.read()
-                        .from(options.getAwsKinesisStream(), Instant.now())
-                        .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
-                                Regions.fromName(options.getAwsKinesisRegion()))
-                        .withMaxReadTime(Duration.standardMinutes(3))
-                ).
-                apply(ParDo.of(new RecordDataToString()));
-        PAssert.that(result).containsInAnyOrder(testData);
-
-        Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                PipelineResult result = p.run();
-                PipelineResult.State state = result.getState();
-                while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
-                    Thread.sleep(1000);
-                    state = result.getState();
-                }
-                assertThat(state).isEqualTo(PipelineResult.State.DONE);
-                return null;
-            }
-        });
-        Thread.sleep(PIPELINE_STARTUP_TIME);
-        return future;
-    }
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Ignore
+  @Test
+  public void readsDataFromRealKinesisStream()
+      throws IOException, InterruptedException, ExecutionException {
+    KinesisTestOptions options = readKinesisOptions();
+    List<String> testData = prepareTestData(1000);
 
-    private KinesisTestOptions readKinesisOptions() {
-        PipelineOptionsFactory.register(KinesisTestOptions.class);
-        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    Future<?> future = startTestPipeline(testData, options);
+    KinesisUploader.uploadAll(testData, options);
+    future.get();
+  }
+
+  private List<String> prepareTestData(int count) {
+    List<String> data = newArrayList();
+    for (int i = 0; i < count; ++i) {
+      data.add(RandomStringUtils.randomAlphabetic(32));
     }
+    return data;
+  }
+
+  private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+      throws InterruptedException {
+
+    PCollection<String> result = p.
+        apply(KinesisIO.read()
+            .from(options.getAwsKinesisStream(), Instant.now())
+            .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
+                Regions.fromName(options.getAwsKinesisRegion()))
+            .withMaxReadTime(Duration.standardMinutes(3))
+        ).
+        apply(ParDo.of(new RecordDataToString()));
+    PAssert.that(result).containsInAnyOrder(testData);
+
+    Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
 
-    private static class RecordDataToString extends DoFn<KinesisRecord, String> {
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-            checkNotNull(c.element(), "Null record given");
-            c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+      @Override
+      public Void call() throws Exception {
+        PipelineResult result = p.run();
+        PipelineResult.State state = result.getState();
+        while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+          Thread.sleep(1000);
+          state = result.getState();
         }
+        assertThat(state).isEqualTo(PipelineResult.State.DONE);
+        return null;
+      }
+    });
+    Thread.sleep(PIPELINE_STARTUP_TIME);
+    return future;
+  }
+
+  private KinesisTestOptions readKinesisOptions() {
+    PipelineOptionsFactory.register(KinesisTestOptions.class);
+    return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+  }
+
+  private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      checkNotNull(c.element(), "Null record given");
+      c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 3111029..a26501a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -34,87 +35,88 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderTest {
-    @Mock
-    private SimplifiedKinesisClient kinesis;
-    @Mock
-    private CheckpointGenerator generator;
-    @Mock
-    private ShardCheckpoint firstCheckpoint, secondCheckpoint;
-    @Mock
-    private ShardRecordsIterator firstIterator, secondIterator;
-    @Mock
-    private KinesisRecord a, b, c, d;
-
-    private KinesisReader reader;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
-                asList(firstCheckpoint, secondCheckpoint)
-        ));
-        when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
-        when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
-        when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-        when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        reader = new KinesisReader(kinesis, generator, null);
-    }
-
-    @Test
-    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
-        assertThat(reader.start()).isFalse();
-    }
-
-    @Test(expected = NoSuchElementException.class)
-    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
-        reader.start();
-        reader.getCurrent();
-    }
-
-    @Test
-    public void startReturnsTrueIfSomeDataAvailable() throws IOException,
-            TransientKinesisException {
-        when(firstIterator.next()).
-                thenReturn(CustomOptional.of(a)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        assertThat(reader.start()).isTrue();
-    }
-
-    @Test
-    public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
-            throws IOException, TransientKinesisException {
-        reader.start();
-
-        when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
-        assertThat(reader.advance()).isFalse();
-    }
-
-    @Test
-    public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
-        when(firstIterator.next()).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(a)).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(b)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        when(secondIterator.next()).
-                thenReturn(CustomOptional.of(c)).
-                thenReturn(CustomOptional.<KinesisRecord>absent()).
-                thenReturn(CustomOptional.of(d)).
-                thenReturn(CustomOptional.<KinesisRecord>absent());
-
-        assertThat(reader.start()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(c);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(a);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(d);
-        assertThat(reader.advance()).isTrue();
-        assertThat(reader.getCurrent()).isEqualTo(b);
-        assertThat(reader.advance()).isFalse();
-    }
+
+  @Mock
+  private SimplifiedKinesisClient kinesis;
+  @Mock
+  private CheckpointGenerator generator;
+  @Mock
+  private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+  @Mock
+  private ShardRecordsIterator firstIterator, secondIterator;
+  @Mock
+  private KinesisRecord a, b, c, d;
+
+  private KinesisReader reader;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+        asList(firstCheckpoint, secondCheckpoint)
+    ));
+    when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+    when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+    when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    reader = new KinesisReader(kinesis, generator, null);
+  }
+
+  @Test
+  public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+    assertThat(reader.start()).isFalse();
+  }
+
+  @Test(expected = NoSuchElementException.class)
+  public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+    reader.start();
+    reader.getCurrent();
+  }
+
+  @Test
+  public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+      TransientKinesisException {
+    when(firstIterator.next()).
+        thenReturn(CustomOptional.of(a)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    assertThat(reader.start()).isTrue();
+  }
+
+  @Test
+  public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+      throws IOException, TransientKinesisException {
+    reader.start();
+
+    when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+    assertThat(reader.advance()).isFalse();
+  }
+
+  @Test
+  public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+    when(firstIterator.next()).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(a)).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(b)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    when(secondIterator.next()).
+        thenReturn(CustomOptional.of(c)).
+        thenReturn(CustomOptional.<KinesisRecord>absent()).
+        thenReturn(CustomOptional.of(d)).
+        thenReturn(CustomOptional.<KinesisRecord>absent());
+
+    assertThat(reader.start()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(c);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(a);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(d);
+    assertThat(reader.advance()).isTrue();
+    assertThat(reader.getCurrent()).isEqualTo(b);
+    assertThat(reader.advance()).isFalse();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index 8771c86..c9f01bb 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import java.nio.ByteBuffer;
+
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -26,20 +27,21 @@ import org.junit.Test;
  * Tests {@link KinesisRecordCoder}.
  */
 public class KinesisRecordCoderTest {
-    @Test
-    public void encodingAndDecodingWorks() throws Exception {
-        KinesisRecord record = new KinesisRecord(
-                ByteBuffer.wrap("data".getBytes()),
-                "sequence",
-                128L,
-                "partition",
-                Instant.now(),
-                Instant.now(),
-                "stream",
-                "shard"
-        );
-        CoderProperties.coderDecodeEncodeEqual(
-                new KinesisRecordCoder(), record
-        );
-    }
+
+  @Test
+  public void encodingAndDecodingWorks() throws Exception {
+    KinesisRecord record = new KinesisRecord(
+        ByteBuffer.wrap("data".getBytes()),
+        "sequence",
+        128L,
+        "partition",
+        Instant.now(),
+        Instant.now(),
+        "stream",
+        "shard"
+    );
+    CoderProperties.coderDecodeEncodeEqual(
+        new KinesisRecordCoder(), record
+    );
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 324de46..76bcb27 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -25,23 +25,28 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
  * Options for Kinesis integration tests.
  */
 public interface KinesisTestOptions extends TestPipelineOptions {
-    @Description("AWS region where Kinesis stream resided")
-    @Default.String("aws-kinesis-region")
-    String getAwsKinesisRegion();
-    void setAwsKinesisRegion(String value);
-
-    @Description("Kinesis stream name")
-    @Default.String("aws-kinesis-stream")
-    String getAwsKinesisStream();
-    void setAwsKinesisStream(String value);
-
-    @Description("AWS secret key")
-    @Default.String("aws-secret-key")
-    String getAwsSecretKey();
-    void setAwsSecretKey(String value);
-
-    @Description("AWS access key")
-    @Default.String("aws-access-key")
-    String getAwsAccessKey();
-    void setAwsAccessKey(String value);
+
+  @Description("AWS region where Kinesis stream resided")
+  @Default.String("aws-kinesis-region")
+  String getAwsKinesisRegion();
+
+  void setAwsKinesisRegion(String value);
+
+  @Description("Kinesis stream name")
+  @Default.String("aws-kinesis-stream")
+  String getAwsKinesisStream();
+
+  void setAwsKinesisStream(String value);
+
+  @Description("AWS secret key")
+  @Default.String("aws-secret-key")
+  String getAwsSecretKey();
+
+  void setAwsSecretKey(String value);
+
+  @Description("AWS access key")
+  @Default.String("aws-access-key")
+  String getAwsAccessKey();
+
+  void setAwsAccessKey(String value);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7518ff7..7a7cb02 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -29,6 +29,7 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -37,47 +38,46 @@ import java.util.List;
  */
 public class KinesisUploader {
 
-    public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
-
-    public static void uploadAll(List<String> data, KinesisTestOptions options) {
-        AmazonKinesisClient client = new AmazonKinesisClient(
-                new StaticCredentialsProvider(
-                        new BasicAWSCredentials(
-                                options.getAwsAccessKey(), options.getAwsSecretKey()))
-        ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+  public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
 
-        List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+  public static void uploadAll(List<String> data, KinesisTestOptions options) {
+    AmazonKinesisClient client = new AmazonKinesisClient(
+        new StaticCredentialsProvider(
+            new BasicAWSCredentials(
+                options.getAwsAccessKey(), options.getAwsSecretKey()))
+    ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
 
+    List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
 
-        for (List<String> partition : partitions) {
-            List<PutRecordsRequestEntry> allRecords = newArrayList();
-            for (String row : partition) {
-                allRecords.add(new PutRecordsRequestEntry().
-                        withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
-                        withPartitionKey(Integer.toString(row.hashCode()))
+    for (List<String> partition : partitions) {
+      List<PutRecordsRequestEntry> allRecords = newArrayList();
+      for (String row : partition) {
+        allRecords.add(new PutRecordsRequestEntry().
+            withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+            withPartitionKey(Integer.toString(row.hashCode()))
 
-                );
-            }
+        );
+      }
 
-            PutRecordsResult result;
-            do {
-                result = client.putRecords(
-                        new PutRecordsRequest().
-                                withStreamName(options.getAwsKinesisStream()).
-                                withRecords(allRecords));
-                List<PutRecordsRequestEntry> failedRecords = newArrayList();
-                int i = 0;
-                for (PutRecordsResultEntry row : result.getRecords()) {
-                    if (row.getErrorCode() != null) {
-                        failedRecords.add(allRecords.get(i));
-                    }
-                    ++i;
-                }
-                allRecords = failedRecords;
-            }
-
-            while (result.getFailedRecordCount() > 0);
+      PutRecordsResult result;
+      do {
+        result = client.putRecords(
+            new PutRecordsRequest().
+                withStreamName(options.getAwsKinesisStream()).
+                withRecords(allRecords));
+        List<PutRecordsRequestEntry> failedRecords = newArrayList();
+        int i = 0;
+        for (PutRecordsResultEntry row : result.getRecords()) {
+          if (row.getErrorCode() != null) {
+            failedRecords.add(allRecords.get(i));
+          }
+          ++i;
         }
+        allRecords = failedRecords;
+      }
+
+      while (result.getFailedRecordCount() > 0);
     }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index f979c01..cb32562 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -20,47 +20,49 @@ package org.apache.beam.sdk.io.kinesis;
 import static org.mockito.BDDMockito.given;
 
 import com.google.common.collect.Lists;
+
 import java.util.Collections;
 import java.util.List;
+
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
-
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class RecordFilterTest {
-    @Mock
-    private ShardCheckpoint checkpoint;
-    @Mock
-    private KinesisRecord record1, record2, record3, record4, record5;
 
-    @Test
-    public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
-        given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
-        given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
-        given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
-        given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
-        given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
-        List<KinesisRecord> records = Lists.newArrayList(record1, record2,
-                record3, record4, record5);
-        RecordFilter underTest = new RecordFilter();
+  @Mock
+  private ShardCheckpoint checkpoint;
+  @Mock
+  private KinesisRecord record1, record2, record3, record4, record5;
+
+  @Test
+  public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+    given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+    given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+    given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+    given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+    given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+    List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+        record3, record4, record5);
+    RecordFilter underTest = new RecordFilter();
 
-        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-        Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
-    }
+    Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+  }
 
-    @Test
-    public void shouldNotFailOnEmptyList() {
-        List<KinesisRecord> records = Collections.emptyList();
-        RecordFilter underTest = new RecordFilter();
+  @Test
+  public void shouldNotFailOnEmptyList() {
+    List<KinesisRecord> records = Collections.emptyList();
+    RecordFilter underTest = new RecordFilter();
 
-        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-        Assertions.assertThat(retainedRecords).isEmpty();
-    }
+    Assertions.assertThat(retainedRecords).isEmpty();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index f032eea..e4abce4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -22,36 +22,38 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collections;
 import java.util.List;
+
 import org.junit.Test;
 
 /**
  * Tests {@link RoundRobin}.
  */
 public class RoundRobinTest {
-    @Test(expected = IllegalArgumentException.class)
-    public void doesNotAllowCreationWithEmptyCollection() {
-        new RoundRobin<>(Collections.emptyList());
-    }
 
-    @Test
-    public void goesThroughElementsInCycle() {
-        List<String> input = newArrayList("a", "b", "c");
+  @Test(expected = IllegalArgumentException.class)
+  public void doesNotAllowCreationWithEmptyCollection() {
+    new RoundRobin<>(Collections.emptyList());
+  }
 
-        RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+  @Test
+  public void goesThroughElementsInCycle() {
+    List<String> input = newArrayList("a", "b", "c");
 
-        input.addAll(input);  // duplicate the input
-        for (String element : input) {
-            assertThat(roundRobin.getCurrent()).isEqualTo(element);
-            assertThat(roundRobin.getCurrent()).isEqualTo(element);
-            roundRobin.moveForward();
-        }
+    RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+    input.addAll(input);  // duplicate the input
+    for (String element : input) {
+      assertThat(roundRobin.getCurrent()).isEqualTo(element);
+      assertThat(roundRobin.getCurrent()).isEqualTo(element);
+      roundRobin.moveForward();
     }
+  }
 
-    @Test
-    public void usualIteratorGoesThroughElementsOnce() {
-        List<String> input = newArrayList("a", "b", "c");
+  @Test
+  public void usualIteratorGoesThroughElementsOnce() {
+    List<String> input = newArrayList("a", "b", "c");
 
-        RoundRobin<String> roundRobin = new RoundRobin<>(input);
-        assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
-    }
+    RoundRobin<String> roundRobin = new RoundRobin<>(input);
+    assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index 39ab36f..d4784c4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -32,7 +32,9 @@ import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.IOException;
+
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -46,104 +48,105 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardCheckpointTest {
-    private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
-    private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
-    private static final String STREAM_NAME = "STREAM";
-    private static final String SHARD_ID = "SHARD_ID";
-    @Mock
-    private SimplifiedKinesisClient client;
-
-    @Before
-    public void setUp() throws IOException, TransientKinesisException {
-        when(client.getShardIterator(
-                eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
-                anyString(), isNull(Instant.class))).
-                thenReturn(AT_SEQUENCE_SHARD_IT);
-        when(client.getShardIterator(
-                eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
-                anyString(), isNull(Instant.class))).
-                thenReturn(AFTER_SEQUENCE_SHARD_IT);
-    }
-
-    @Test
-    public void testProvidingShardIterator() throws IOException, TransientKinesisException {
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-                .isEqualTo(AT_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-                .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
-                (AT_SEQUENCE_SHARD_IT);
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
-                .isEqualTo(AT_SEQUENCE_SHARD_IT);
-    }
-
-    @Test
-    public void testComparisonWithExtendedSequenceNumber() {
-        assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isTrue();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isFalse();
-
-        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("100", 0L))
-        )).isFalse();
-
-        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-                recordWith(new ExtendedSequenceNumber("99", 1L))
-        )).isFalse();
-    }
-
-    @Test
-    public void testComparisonWithTimestamp() {
-        DateTime referenceTimestamp = DateTime.now();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
-        ).isFalse();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
-        ).isTrue();
-
-        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-                .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
-        ).isTrue();
-    }
-
-    private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
-        KinesisRecord record = mock(KinesisRecord.class);
-        given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
-        return record;
-    }
-
-    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
-                                       Long subSequenceNumber) {
-        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
-                subSequenceNumber);
-    }
-
-    private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
-        KinesisRecord record = mock(KinesisRecord.class);
-        given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
-        return record;
-    }
-
-    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
-        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
-    }
+
+  private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+  private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+  private static final String STREAM_NAME = "STREAM";
+  private static final String SHARD_ID = "SHARD_ID";
+  @Mock
+  private SimplifiedKinesisClient client;
+
+  @Before
+  public void setUp() throws IOException, TransientKinesisException {
+    when(client.getShardIterator(
+        eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+        anyString(), isNull(Instant.class))).
+        thenReturn(AT_SEQUENCE_SHARD_IT);
+    when(client.getShardIterator(
+        eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+        anyString(), isNull(Instant.class))).
+        thenReturn(AFTER_SEQUENCE_SHARD_IT);
+  }
+
+  @Test
+  public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+        .isEqualTo(AT_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+        .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+        (AT_SEQUENCE_SHARD_IT);
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+        .isEqualTo(AT_SEQUENCE_SHARD_IT);
+  }
+
+  @Test
+  public void testComparisonWithExtendedSequenceNumber() {
+    assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isTrue();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isFalse();
+
+    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("100", 0L))
+    )).isFalse();
+
+    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+        recordWith(new ExtendedSequenceNumber("99", 1L))
+    )).isFalse();
+  }
+
+  @Test
+  public void testComparisonWithTimestamp() {
+    DateTime referenceTimestamp = DateTime.now();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+    ).isFalse();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+    ).isTrue();
+
+    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+        .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+    ).isTrue();
+  }
+
+  private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+    KinesisRecord record = mock(KinesisRecord.class);
+    given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+    return record;
+  }
+
+  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+      Long subSequenceNumber) {
+    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+        subSequenceNumber);
+  }
+
+  private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+    KinesisRecord record = mock(KinesisRecord.class);
+    given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+    return record;
+  }
+
+  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+  }
 }