You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:33 UTC

[06/28] beam git commit: Revert "[BEAM-2610] This closes #3553"

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 80c950f..3e3984a 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -30,11 +31,9 @@ import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.collect.Lists;
-
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
-
 import org.joda.time.Instant;
 
 /**
@@ -42,121 +41,117 @@ import org.joda.time.Instant;
  * proper error handling.
  */
 class SimplifiedKinesisClient {
+    private final AmazonKinesis kinesis;
 
-  private final AmazonKinesis kinesis;
-
-  public SimplifiedKinesisClient(AmazonKinesis kinesis) {
-    this.kinesis = kinesis;
-  }
-
-  public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
-    return new SimplifiedKinesisClient(provider.get());
-  }
-
-  public String getShardIterator(final String streamName, final String shardId,
-      final ShardIteratorType shardIteratorType,
-      final String startingSequenceNumber, final Instant timestamp)
-      throws TransientKinesisException {
-    final Date date = timestamp != null ? timestamp.toDate() : null;
-    return wrapExceptions(new Callable<String>() {
-
-      @Override
-      public String call() throws Exception {
-        return kinesis.getShardIterator(new GetShardIteratorRequest()
-            .withStreamName(streamName)
-            .withShardId(shardId)
-            .withShardIteratorType(shardIteratorType)
-            .withStartingSequenceNumber(startingSequenceNumber)
-            .withTimestamp(date)
-        ).getShardIterator();
-      }
-    });
-  }
-
-  public List<Shard> listShards(final String streamName) throws TransientKinesisException {
-    return wrapExceptions(new Callable<List<Shard>>() {
-
-      @Override
-      public List<Shard> call() throws Exception {
-        List<Shard> shards = Lists.newArrayList();
-        String lastShardId = null;
-
-        StreamDescription description;
-        do {
-          description = kinesis.describeStream(streamName, lastShardId)
-              .getStreamDescription();
+    public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+        this.kinesis = kinesis;
+    }
 
-          shards.addAll(description.getShards());
-          lastShardId = shards.get(shards.size() - 1).getShardId();
-        } while (description.getHasMoreShards());
+    public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+        return new SimplifiedKinesisClient(provider.get());
+    }
 
-        return shards;
-      }
-    });
-  }
+    public String getShardIterator(final String streamName, final String shardId,
+                                   final ShardIteratorType shardIteratorType,
+                                   final String startingSequenceNumber, final Instant timestamp)
+            throws TransientKinesisException {
+        final Date date = timestamp != null ? timestamp.toDate() : null;
+        return wrapExceptions(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                return kinesis.getShardIterator(new GetShardIteratorRequest()
+                        .withStreamName(streamName)
+                        .withShardId(shardId)
+                        .withShardIteratorType(shardIteratorType)
+                        .withStartingSequenceNumber(startingSequenceNumber)
+                        .withTimestamp(date)
+                ).getShardIterator();
+            }
+        });
+    }
 
-  /**
-   * Gets records from Kinesis and deaggregates them if needed.
-   *
-   * @return list of deaggregated records
-   * @throws TransientKinesisException - in case of recoverable situation
-   */
-  public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
-      String shardId) throws TransientKinesisException {
-    return getRecords(shardIterator, streamName, shardId, null);
-  }
+    public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+        return wrapExceptions(new Callable<List<Shard>>() {
+            @Override
+            public List<Shard> call() throws Exception {
+                List<Shard> shards = Lists.newArrayList();
+                String lastShardId = null;
+
+                StreamDescription description;
+                do {
+                    description = kinesis.describeStream(streamName, lastShardId)
+                            .getStreamDescription();
+
+                    shards.addAll(description.getShards());
+                    lastShardId = shards.get(shards.size() - 1).getShardId();
+                } while (description.getHasMoreShards());
+
+                return shards;
+            }
+        });
+    }
 
-  /**
-   * Gets records from Kinesis and deaggregates them if needed.
-   *
-   * @return list of deaggregated records
-   * @throws TransientKinesisException - in case of recoverable situation
-   */
-  public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
-      final String shardId, final Integer limit)
-      throws
-      TransientKinesisException {
-    return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+    /**
+     * Gets records from Kinesis and deaggregates them if needed.
+     *
+     * @return list of deaggregated records
+     * @throws TransientKinesisException - in case of recoverable situation
+     */
+    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+                                              String shardId) throws TransientKinesisException {
+        return getRecords(shardIterator, streamName, shardId, null);
+    }
 
-      @Override
-      public GetKinesisRecordsResult call() throws Exception {
-        GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
-            .withShardIterator(shardIterator)
-            .withLimit(limit));
-        return new GetKinesisRecordsResult(
-            UserRecord.deaggregate(response.getRecords()),
-            response.getNextShardIterator(),
-            streamName, shardId);
-      }
-    });
-  }
+    /**
+     * Gets records from Kinesis and deaggregates them if needed.
+     *
+     * @return list of deaggregated records
+     * @throws TransientKinesisException - in case of recoverable situation
+     */
+    public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+                                              final String shardId, final Integer limit)
+            throws
+            TransientKinesisException {
+        return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+            @Override
+            public GetKinesisRecordsResult call() throws Exception {
+                GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+                        .withShardIterator(shardIterator)
+                        .withLimit(limit));
+                return new GetKinesisRecordsResult(
+                        UserRecord.deaggregate(response.getRecords()),
+                        response.getNextShardIterator(),
+                        streamName, shardId);
+            }
+        });
+    }
 
-  /**
-   * Wraps Amazon specific exceptions into more friendly format.
-   *
-   * @throws TransientKinesisException              - in case of recoverable situation, i.e.
-   *                                  the request rate is too high, Kinesis remote service
-   *                                  failed, network issue, etc.
-   * @throws ExpiredIteratorException - if iterator needs to be refreshed
-   * @throws RuntimeException         - in all other cases
-   */
-  private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
-    try {
-      return callable.call();
-    } catch (ExpiredIteratorException e) {
-      throw e;
-    } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
-      throw new TransientKinesisException(
-          "Too many requests to Kinesis. Wait some time and retry.", e);
-    } catch (AmazonServiceException e) {
-      if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
-        throw new TransientKinesisException(
-            "Kinesis backend failed. Wait some time and retry.", e);
-      }
-      throw new RuntimeException("Kinesis client side failure", e);
-    } catch (Exception e) {
-      throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+    /**
+     * Wraps Amazon specific exceptions into more friendly format.
+     *
+     * @throws TransientKinesisException              - in case of recoverable situation, i.e.
+     *                                  the request rate is too high, Kinesis remote service
+     *                                  failed, network issue, etc.
+     * @throws ExpiredIteratorException - if iterator needs to be refreshed
+     * @throws RuntimeException         - in all other cases
+     */
+    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+        try {
+            return callable.call();
+        } catch (ExpiredIteratorException e) {
+            throw e;
+        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+            throw new TransientKinesisException(
+                    "Too many requests to Kinesis. Wait some time and retry.", e);
+        } catch (AmazonServiceException e) {
+            if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+                throw new TransientKinesisException(
+                        "Kinesis backend failed. Wait some time and retry.", e);
+            }
+            throw new RuntimeException("Kinesis client side failure", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+        }
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index f9298fa..d8842c4 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
 import java.io.Serializable;
 import java.util.Objects;
-
 import org.joda.time.Instant;
 
 /**
@@ -33,55 +32,54 @@ import org.joda.time.Instant;
  * in which case the reader will start reading at the specified point in time.
  */
 class StartingPoint implements Serializable {
+    private final InitialPositionInStream position;
+    private final Instant timestamp;
 
-  private final InitialPositionInStream position;
-  private final Instant timestamp;
-
-  public StartingPoint(InitialPositionInStream position) {
-    this.position = checkNotNull(position, "position");
-    this.timestamp = null;
-  }
-
-  public StartingPoint(Instant timestamp) {
-    this.timestamp = checkNotNull(timestamp, "timestamp");
-    this.position = null;
-  }
+    public StartingPoint(InitialPositionInStream position) {
+        this.position = checkNotNull(position, "position");
+        this.timestamp = null;
+    }
 
-  public InitialPositionInStream getPosition() {
-    return position;
-  }
+    public StartingPoint(Instant timestamp) {
+        this.timestamp = checkNotNull(timestamp, "timestamp");
+        this.position = null;
+    }
 
-  public String getPositionName() {
-    return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
-  }
+    public InitialPositionInStream getPosition() {
+        return position;
+    }
 
-  public Instant getTimestamp() {
-    return timestamp != null ? timestamp : null;
-  }
+    public String getPositionName() {
+        return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+    }
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
+    public Instant getTimestamp() {
+        return timestamp != null ? timestamp : null;
     }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StartingPoint that = (StartingPoint) o;
+        return position == that.position && Objects.equals(timestamp, that.timestamp);
     }
-    StartingPoint that = (StartingPoint) o;
-    return position == that.position && Objects.equals(timestamp, that.timestamp);
-  }
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(position, timestamp);
-  }
+    @Override
+    public int hashCode() {
+        return Objects.hash(position, timestamp);
+    }
 
-  @Override
-  public String toString() {
-    if (timestamp == null) {
-      return position.toString();
-    } else {
-      return "Starting at timestamp " + timestamp;
+    @Override
+    public String toString() {
+        if (timestamp == null) {
+            return position.toString();
+        } else {
+            return "Starting at timestamp " + timestamp;
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
index 1ec865d..22dc973 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -23,21 +23,20 @@ import static com.google.common.base.Preconditions.checkNotNull;
  * Always returns the same instance of checkpoint.
  */
 class StaticCheckpointGenerator implements CheckpointGenerator {
+    private final KinesisReaderCheckpoint checkpoint;
 
-  private final KinesisReaderCheckpoint checkpoint;
+    public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+        checkNotNull(checkpoint, "checkpoint");
+        this.checkpoint = checkpoint;
+    }
 
-  public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
-    checkNotNull(checkpoint, "checkpoint");
-    this.checkpoint = checkpoint;
-  }
+    @Override
+    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+        return checkpoint;
+    }
 
-  @Override
-  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
-    return checkpoint;
-  }
-
-  @Override
-  public String toString() {
-    return checkpoint.toString();
-  }
+    @Override
+    public String toString() {
+        return checkpoint.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 68ca0d7..57ad8a8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -23,8 +23,7 @@ import com.amazonaws.AmazonServiceException;
  * A transient exception thrown by Kinesis.
  */
 class TransientKinesisException extends Exception {
-
-  public TransientKinesisException(String s, AmazonServiceException e) {
-    super(s, e);
-  }
+    public TransientKinesisException(String s, AmazonServiceException e) {
+        super(s, e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 994d6e3..046c9d9 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -66,12 +66,10 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest;
 import com.amazonaws.services.kinesis.model.SplitShardResult;
 import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.google.common.base.Function;
-
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
-
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -80,301 +78,298 @@ import org.joda.time.Instant;
  */
 class AmazonKinesisMock implements AmazonKinesis {
 
-  static class TestData implements Serializable {
+    static class TestData implements Serializable {
+        private final String data;
+        private final Instant arrivalTimestamp;
+        private final String sequenceNumber;
+
+        public TestData(KinesisRecord record) {
+            this(new String(record.getData().array()),
+                    record.getApproximateArrivalTimestamp(),
+                    record.getSequenceNumber());
+        }
+
+        public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+            this.data = data;
+            this.arrivalTimestamp = arrivalTimestamp;
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        public Record convertToRecord() {
+            return new Record().
+                    withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+                    withData(ByteBuffer.wrap(data.getBytes())).
+                    withSequenceNumber(sequenceNumber).
+                    withPartitionKey("");
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return EqualsBuilder.reflectionEquals(this, obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return reflectionHashCode(this);
+        }
+    }
+
+    static class Provider implements KinesisClientProvider {
+
+        private final List<List<TestData>> shardedData;
+        private final int numberOfRecordsPerGet;
+
+        public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+            this.shardedData = shardedData;
+            this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+        }
+
+        @Override
+        public AmazonKinesis get() {
+            return new AmazonKinesisMock(transform(shardedData,
+                    new Function<List<TestData>, List<Record>>() {
+                        @Override
+                        public List<Record> apply(@Nullable List<TestData> testDatas) {
+                            return transform(testDatas, new Function<TestData, Record>() {
+                                @Override
+                                public Record apply(@Nullable TestData testData) {
+                                    return testData.convertToRecord();
+                                }
+                            });
+                        }
+                    }), numberOfRecordsPerGet);
+        }
+    }
 
-    private final String data;
-    private final Instant arrivalTimestamp;
-    private final String sequenceNumber;
+    private final List<List<Record>> shardedData;
+    private final int numberOfRecordsPerGet;
 
-    public TestData(KinesisRecord record) {
-      this(new String(record.getData().array()),
-          record.getApproximateArrivalTimestamp(),
-          record.getSequenceNumber());
+    public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+        this.shardedData = shardedData;
+        this.numberOfRecordsPerGet = numberOfRecordsPerGet;
     }
 
-    public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
-      this.data = data;
-      this.arrivalTimestamp = arrivalTimestamp;
-      this.sequenceNumber = sequenceNumber;
+    @Override
+    public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+        String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+        int shardId = parseInt(shardIteratorParts[0]);
+        int startingRecord = parseInt(shardIteratorParts[1]);
+        List<Record> shardData = shardedData.get(shardId);
+
+        int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+        int fromIndex = min(startingRecord, toIndex);
+        return new GetRecordsResult().
+                withRecords(shardData.subList(fromIndex, toIndex)).
+                withNextShardIterator(String.format("%s:%s", shardId, toIndex));
     }
 
-    public Record convertToRecord() {
-      return new Record().
-          withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
-          withData(ByteBuffer.wrap(data.getBytes())).
-          withSequenceNumber(sequenceNumber).
-          withPartitionKey("");
+    @Override
+    public GetShardIteratorResult getShardIterator(
+            GetShardIteratorRequest getShardIteratorRequest) {
+        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+                getShardIteratorRequest.getShardIteratorType());
+
+        String shardIterator;
+        if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+            shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+        } else {
+            throw new RuntimeException("Not implemented");
+        }
+
+        return new GetShardIteratorResult().withShardIterator(shardIterator);
     }
 
     @Override
-    public boolean equals(Object obj) {
-      return EqualsBuilder.reflectionEquals(this, obj);
+    public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+        int nextShardId = 0;
+        if (exclusiveStartShardId != null) {
+            nextShardId = parseInt(exclusiveStartShardId) + 1;
+        }
+        boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+        List<Shard> shards = newArrayList();
+        if (nextShardId < shardedData.size()) {
+            shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+        }
+
+        return new DescribeStreamResult().withStreamDescription(
+                new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+        );
     }
 
     @Override
-    public int hashCode() {
-      return reflectionHashCode(this);
+    public void setEndpoint(String endpoint) {
+
     }
-  }
 
-  static class Provider implements KinesisClientProvider {
+    @Override
+    public void setRegion(Region region) {
 
-    private final List<List<TestData>> shardedData;
-    private final int numberOfRecordsPerGet;
+    }
+
+    @Override
+    public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(String streamName, Integer shardCount) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+            DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(String streamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName) {
+
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName,
+                                               Integer limit, String exclusiveStartShardId) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+            DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+            EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+        throw new RuntimeException("Not implemented");
+    }
 
-    public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
-      this.shardedData = shardedData;
-      this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType,
+                                                   String startingSequenceNumber) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+            IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams() {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListTagsForStreamResult listTagsForStream(
+            ListTagsForStreamRequest listTagsForStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+        throw new RuntimeException("Not implemented");
     }
 
     @Override
-    public AmazonKinesis get() {
-      return new AmazonKinesisMock(transform(shardedData,
-          new Function<List<TestData>, List<Record>>() {
+    public MergeShardsResult mergeShards(String streamName,
+                                         String shardToMerge, String adjacentShardToMerge) {
+        throw new RuntimeException("Not implemented");
+    }
 
-            @Override
-            public List<Record> apply(@Nullable List<TestData> testDatas) {
-              return transform(testDatas, new Function<TestData, Record>() {
+    @Override
+    public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+        throw new RuntimeException("Not implemented");
+    }
 
-                @Override
-                public Record apply(@Nullable TestData testData) {
-                  return testData.convertToRecord();
-                }
-              });
-            }
-          }), numberOfRecordsPerGet);
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+        throw new RuntimeException("Not implemented");
     }
-  }
 
-  private final List<List<Record>> shardedData;
-  private final int numberOfRecordsPerGet;
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data,
+                                     String partitionKey, String sequenceNumberForOrdering) {
+        throw new RuntimeException("Not implemented");
+    }
 
-  public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
-    this.shardedData = shardedData;
-    this.numberOfRecordsPerGet = numberOfRecordsPerGet;
-  }
+    @Override
+    public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+        throw new RuntimeException("Not implemented");
+    }
 
-  @Override
-  public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
-    String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
-    int shardId = parseInt(shardIteratorParts[0]);
-    int startingRecord = parseInt(shardIteratorParts[1]);
-    List<Record> shardData = shardedData.get(shardId);
+    @Override
+    public RemoveTagsFromStreamResult removeTagsFromStream(
+            RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
 
-    int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
-    int fromIndex = min(startingRecord, toIndex);
-    return new GetRecordsResult().
-        withRecords(shardData.subList(fromIndex, toIndex)).
-        withNextShardIterator(String.format("%s:%s", shardId, toIndex));
-  }
+    @Override
+    public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public SplitShardResult splitShard(String streamName,
+                                       String shardToSplit, String newStartingHashKey) {
+        throw new RuntimeException("Not implemented");
+    }
 
-  @Override
-  public GetShardIteratorResult getShardIterator(
-      GetShardIteratorRequest getShardIteratorRequest) {
-    ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
-        getShardIteratorRequest.getShardIteratorType());
-
-    String shardIterator;
-    if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
-      shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
-    } else {
-      throw new RuntimeException("Not implemented");
-    }
-
-    return new GetShardIteratorResult().withShardIterator(shardIterator);
-  }
-
-  @Override
-  public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
-    int nextShardId = 0;
-    if (exclusiveStartShardId != null) {
-      nextShardId = parseInt(exclusiveStartShardId) + 1;
-    }
-    boolean hasMoreShards = nextShardId + 1 < shardedData.size();
-
-    List<Shard> shards = newArrayList();
-    if (nextShardId < shardedData.size()) {
-      shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
-    }
-
-    return new DescribeStreamResult().withStreamDescription(
-        new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
-    );
-  }
-
-  @Override
-  public void setEndpoint(String endpoint) {
-
-  }
-
-  @Override
-  public void setRegion(Region region) {
-
-  }
-
-  @Override
-  public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public CreateStreamResult createStream(String streamName, Integer shardCount) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
-      DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DeleteStreamResult deleteStream(String streamName) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DescribeStreamResult describeStream(String streamName) {
-
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DescribeStreamResult describeStream(String streamName,
-      Integer limit, String exclusiveStartShardId) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
-      DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
-      EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public GetShardIteratorResult getShardIterator(String streamName,
-      String shardId,
-      String shardIteratorType) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public GetShardIteratorResult getShardIterator(String streamName,
-      String shardId,
-      String shardIteratorType,
-      String startingSequenceNumber) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
-      IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public ListStreamsResult listStreams() {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public ListStreamsResult listStreams(String exclusiveStartStreamName) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public ListTagsForStreamResult listTagsForStream(
-      ListTagsForStreamRequest listTagsForStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public MergeShardsResult mergeShards(String streamName,
-      String shardToMerge, String adjacentShardToMerge) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public PutRecordResult putRecord(String streamName, ByteBuffer data,
-      String partitionKey, String sequenceNumberForOrdering) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public RemoveTagsFromStreamResult removeTagsFromStream(
-      RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public SplitShardResult splitShard(String streamName,
-      String shardToSplit, String newStartingHashKey) {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public void shutdown() {
-
-  }
-
-  @Override
-  public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
-    throw new RuntimeException("Not implemented");
-  }
+    @Override
+    public void shutdown() {
+
+    }
+
+    @Override
+    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+        throw new RuntimeException("Not implemented");
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 0b16bb7..00acffe 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -18,27 +18,24 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.google.common.testing.EqualsTester;
-
 import java.util.NoSuchElementException;
-
 import org.junit.Test;
 
 /**
  * Tests {@link CustomOptional}.
  */
 public class CustomOptionalTest {
+    @Test(expected = NoSuchElementException.class)
+    public void absentThrowsNoSuchElementExceptionOnGet() {
+        CustomOptional.absent().get();
+    }
 
-  @Test(expected = NoSuchElementException.class)
-  public void absentThrowsNoSuchElementExceptionOnGet() {
-    CustomOptional.absent().get();
-  }
-
-  @Test
-  public void testEqualsAndHashCode() {
-    new EqualsTester()
-        .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
-        .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
-        .addEqualityGroup(CustomOptional.of(11))
-        .addEqualityGroup(CustomOptional.of("3")).testEquals();
-  }
+    @Test
+    public void testEqualsAndHashCode() {
+        new EqualsTester()
+            .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
+            .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
+            .addEqualityGroup(CustomOptional.of(11))
+            .addEqualityGroup(CustomOptional.of("3")).testEquals();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index 1bb9717..c92ac9a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -28,29 +28,30 @@ import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
+
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class DynamicCheckpointGeneratorTest {
 
-  @Mock
-  private SimplifiedKinesisClient kinesisClient;
-  @Mock
-  private Shard shard1, shard2, shard3;
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private Shard shard1, shard2, shard3;
 
-  @Test
-  public void shouldMapAllShardsToCheckpoints() throws Exception {
-    given(shard1.getShardId()).willReturn("shard-01");
-    given(shard2.getShardId()).willReturn("shard-02");
-    given(shard3.getShardId()).willReturn("shard-03");
-    given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+    @Test
+    public void shouldMapAllShardsToCheckpoints() throws Exception {
+        given(shard1.getShardId()).willReturn("shard-01");
+        given(shard2.getShardId()).willReturn("shard-02");
+        given(shard3.getShardId()).willReturn("shard-03");
+        given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
 
-    StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
-    DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
-        startingPoint);
+        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+        DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+                startingPoint);
 
-    KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+        KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
 
-    assertThat(checkpoint).hasSize(3);
-  }
+        assertThat(checkpoint).hasSize(3);
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 44ad67d..567e25f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -21,9 +21,7 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.common.collect.Iterables;
-
 import java.util.List;
-
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -38,60 +36,59 @@ import org.junit.Test;
  */
 public class KinesisMockReadTest {
 
-  @Rule
-  public final transient TestPipeline p = TestPipeline.create();
-
-  @Test
-  public void readsDataFromMockKinesis() {
-    int noOfShards = 3;
-    int noOfEventsPerShard = 100;
-    List<List<AmazonKinesisMock.TestData>> testData =
-        provideTestData(noOfShards, noOfEventsPerShard);
-
-    PCollection<AmazonKinesisMock.TestData> result = p
-        .apply(
-            KinesisIO.read()
-                .from("stream", InitialPositionInStream.TRIM_HORIZON)
-                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
-                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
-        .apply(ParDo.of(new KinesisRecordToTestData()));
-    PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
-    p.run();
-  }
-
-  private static class KinesisRecordToTestData extends
-      DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
+
+    @Test
+    public void readsDataFromMockKinesis() {
+        int noOfShards = 3;
+        int noOfEventsPerShard = 100;
+        List<List<AmazonKinesisMock.TestData>> testData =
+                provideTestData(noOfShards, noOfEventsPerShard);
+
+        PCollection<AmazonKinesisMock.TestData> result = p
+                .apply(
+                        KinesisIO.read()
+                                .from("stream", InitialPositionInStream.TRIM_HORIZON)
+                                .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
+                                .withMaxNumRecords(noOfShards * noOfEventsPerShard))
+                .apply(ParDo.of(new KinesisRecordToTestData()));
+        PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+        p.run();
+    }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(new AmazonKinesisMock.TestData(c.element()));
+    private static class KinesisRecordToTestData extends
+            DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            c.output(new AmazonKinesisMock.TestData(c.element()));
+        }
     }
-  }
 
-  private List<List<AmazonKinesisMock.TestData>> provideTestData(
-      int noOfShards,
-      int noOfEventsPerShard) {
+    private List<List<AmazonKinesisMock.TestData>> provideTestData(
+            int noOfShards,
+            int noOfEventsPerShard) {
 
-    int seqNumber = 0;
+        int seqNumber = 0;
 
-    List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
-    for (int i = 0; i < noOfShards; ++i) {
-      List<AmazonKinesisMock.TestData> shardData = newArrayList();
-      shardedData.add(shardData);
+        List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+        for (int i = 0; i < noOfShards; ++i) {
+            List<AmazonKinesisMock.TestData> shardData = newArrayList();
+            shardedData.add(shardData);
 
-      DateTime arrival = DateTime.now();
-      for (int j = 0; j < noOfEventsPerShard; ++j) {
-        arrival = arrival.plusSeconds(1);
+            DateTime arrival = DateTime.now();
+            for (int j = 0; j < noOfEventsPerShard; ++j) {
+                arrival = arrival.plusSeconds(1);
 
-        seqNumber++;
-        shardData.add(new AmazonKinesisMock.TestData(
-            Integer.toString(seqNumber),
-            arrival.toInstant(),
-            Integer.toString(seqNumber))
-        );
-      }
-    }
+                seqNumber++;
+                shardData.add(new AmazonKinesisMock.TestData(
+                        Integer.toString(seqNumber),
+                        arrival.toInstant(),
+                        Integer.toString(seqNumber))
+                );
+            }
+        }
 
-    return shardedData;
-  }
+        return shardedData;
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 1038a47..8c8da64 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -17,14 +17,13 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+
 import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.google.common.collect.Iterables;
-
 import java.util.Iterator;
 import java.util.List;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,34 +35,33 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderCheckpointTest {
+    @Mock
+    private ShardCheckpoint a, b, c;
 
-  @Mock
-  private ShardCheckpoint a, b, c;
-
-  private KinesisReaderCheckpoint checkpoint;
+    private KinesisReaderCheckpoint checkpoint;
 
-  @Before
-  public void setUp() {
-    checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
-  }
+    @Before
+    public void setUp() {
+        checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+    }
 
-  @Test
-  public void splitsCheckpointAccordingly() {
-    verifySplitInto(1);
-    verifySplitInto(2);
-    verifySplitInto(3);
-    verifySplitInto(4);
-  }
+    @Test
+    public void splitsCheckpointAccordingly() {
+        verifySplitInto(1);
+        verifySplitInto(2);
+        verifySplitInto(3);
+        verifySplitInto(4);
+    }
 
-  @Test(expected = UnsupportedOperationException.class)
-  public void isImmutable() {
-    Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
-    iterator.remove();
-  }
+    @Test(expected = UnsupportedOperationException.class)
+    public void isImmutable() {
+        Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+        iterator.remove();
+    }
 
-  private void verifySplitInto(int size) {
-    List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
-    assertThat(Iterables.concat(split)).containsOnly(a, b, c);
-    assertThat(split).hasSize(Math.min(size, 3));
-  }
+    private void verifySplitInto(int size) {
+        List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+        assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+        assertThat(split).hasSize(Math.min(size, 3));
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 5781033..8eb6546 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import com.amazonaws.regions.Regions;
-
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
@@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -52,75 +50,72 @@ import org.junit.Test;
  * You need to provide all {@link KinesisTestOptions} in order to run this.
  */
 public class KinesisReaderIT {
-
-  private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
-  private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
-
-  @Rule
-  public final transient TestPipeline p = TestPipeline.create();
-
-  @Ignore
-  @Test
-  public void readsDataFromRealKinesisStream()
-      throws IOException, InterruptedException, ExecutionException {
-    KinesisTestOptions options = readKinesisOptions();
-    List<String> testData = prepareTestData(1000);
-
-    Future<?> future = startTestPipeline(testData, options);
-    KinesisUploader.uploadAll(testData, options);
-    future.get();
-  }
-
-  private List<String> prepareTestData(int count) {
-    List<String> data = newArrayList();
-    for (int i = 0; i < count; ++i) {
-      data.add(RandomStringUtils.randomAlphabetic(32));
+    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+    private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+    @Rule
+    public final transient TestPipeline p = TestPipeline.create();
+
+    @Ignore
+    @Test
+    public void readsDataFromRealKinesisStream()
+            throws IOException, InterruptedException, ExecutionException {
+        KinesisTestOptions options = readKinesisOptions();
+        List<String> testData = prepareTestData(1000);
+
+        Future<?> future = startTestPipeline(testData, options);
+        KinesisUploader.uploadAll(testData, options);
+        future.get();
     }
-    return data;
-  }
 
-  private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
-      throws InterruptedException {
-
-    PCollection<String> result = p.
-        apply(KinesisIO.read()
-            .from(options.getAwsKinesisStream(), Instant.now())
-            .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
-                Regions.fromName(options.getAwsKinesisRegion()))
-            .withMaxReadTime(Duration.standardMinutes(3))
-        ).
-        apply(ParDo.of(new RecordDataToString()));
-    PAssert.that(result).containsInAnyOrder(testData);
-
-    Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
-
-      @Override
-      public Void call() throws Exception {
-        PipelineResult result = p.run();
-        PipelineResult.State state = result.getState();
-        while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
-          Thread.sleep(1000);
-          state = result.getState();
+    private List<String> prepareTestData(int count) {
+        List<String> data = newArrayList();
+        for (int i = 0; i < count; ++i) {
+            data.add(RandomStringUtils.randomAlphabetic(32));
         }
-        assertThat(state).isEqualTo(PipelineResult.State.DONE);
-        return null;
-      }
-    });
-    Thread.sleep(PIPELINE_STARTUP_TIME);
-    return future;
-  }
+        return data;
+    }
 
-  private KinesisTestOptions readKinesisOptions() {
-    PipelineOptionsFactory.register(KinesisTestOptions.class);
-    return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
-  }
+    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+            throws InterruptedException {
+
+        PCollection<String> result = p.
+                apply(KinesisIO.read()
+                        .from(options.getAwsKinesisStream(), Instant.now())
+                        .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
+                                Regions.fromName(options.getAwsKinesisRegion()))
+                        .withMaxReadTime(Duration.standardMinutes(3))
+                ).
+                apply(ParDo.of(new RecordDataToString()));
+        PAssert.that(result).containsInAnyOrder(testData);
+
+        Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                PipelineResult result = p.run();
+                PipelineResult.State state = result.getState();
+                while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+                    Thread.sleep(1000);
+                    state = result.getState();
+                }
+                assertThat(state).isEqualTo(PipelineResult.State.DONE);
+                return null;
+            }
+        });
+        Thread.sleep(PIPELINE_STARTUP_TIME);
+        return future;
+    }
 
-  private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+    private KinesisTestOptions readKinesisOptions() {
+        PipelineOptionsFactory.register(KinesisTestOptions.class);
+        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      checkNotNull(c.element(), "Null record given");
-      c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+    private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            checkNotNull(c.element(), "Null record given");
+            c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index a26501a..3111029 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
-
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,88 +34,87 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderTest {
-
-  @Mock
-  private SimplifiedKinesisClient kinesis;
-  @Mock
-  private CheckpointGenerator generator;
-  @Mock
-  private ShardCheckpoint firstCheckpoint, secondCheckpoint;
-  @Mock
-  private ShardRecordsIterator firstIterator, secondIterator;
-  @Mock
-  private KinesisRecord a, b, c, d;
-
-  private KinesisReader reader;
-
-  @Before
-  public void setUp() throws IOException, TransientKinesisException {
-    when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
-        asList(firstCheckpoint, secondCheckpoint)
-    ));
-    when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
-    when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
-    when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-    when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-
-    reader = new KinesisReader(kinesis, generator, null);
-  }
-
-  @Test
-  public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
-    assertThat(reader.start()).isFalse();
-  }
-
-  @Test(expected = NoSuchElementException.class)
-  public void throwsNoSuchElementExceptionIfNoData() throws IOException {
-    reader.start();
-    reader.getCurrent();
-  }
-
-  @Test
-  public void startReturnsTrueIfSomeDataAvailable() throws IOException,
-      TransientKinesisException {
-    when(firstIterator.next()).
-        thenReturn(CustomOptional.of(a)).
-        thenReturn(CustomOptional.<KinesisRecord>absent());
-
-    assertThat(reader.start()).isTrue();
-  }
-
-  @Test
-  public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
-      throws IOException, TransientKinesisException {
-    reader.start();
-
-    when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
-    assertThat(reader.advance()).isFalse();
-  }
-
-  @Test
-  public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
-    when(firstIterator.next()).
-        thenReturn(CustomOptional.<KinesisRecord>absent()).
-        thenReturn(CustomOptional.of(a)).
-        thenReturn(CustomOptional.<KinesisRecord>absent()).
-        thenReturn(CustomOptional.of(b)).
-        thenReturn(CustomOptional.<KinesisRecord>absent());
-
-    when(secondIterator.next()).
-        thenReturn(CustomOptional.of(c)).
-        thenReturn(CustomOptional.<KinesisRecord>absent()).
-        thenReturn(CustomOptional.of(d)).
-        thenReturn(CustomOptional.<KinesisRecord>absent());
-
-    assertThat(reader.start()).isTrue();
-    assertThat(reader.getCurrent()).isEqualTo(c);
-    assertThat(reader.advance()).isTrue();
-    assertThat(reader.getCurrent()).isEqualTo(a);
-    assertThat(reader.advance()).isTrue();
-    assertThat(reader.getCurrent()).isEqualTo(d);
-    assertThat(reader.advance()).isTrue();
-    assertThat(reader.getCurrent()).isEqualTo(b);
-    assertThat(reader.advance()).isFalse();
-  }
+    @Mock
+    private SimplifiedKinesisClient kinesis;
+    @Mock
+    private CheckpointGenerator generator;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+    @Mock
+    private ShardRecordsIterator firstIterator, secondIterator;
+    @Mock
+    private KinesisRecord a, b, c, d;
+
+    private KinesisReader reader;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+                asList(firstCheckpoint, secondCheckpoint)
+        ));
+        when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+        when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+        when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+        when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        reader = new KinesisReader(kinesis, generator, null);
+    }
+
+    @Test
+    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+        assertThat(reader.start()).isFalse();
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+        reader.start();
+        reader.getCurrent();
+    }
+
+    @Test
+    public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+            TransientKinesisException {
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+    }
+
+    @Test
+    public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+            throws IOException, TransientKinesisException {
+        reader.start();
+
+        when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+        assertThat(reader.advance()).isFalse();
+    }
+
+    @Test
+    public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(b)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        when(secondIterator.next()).
+                thenReturn(CustomOptional.of(c)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(d)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(c);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(a);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(d);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(b);
+        assertThat(reader.advance()).isFalse();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index c9f01bb..8771c86 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import java.nio.ByteBuffer;
-
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -27,21 +26,20 @@ import org.junit.Test;
  * Tests {@link KinesisRecordCoder}.
  */
 public class KinesisRecordCoderTest {
-
-  @Test
-  public void encodingAndDecodingWorks() throws Exception {
-    KinesisRecord record = new KinesisRecord(
-        ByteBuffer.wrap("data".getBytes()),
-        "sequence",
-        128L,
-        "partition",
-        Instant.now(),
-        Instant.now(),
-        "stream",
-        "shard"
-    );
-    CoderProperties.coderDecodeEncodeEqual(
-        new KinesisRecordCoder(), record
-    );
-  }
+    @Test
+    public void encodingAndDecodingWorks() throws Exception {
+        KinesisRecord record = new KinesisRecord(
+                ByteBuffer.wrap("data".getBytes()),
+                "sequence",
+                128L,
+                "partition",
+                Instant.now(),
+                Instant.now(),
+                "stream",
+                "shard"
+        );
+        CoderProperties.coderDecodeEncodeEqual(
+                new KinesisRecordCoder(), record
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 76bcb27..324de46 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -25,28 +25,23 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
  * Options for Kinesis integration tests.
  */
 public interface KinesisTestOptions extends TestPipelineOptions {
-
-  @Description("AWS region where Kinesis stream resided")
-  @Default.String("aws-kinesis-region")
-  String getAwsKinesisRegion();
-
-  void setAwsKinesisRegion(String value);
-
-  @Description("Kinesis stream name")
-  @Default.String("aws-kinesis-stream")
-  String getAwsKinesisStream();
-
-  void setAwsKinesisStream(String value);
-
-  @Description("AWS secret key")
-  @Default.String("aws-secret-key")
-  String getAwsSecretKey();
-
-  void setAwsSecretKey(String value);
-
-  @Description("AWS access key")
-  @Default.String("aws-access-key")
-  String getAwsAccessKey();
-
-  void setAwsAccessKey(String value);
+    @Description("AWS region where Kinesis stream resided")
+    @Default.String("aws-kinesis-region")
+    String getAwsKinesisRegion();
+    void setAwsKinesisRegion(String value);
+
+    @Description("Kinesis stream name")
+    @Default.String("aws-kinesis-stream")
+    String getAwsKinesisStream();
+    void setAwsKinesisStream(String value);
+
+    @Description("AWS secret key")
+    @Default.String("aws-secret-key")
+    String getAwsSecretKey();
+    void setAwsSecretKey(String value);
+
+    @Description("AWS access key")
+    @Default.String("aws-access-key")
+    String getAwsAccessKey();
+    void setAwsAccessKey(String value);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7a7cb02..7518ff7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -29,7 +29,6 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 
@@ -38,46 +37,47 @@ import java.util.List;
  */
 public class KinesisUploader {
 
-  public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+    public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
 
-  public static void uploadAll(List<String> data, KinesisTestOptions options) {
-    AmazonKinesisClient client = new AmazonKinesisClient(
-        new StaticCredentialsProvider(
-            new BasicAWSCredentials(
-                options.getAwsAccessKey(), options.getAwsSecretKey()))
-    ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+    public static void uploadAll(List<String> data, KinesisTestOptions options) {
+        AmazonKinesisClient client = new AmazonKinesisClient(
+                new StaticCredentialsProvider(
+                        new BasicAWSCredentials(
+                                options.getAwsAccessKey(), options.getAwsSecretKey()))
+        ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
 
-    List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+        List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
 
-    for (List<String> partition : partitions) {
-      List<PutRecordsRequestEntry> allRecords = newArrayList();
-      for (String row : partition) {
-        allRecords.add(new PutRecordsRequestEntry().
-            withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
-            withPartitionKey(Integer.toString(row.hashCode()))
 
-        );
-      }
+        for (List<String> partition : partitions) {
+            List<PutRecordsRequestEntry> allRecords = newArrayList();
+            for (String row : partition) {
+                allRecords.add(new PutRecordsRequestEntry().
+                        withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+                        withPartitionKey(Integer.toString(row.hashCode()))
 
-      PutRecordsResult result;
-      do {
-        result = client.putRecords(
-            new PutRecordsRequest().
-                withStreamName(options.getAwsKinesisStream()).
-                withRecords(allRecords));
-        List<PutRecordsRequestEntry> failedRecords = newArrayList();
-        int i = 0;
-        for (PutRecordsResultEntry row : result.getRecords()) {
-          if (row.getErrorCode() != null) {
-            failedRecords.add(allRecords.get(i));
-          }
-          ++i;
-        }
-        allRecords = failedRecords;
-      }
+                );
+            }
 
-      while (result.getFailedRecordCount() > 0);
+            PutRecordsResult result;
+            do {
+                result = client.putRecords(
+                        new PutRecordsRequest().
+                                withStreamName(options.getAwsKinesisStream()).
+                                withRecords(allRecords));
+                List<PutRecordsRequestEntry> failedRecords = newArrayList();
+                int i = 0;
+                for (PutRecordsResultEntry row : result.getRecords()) {
+                    if (row.getErrorCode() != null) {
+                        failedRecords.add(allRecords.get(i));
+                    }
+                    ++i;
+                }
+                allRecords = failedRecords;
+            }
+
+            while (result.getFailedRecordCount() > 0);
+        }
     }
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index cb32562..f979c01 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -20,49 +20,47 @@ package org.apache.beam.sdk.io.kinesis;
 import static org.mockito.BDDMockito.given;
 
 import com.google.common.collect.Lists;
-
 import java.util.Collections;
 import java.util.List;
-
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
+
 /***
  */
 @RunWith(MockitoJUnitRunner.class)
 public class RecordFilterTest {
+    @Mock
+    private ShardCheckpoint checkpoint;
+    @Mock
+    private KinesisRecord record1, record2, record3, record4, record5;
 
-  @Mock
-  private ShardCheckpoint checkpoint;
-  @Mock
-  private KinesisRecord record1, record2, record3, record4, record5;
-
-  @Test
-  public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
-    given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
-    given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
-    given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
-    given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
-    given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
-    List<KinesisRecord> records = Lists.newArrayList(record1, record2,
-        record3, record4, record5);
-    RecordFilter underTest = new RecordFilter();
+    @Test
+    public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+        given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+        given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+        given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+        List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+                record3, record4, record5);
+        RecordFilter underTest = new RecordFilter();
 
-    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-    Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
-  }
+        Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+    }
 
-  @Test
-  public void shouldNotFailOnEmptyList() {
-    List<KinesisRecord> records = Collections.emptyList();
-    RecordFilter underTest = new RecordFilter();
+    @Test
+    public void shouldNotFailOnEmptyList() {
+        List<KinesisRecord> records = Collections.emptyList();
+        RecordFilter underTest = new RecordFilter();
 
-    List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
 
-    Assertions.assertThat(retainedRecords).isEmpty();
-  }
+        Assertions.assertThat(retainedRecords).isEmpty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index e4abce4..f032eea 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -22,38 +22,36 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Collections;
 import java.util.List;
-
 import org.junit.Test;
 
 /**
  * Tests {@link RoundRobin}.
  */
 public class RoundRobinTest {
+    @Test(expected = IllegalArgumentException.class)
+    public void doesNotAllowCreationWithEmptyCollection() {
+        new RoundRobin<>(Collections.emptyList());
+    }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void doesNotAllowCreationWithEmptyCollection() {
-    new RoundRobin<>(Collections.emptyList());
-  }
-
-  @Test
-  public void goesThroughElementsInCycle() {
-    List<String> input = newArrayList("a", "b", "c");
+    @Test
+    public void goesThroughElementsInCycle() {
+        List<String> input = newArrayList("a", "b", "c");
 
-    RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+        RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
 
-    input.addAll(input);  // duplicate the input
-    for (String element : input) {
-      assertThat(roundRobin.getCurrent()).isEqualTo(element);
-      assertThat(roundRobin.getCurrent()).isEqualTo(element);
-      roundRobin.moveForward();
+        input.addAll(input);  // duplicate the input
+        for (String element : input) {
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            roundRobin.moveForward();
+        }
     }
-  }
 
-  @Test
-  public void usualIteratorGoesThroughElementsOnce() {
-    List<String> input = newArrayList("a", "b", "c");
+    @Test
+    public void usualIteratorGoesThroughElementsOnce() {
+        List<String> input = newArrayList("a", "b", "c");
 
-    RoundRobin<String> roundRobin = new RoundRobin<>(input);
-    assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
-  }
+        RoundRobin<String> roundRobin = new RoundRobin<>(input);
+        assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+    }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index d4784c4..39ab36f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -32,9 +32,7 @@ import static org.mockito.Mockito.when;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
 import java.io.IOException;
-
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -48,105 +46,104 @@ import org.mockito.runners.MockitoJUnitRunner;
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardCheckpointTest {
-
-  private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
-  private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
-  private static final String STREAM_NAME = "STREAM";
-  private static final String SHARD_ID = "SHARD_ID";
-  @Mock
-  private SimplifiedKinesisClient client;
-
-  @Before
-  public void setUp() throws IOException, TransientKinesisException {
-    when(client.getShardIterator(
-        eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
-        anyString(), isNull(Instant.class))).
-        thenReturn(AT_SEQUENCE_SHARD_IT);
-    when(client.getShardIterator(
-        eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
-        anyString(), isNull(Instant.class))).
-        thenReturn(AFTER_SEQUENCE_SHARD_IT);
-  }
-
-  @Test
-  public void testProvidingShardIterator() throws IOException, TransientKinesisException {
-    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-        .isEqualTo(AT_SEQUENCE_SHARD_IT);
-    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
-        .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
-    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
-        (AT_SEQUENCE_SHARD_IT);
-    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
-        .isEqualTo(AT_SEQUENCE_SHARD_IT);
-  }
-
-  @Test
-  public void testComparisonWithExtendedSequenceNumber() {
-    assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isTrue();
-
-    assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isTrue();
-
-    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isTrue();
-
-    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isTrue();
-
-    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isFalse();
-
-    assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("100", 0L))
-    )).isFalse();
-
-    assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
-        recordWith(new ExtendedSequenceNumber("99", 1L))
-    )).isFalse();
-  }
-
-  @Test
-  public void testComparisonWithTimestamp() {
-    DateTime referenceTimestamp = DateTime.now();
-
-    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-        .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
-    ).isFalse();
-
-    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-        .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
-    ).isTrue();
-
-    assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
-        .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
-    ).isTrue();
-  }
-
-  private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
-    KinesisRecord record = mock(KinesisRecord.class);
-    given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
-    return record;
-  }
-
-  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
-      Long subSequenceNumber) {
-    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
-        subSequenceNumber);
-  }
-
-  private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
-    KinesisRecord record = mock(KinesisRecord.class);
-    given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
-    return record;
-  }
-
-  private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
-    return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
-  }
+    private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+    private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+    private static final String STREAM_NAME = "STREAM";
+    private static final String SHARD_ID = "SHARD_ID";
+    @Mock
+    private SimplifiedKinesisClient client;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(client.getShardIterator(
+                eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+                anyString(), isNull(Instant.class))).
+                thenReturn(AT_SEQUENCE_SHARD_IT);
+        when(client.getShardIterator(
+                eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+                anyString(), isNull(Instant.class))).
+                thenReturn(AFTER_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+                .isEqualTo(AT_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+                .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+                (AT_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+                .isEqualTo(AT_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testComparisonWithExtendedSequenceNumber() {
+        assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isFalse();
+
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isFalse();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("99", 1L))
+        )).isFalse();
+    }
+
+    @Test
+    public void testComparisonWithTimestamp() {
+        DateTime referenceTimestamp = DateTime.now();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+        ).isFalse();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+        ).isTrue();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+        ).isTrue();
+    }
+
+    private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+        KinesisRecord record = mock(KinesisRecord.class);
+        given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+        return record;
+    }
+
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+                                       Long subSequenceNumber) {
+        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+                subSequenceNumber);
+    }
+
+    private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+        KinesisRecord record = mock(KinesisRecord.class);
+        given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+        return record;
+    }
+
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+    }
 }