You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:07:01 UTC

[49/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply with official code style

Reformatting Kinesis IO to comply with official code style


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7925a668
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7925a668
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7925a668

Branch: refs/heads/DSL_SQL
Commit: 7925a668b12e272c7b2631ff6b20376e92ad90be
Parents: 4abd714
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Jun 19 11:10:25 2017 +0200
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/kinesis/CheckpointGenerator.java     |   6 +-
 .../beam/sdk/io/kinesis/CustomOptional.java     | 111 ++--
 .../io/kinesis/DynamicCheckpointGenerator.java  |  52 +-
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  49 +-
 .../sdk/io/kinesis/KinesisClientProvider.java   |   4 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 279 +++++-----
 .../beam/sdk/io/kinesis/KinesisReader.java      | 206 +++----
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  97 ++--
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 177 +++---
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  68 +--
 .../beam/sdk/io/kinesis/KinesisSource.java      | 147 ++---
 .../beam/sdk/io/kinesis/RecordFilter.java       |  18 +-
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  37 +-
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 241 ++++-----
 .../sdk/io/kinesis/ShardRecordsIterator.java    | 106 ++--
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
 .../beam/sdk/io/kinesis/StartingPoint.java      |  84 +--
 .../io/kinesis/StaticCheckpointGenerator.java   |  27 +-
 .../io/kinesis/TransientKinesisException.java   |   7 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 539 ++++++++++---------
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  27 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  33 +-
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  97 ++--
 .../io/kinesis/KinesisReaderCheckpointTest.java |  52 +-
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 127 ++---
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 166 +++---
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  34 +-
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  43 +-
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  70 +--
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  52 +-
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  42 +-
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 203 +++----
 .../io/kinesis/ShardRecordsIteratorTest.java    | 216 ++++----
 .../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
 34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 919d85a..2629c57 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import java.io.Serializable;
 
 /**
@@ -25,6 +24,7 @@ import java.io.Serializable;
  * How exactly the checkpoint is generated is up to implementing class.
  */
 interface CheckpointGenerator extends Serializable {
-    KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
-            throws TransientKinesisException;
+
+  KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+      throws TransientKinesisException;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 4bed0e3..5a28214 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,76 +24,79 @@ import java.util.Objects;
  * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
  */
 abstract class CustomOptional<T> {
-    @SuppressWarnings("unchecked")
-    public static <T> CustomOptional<T> absent() {
-        return (Absent<T>) Absent.INSTANCE;
-    }
 
-    public static <T> CustomOptional<T> of(T v) {
-        return new Present<>(v);
-    }
+  @SuppressWarnings("unchecked")
+  public static <T> CustomOptional<T> absent() {
+    return (Absent<T>) Absent.INSTANCE;
+  }
 
-    public abstract boolean isPresent();
+  public static <T> CustomOptional<T> of(T v) {
+    return new Present<>(v);
+  }
 
-    public abstract T get();
+  public abstract boolean isPresent();
 
-    private static class Present<T> extends CustomOptional<T> {
-        private final T value;
+  public abstract T get();
 
-        private Present(T value) {
-            this.value = value;
-        }
+  private static class Present<T> extends CustomOptional<T> {
 
-        @Override
-        public boolean isPresent() {
-            return true;
-        }
+    private final T value;
 
-        @Override
-        public T get() {
-            return value;
-        }
+    private Present(T value) {
+      this.value = value;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Present)) {
-                return false;
-            }
+    @Override
+    public boolean isPresent() {
+      return true;
+    }
 
-            Present<?> present = (Present<?>) o;
-            return Objects.equals(value, present.value);
-        }
+    @Override
+    public T get() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof Present)) {
+        return false;
+      }
 
-        @Override
-        public int hashCode() {
-            return Objects.hash(value);
-        }
+      Present<?> present = (Present<?>) o;
+      return Objects.equals(value, present.value);
     }
 
-    private static class Absent<T> extends CustomOptional<T> {
-        private static final Absent<Object> INSTANCE = new Absent<>();
+    @Override
+    public int hashCode() {
+      return Objects.hash(value);
+    }
+  }
 
-        private Absent() {
-        }
+  private static class Absent<T> extends CustomOptional<T> {
 
-        @Override
-        public boolean isPresent() {
-            return false;
-        }
+    private static final Absent<Object> INSTANCE = new Absent<>();
 
-        @Override
-        public T get() {
-            throw new NoSuchElementException();
-        }
+    private Absent() {
+    }
+
+    @Override
+    public boolean isPresent() {
+      return false;
+    }
 
-        @Override
-        public boolean equals(Object o) {
-            return o instanceof Absent;
-        }
+    @Override
+    public T get() {
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof Absent;
+    }
 
-        @Override
-        public int hashCode() {
-            return 0;
-        }
+    @Override
+    public int hashCode() {
+      return 0;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 2ec293c..9933019 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -28,29 +28,31 @@ import com.google.common.base.Function;
  * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
  */
 class DynamicCheckpointGenerator implements CheckpointGenerator {
-    private final String streamName;
-    private final StartingPoint startingPoint;
-
-    public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
-        this.streamName = checkNotNull(streamName, "streamName");
-        this.startingPoint = checkNotNull(startingPoint, "startingPoint");
-    }
-
-    @Override
-    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
-            throws TransientKinesisException {
-        return new KinesisReaderCheckpoint(
-                transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
-                    @Override
-                    public ShardCheckpoint apply(Shard shard) {
-                        return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
-                    }
-                })
-        );
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
-    }
+
+  private final String streamName;
+  private final StartingPoint startingPoint;
+
+  public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+    this.streamName = checkNotNull(streamName, "streamName");
+    this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+  }
+
+  @Override
+  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+      throws TransientKinesisException {
+    return new KinesisReaderCheckpoint(
+        transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+
+          @Override
+          public ShardCheckpoint apply(Shard shard) {
+            return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+          }
+        })
+    );
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 5a34d7d..f605f55 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.transform;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Function;
+
 import java.util.List;
 import javax.annotation.Nullable;
 
@@ -28,27 +29,29 @@ import javax.annotation.Nullable;
  * Represents the output of 'get' operation on Kinesis stream.
  */
 class GetKinesisRecordsResult {
-    private final List<KinesisRecord> records;
-    private final String nextShardIterator;
-
-    public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
-                                   final String streamName, final String shardId) {
-        this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
-            @Nullable
-            @Override
-            public KinesisRecord apply(@Nullable UserRecord input) {
-                assert input != null;  // to make FindBugs happy
-                return new KinesisRecord(input, streamName, shardId);
-            }
-        });
-        this.nextShardIterator = nextShardIterator;
-    }
-
-    public List<KinesisRecord> getRecords() {
-        return records;
-    }
-
-    public String getNextShardIterator() {
-        return nextShardIterator;
-    }
+
+  private final List<KinesisRecord> records;
+  private final String nextShardIterator;
+
+  public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+      final String streamName, final String shardId) {
+    this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+
+      @Nullable
+      @Override
+      public KinesisRecord apply(@Nullable UserRecord input) {
+        assert input != null;  // to make FindBugs happy
+        return new KinesisRecord(input, streamName, shardId);
+      }
+    });
+    this.nextShardIterator = nextShardIterator;
+  }
+
+  public List<KinesisRecord> getRecords() {
+    return records;
+  }
+
+  public String getNextShardIterator() {
+    return nextShardIterator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
index c7fd7f6..b5b721e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
+
 import java.io.Serializable;
 
 /**
@@ -27,5 +28,6 @@ import java.io.Serializable;
  * {@link Serializable} to ensure it can be sent to worker machines.
  */
 interface KinesisClientProvider extends Serializable {
-    AmazonKinesis get();
+
+  AmazonKinesis get();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b85eb63..bc8ada1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -29,7 +28,9 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.google.auto.value.AutoValue;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -102,142 +103,148 @@ import org.joda.time.Instant;
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public final class KinesisIO {
-    /** Returns a new {@link Read} transform for reading from Kinesis. */
-    public static Read read() {
-        return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+
+  /** Returns a new {@link Read} transform for reading from Kinesis. */
+  public static Read read() {
+    return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+  }
+
+  /** Implementation of {@link #read}. */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
+
+    @Nullable
+    abstract String getStreamName();
+
+    @Nullable
+    abstract StartingPoint getInitialPosition();
+
+    @Nullable
+    abstract KinesisClientProvider getClientProvider();
+
+    abstract int getMaxNumRecords();
+
+    @Nullable
+    abstract Duration getMaxReadTime();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setStreamName(String streamName);
+
+      abstract Builder setInitialPosition(StartingPoint startingPoint);
+
+      abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+
+      abstract Builder setMaxNumRecords(int maxNumRecords);
+
+      abstract Builder setMaxReadTime(Duration maxReadTime);
+
+      abstract Read build();
     }
 
-    /** Implementation of {@link #read}. */
-    @AutoValue
-    public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
-        @Nullable
-        abstract String getStreamName();
-
-        @Nullable
-        abstract StartingPoint getInitialPosition();
-
-        @Nullable
-        abstract KinesisClientProvider getClientProvider();
-
-        abstract int getMaxNumRecords();
-
-        @Nullable
-        abstract Duration getMaxReadTime();
-
-        abstract Builder toBuilder();
-
-        @AutoValue.Builder
-        abstract static class Builder {
-            abstract Builder setStreamName(String streamName);
-            abstract Builder setInitialPosition(StartingPoint startingPoint);
-            abstract Builder setClientProvider(KinesisClientProvider clientProvider);
-            abstract Builder setMaxNumRecords(int maxNumRecords);
-            abstract Builder setMaxReadTime(Duration maxReadTime);
-
-            abstract Read build();
-        }
-
-        /**
-         * Specify reading from streamName at some initial position.
-         */
-        public Read from(String streamName, InitialPositionInStream initialPosition) {
-            return toBuilder()
-                .setStreamName(streamName)
-                .setInitialPosition(
-                    new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
-                .build();
-        }
-
-        /**
-         * Specify reading from streamName beginning at given {@link Instant}.
-         * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
-         */
-        public Read from(String streamName, Instant initialTimestamp) {
-            return toBuilder()
-                .setStreamName(streamName)
-                .setInitialPosition(
-                    new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
-                .build();
-        }
-
-        /**
-         * Allows to specify custom {@link KinesisClientProvider}.
-         * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
-         * used for communication with Kinesis.
-         * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
-         * does not suit your needs.
-         */
-        public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
-            return toBuilder().setClientProvider(kinesisClientProvider).build();
-        }
-
-        /**
-         * Specify credential details and region to be used to read from Kinesis.
-         * If you need more sophisticated credential protocol, then you should look at
-         * {@link Read#withClientProvider(KinesisClientProvider)}.
-         */
-        public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
-            return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
-        }
-
-        /** Specifies to read at most a given number of records. */
-        public Read withMaxNumRecords(int maxNumRecords) {
-            checkArgument(
-                maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
-            return toBuilder().setMaxNumRecords(maxNumRecords).build();
-        }
-
-        /** Specifies to read at most a given number of records. */
-        public Read withMaxReadTime(Duration maxReadTime) {
-            checkNotNull(maxReadTime, "maxReadTime");
-            return toBuilder().setMaxReadTime(maxReadTime).build();
-        }
-
-        @Override
-        public PCollection<KinesisRecord> expand(PBegin input) {
-            org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
-                org.apache.beam.sdk.io.Read.from(
-                    new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
-            if (getMaxNumRecords() > 0) {
-                BoundedReadFromUnboundedSource<KinesisRecord> bounded =
-                    read.withMaxNumRecords(getMaxNumRecords());
-                return getMaxReadTime() == null
-                    ? input.apply(bounded)
-                    : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
-            } else {
-                return getMaxReadTime() == null
-                    ? input.apply(read)
-                    : input.apply(read.withMaxReadTime(getMaxReadTime()));
-            }
-        }
-
-        private static final class BasicKinesisProvider implements KinesisClientProvider {
-
-            private final String accessKey;
-            private final String secretKey;
-            private final Regions region;
-
-            private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
-                this.accessKey = checkNotNull(accessKey, "accessKey");
-                this.secretKey = checkNotNull(secretKey, "secretKey");
-                this.region = checkNotNull(region, "region");
-            }
-
-
-            private AWSCredentialsProvider getCredentialsProvider() {
-                return new StaticCredentialsProvider(new BasicAWSCredentials(
-                        accessKey,
-                        secretKey
-                ));
-
-            }
-
-            @Override
-            public AmazonKinesis get() {
-                AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
-                client.withRegion(region);
-                return client;
-            }
-        }
+    /**
+     * Specify reading from streamName at some initial position.
+     */
+    public Read from(String streamName, InitialPositionInStream initialPosition) {
+      return toBuilder()
+          .setStreamName(streamName)
+          .setInitialPosition(
+              new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+          .build();
+    }
+
+    /**
+     * Specify reading from streamName beginning at given {@link Instant}.
+     * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+     */
+    public Read from(String streamName, Instant initialTimestamp) {
+      return toBuilder()
+          .setStreamName(streamName)
+          .setInitialPosition(
+              new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+          .build();
+    }
+
+    /**
+     * Allows specifying a custom {@link KinesisClientProvider}.
+     * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+     * used for communication with Kinesis.
+     * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+     * does not suit your needs.
+     */
+    public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
+      return toBuilder().setClientProvider(kinesisClientProvider).build();
+    }
+
+    /**
+     * Specify credential details and region to be used to read from Kinesis.
+     * If you need more sophisticated credential protocol, then you should look at
+     * {@link Read#withClientProvider(KinesisClientProvider)}.
+     */
+    public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+      return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+    }
+
+    /** Specifies to read at most a given number of records. */
+    public Read withMaxNumRecords(int maxNumRecords) {
+      checkArgument(
+          maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
+      return toBuilder().setMaxNumRecords(maxNumRecords).build();
+    }
+
+    /** Specifies to read records for at most the given amount of time. */
+    public Read withMaxReadTime(Duration maxReadTime) {
+      checkNotNull(maxReadTime, "maxReadTime");
+      return toBuilder().setMaxReadTime(maxReadTime).build();
+    }
+
+    @Override
+    public PCollection<KinesisRecord> expand(PBegin input) {
+      org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
+          org.apache.beam.sdk.io.Read.from(
+              new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
+      if (getMaxNumRecords() > 0) {
+        BoundedReadFromUnboundedSource<KinesisRecord> bounded =
+            read.withMaxNumRecords(getMaxNumRecords());
+        return getMaxReadTime() == null
+            ? input.apply(bounded)
+            : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
+      } else {
+        return getMaxReadTime() == null
+            ? input.apply(read)
+            : input.apply(read.withMaxReadTime(getMaxReadTime()));
+      }
+    }
+
+    private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+      private final String accessKey;
+      private final String secretKey;
+      private final Regions region;
+
+      private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+        this.accessKey = checkNotNull(accessKey, "accessKey");
+        this.secretKey = checkNotNull(secretKey, "secretKey");
+        this.region = checkNotNull(region, "region");
+      }
+
+      private AWSCredentialsProvider getCredentialsProvider() {
+        return new StaticCredentialsProvider(new BasicAWSCredentials(
+            accessKey,
+            secretKey
+        ));
+
+      }
+
+      @Override
+      public AmazonKinesis get() {
+        AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+        client.withRegion(region);
+        return client;
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 2138094..e5c32d2 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -17,129 +17,129 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
+
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Reads data from multiple kinesis shards in a single thread.
  * It uses simple round robin algorithm when fetching data from shards.
  */
 class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
-
-    private final SimplifiedKinesisClient kinesis;
-    private final UnboundedSource<KinesisRecord, ?> source;
-    private final CheckpointGenerator initialCheckpointGenerator;
-    private RoundRobin<ShardRecordsIterator> shardIterators;
-    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
-
-    public KinesisReader(SimplifiedKinesisClient kinesis,
-                         CheckpointGenerator initialCheckpointGenerator,
-                         UnboundedSource<KinesisRecord, ?> source) {
-        this.kinesis = checkNotNull(kinesis, "kinesis");
-        this.initialCheckpointGenerator =
-                checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
-        this.source = source;
-    }
-
-    /**
-     * Generates initial checkpoint and instantiates iterators for shards.
-     */
-    @Override
-    public boolean start() throws IOException {
-        LOG.info("Starting reader using {}", initialCheckpointGenerator);
-
-        try {
-            KinesisReaderCheckpoint initialCheckpoint =
-                    initialCheckpointGenerator.generate(kinesis);
-            List<ShardRecordsIterator> iterators = newArrayList();
-            for (ShardCheckpoint checkpoint : initialCheckpoint) {
-                iterators.add(checkpoint.getShardRecordsIterator(kinesis));
-            }
-            shardIterators = new RoundRobin<>(iterators);
-        } catch (TransientKinesisException e) {
-            throw new IOException(e);
-        }
 
-        return advance();
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+  private final SimplifiedKinesisClient kinesis;
+  private final UnboundedSource<KinesisRecord, ?> source;
+  private final CheckpointGenerator initialCheckpointGenerator;
+  private RoundRobin<ShardRecordsIterator> shardIterators;
+  private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+  public KinesisReader(SimplifiedKinesisClient kinesis,
+      CheckpointGenerator initialCheckpointGenerator,
+      UnboundedSource<KinesisRecord, ?> source) {
+    this.kinesis = checkNotNull(kinesis, "kinesis");
+    this.initialCheckpointGenerator =
+        checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+    this.source = source;
+  }
+
+  /**
+   * Generates initial checkpoint and instantiates iterators for shards.
+   */
+  @Override
+  public boolean start() throws IOException {
+    LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+    try {
+      KinesisReaderCheckpoint initialCheckpoint =
+          initialCheckpointGenerator.generate(kinesis);
+      List<ShardRecordsIterator> iterators = newArrayList();
+      for (ShardCheckpoint checkpoint : initialCheckpoint) {
+        iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+      }
+      shardIterators = new RoundRobin<>(iterators);
+    } catch (TransientKinesisException e) {
+      throw new IOException(e);
     }
 
-    /**
-     * Moves to the next record in one of the shards.
-     * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
-     * If not, we iterate over shards in a round-robin manner.
-     */
-    @Override
-    public boolean advance() throws IOException {
-        try {
-            for (int i = 0; i < shardIterators.size(); ++i) {
-                currentRecord = shardIterators.getCurrent().next();
-                if (currentRecord.isPresent()) {
-                    return true;
-                } else {
-                    shardIterators.moveForward();
-                }
-            }
-        } catch (TransientKinesisException e) {
-            LOG.warn("Transient exception occurred", e);
+    return advance();
+  }
+
+  /**
+   * Moves to the next record in one of the shards.
+   * If current shard iterator can be moved forward (i.e. there's a record present) then we do it.
+   * If not, we iterate over shards in a round-robin manner.
+   */
+  @Override
+  public boolean advance() throws IOException {
+    try {
+      for (int i = 0; i < shardIterators.size(); ++i) {
+        currentRecord = shardIterators.getCurrent().next();
+        if (currentRecord.isPresent()) {
+          return true;
+        } else {
+          shardIterators.moveForward();
         }
-        return false;
-    }
-
-    @Override
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-        return currentRecord.get().getUniqueId();
-    }
-
-    @Override
-    public KinesisRecord getCurrent() throws NoSuchElementException {
-        return currentRecord.get();
-    }
-
-    /**
-     * When {@link KinesisReader} was advanced to the current record.
-     * We cannot use approximate arrival timestamp given for each record by Kinesis as it
-     * is not guaranteed to be accurate - this could lead to mark some records as "late"
-     * even if they were not.
-     */
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return currentRecord.get().getReadTime();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    /**
-     * Current time.
-     * We cannot give better approximation of the watermark with current semantics of
-     * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
-     * {@link KinesisReader#advance()} will be called.
-     */
-    @Override
-    public Instant getWatermark() {
-        return Instant.now();
-    }
-
-    @Override
-    public UnboundedSource.CheckpointMark getCheckpointMark() {
-        return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
-    }
-
-    @Override
-    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
-        return source;
+      }
+    } catch (TransientKinesisException e) {
+      LOG.warn("Transient exception occurred", e);
     }
+    return false;
+  }
+
+  @Override
+  public byte[] getCurrentRecordId() throws NoSuchElementException {
+    return currentRecord.get().getUniqueId();
+  }
+
+  @Override
+  public KinesisRecord getCurrent() throws NoSuchElementException {
+    return currentRecord.get();
+  }
+
+  /**
+   * When {@link KinesisReader} was advanced to the current record.
+   * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+   * is not guaranteed to be accurate - this could lead to marking some records as "late"
+   * even if they were not.
+   */
+  @Override
+  public Instant getCurrentTimestamp() throws NoSuchElementException {
+    return currentRecord.get().getReadTime();
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  /**
+   * Current time.
+   * We cannot give a better approximation of the watermark with the current semantics of
+   * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+   * {@link KinesisReader#advance()} will be called.
+   */
+  @Override
+  public Instant getWatermark() {
+    return Instant.now();
+  }
+
+  @Override
+  public UnboundedSource.CheckpointMark getCheckpointMark() {
+    return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+  }
+
+  @Override
+  public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+    return source;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index f0fa45d..d995e75 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -23,11 +23,13 @@ import static com.google.common.collect.Lists.partition;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.io.UnboundedSource;
 
 /**
@@ -37,60 +39,61 @@ import org.apache.beam.sdk.io.UnboundedSource;
  * This class is immutable.
  */
 class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
-        .CheckpointMark, Serializable {
-    private final List<ShardCheckpoint> shardCheckpoints;
+    .CheckpointMark, Serializable {
 
-    public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
-        this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
-    }
+  private final List<ShardCheckpoint> shardCheckpoints;
 
-    public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
-                                                                   iterators) {
-        return new KinesisReaderCheckpoint(transform(iterators,
-                new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
-                    @Nullable
-                    @Override
-                    public ShardCheckpoint apply(@Nullable
-                                                 ShardRecordsIterator shardRecordsIterator) {
-                        assert shardRecordsIterator != null;
-                        return shardRecordsIterator.getCheckpoint();
-                    }
-                }));
-    }
+  public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+    this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+  }
 
-    /**
-     * Splits given multi-shard checkpoint into partitions of approximately equal size.
-     *
-     * @param desiredNumSplits - upper limit for number of partitions to generate.
-     * @return list of checkpoints covering consecutive partitions of current checkpoint.
-     */
-    public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
-        int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
-
-        List<KinesisReaderCheckpoint> checkpoints = newArrayList();
-        for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
-            checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
-        }
-        return checkpoints;
-    }
+  public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+      iterators) {
+    return new KinesisReaderCheckpoint(transform(iterators,
+        new Function<ShardRecordsIterator, ShardCheckpoint>() {
 
-    private int divideAndRoundUp(int nominator, int denominator) {
-        return (nominator + denominator - 1) / denominator;
-    }
+          @Nullable
+          @Override
+          public ShardCheckpoint apply(@Nullable
+              ShardRecordsIterator shardRecordsIterator) {
+            assert shardRecordsIterator != null;
+            return shardRecordsIterator.getCheckpoint();
+          }
+        }));
+  }
 
-    @Override
-    public void finalizeCheckpoint() throws IOException {
+  /**
+   * Splits given multi-shard checkpoint into partitions of approximately equal size.
+   *
+   * @param desiredNumSplits - upper limit for number of partitions to generate.
+   * @return list of checkpoints covering consecutive partitions of current checkpoint.
+   */
+  public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+    int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
 
+    List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+    for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+      checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
     }
+    return checkpoints;
+  }
 
-    @Override
-    public String toString() {
-        return shardCheckpoints.toString();
-    }
+  private int divideAndRoundUp(int nominator, int denominator) {
+    return (nominator + denominator - 1) / denominator;
+  }
 
-    @Override
-    public Iterator<ShardCheckpoint> iterator() {
-        return shardCheckpoints.iterator();
-    }
+  @Override
+  public void finalizeCheckpoint() throws IOException {
+
+  }
+
+  @Override
+  public String toString() {
+    return shardCheckpoints.toString();
+  }
+
+  @Override
+  public Iterator<ShardCheckpoint> iterator() {
+    return shardCheckpoints.iterator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index 02b5370..057b7bb 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -22,7 +22,9 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
 import com.google.common.base.Charsets;
+
 import java.nio.ByteBuffer;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
@@ -30,91 +32,92 @@ import org.joda.time.Instant;
  * {@link UserRecord} enhanced with utility methods.
  */
 public class KinesisRecord {
-    private Instant readTime;
-    private String streamName;
-    private String shardId;
-    private long subSequenceNumber;
-    private String sequenceNumber;
-    private Instant approximateArrivalTimestamp;
-    private ByteBuffer data;
-    private String partitionKey;
-
-    public KinesisRecord(UserRecord record, String streamName, String shardId) {
-        this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
-                record.getPartitionKey(),
-                new Instant(record.getApproximateArrivalTimestamp()),
-                Instant.now(),
-                streamName, shardId);
-    }
-
-    public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
-                         String partitionKey, Instant approximateArrivalTimestamp,
-                         Instant readTime,
-                         String streamName, String shardId) {
-        this.data = data;
-        this.sequenceNumber = sequenceNumber;
-        this.subSequenceNumber = subSequenceNumber;
-        this.partitionKey = partitionKey;
-        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
-        this.readTime = readTime;
-        this.streamName = streamName;
-        this.shardId = shardId;
-    }
-
-    public ExtendedSequenceNumber getExtendedSequenceNumber() {
-        return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
-    }
-
-    /***
-     * @return unique id of the record based on its position in the stream
-     */
-    public byte[] getUniqueId() {
-        return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
-    }
-
-    public Instant getReadTime() {
-        return readTime;
-    }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public String getShardId() {
-        return shardId;
-    }
-
-    public byte[] getDataAsBytes() {
-        return getData().array();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        return EqualsBuilder.reflectionEquals(this, obj);
-    }
-
-    @Override
-    public int hashCode() {
-        return reflectionHashCode(this);
-    }
-
-    public long getSubSequenceNumber() {
-        return subSequenceNumber;
-    }
-
-    public String getSequenceNumber() {
-        return sequenceNumber;
-    }
-
-    public Instant getApproximateArrivalTimestamp() {
-        return approximateArrivalTimestamp;
-    }
-
-    public ByteBuffer getData() {
-        return data;
-    }
-
-    public String getPartitionKey() {
-        return partitionKey;
-    }
+
+  private Instant readTime;
+  private String streamName;
+  private String shardId;
+  private long subSequenceNumber;
+  private String sequenceNumber;
+  private Instant approximateArrivalTimestamp;
+  private ByteBuffer data;
+  private String partitionKey;
+
+  public KinesisRecord(UserRecord record, String streamName, String shardId) {
+    this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+        record.getPartitionKey(),
+        new Instant(record.getApproximateArrivalTimestamp()),
+        Instant.now(),
+        streamName, shardId);
+  }
+
+  public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+      String partitionKey, Instant approximateArrivalTimestamp,
+      Instant readTime,
+      String streamName, String shardId) {
+    this.data = data;
+    this.sequenceNumber = sequenceNumber;
+    this.subSequenceNumber = subSequenceNumber;
+    this.partitionKey = partitionKey;
+    this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+    this.readTime = readTime;
+    this.streamName = streamName;
+    this.shardId = shardId;
+  }
+
+  public ExtendedSequenceNumber getExtendedSequenceNumber() {
+    return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+  }
+
+  /**
+   * @return unique id of the record based on its position in the stream
+   */
+  public byte[] getUniqueId() {
+    return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+  }
+
+  public Instant getReadTime() {
+    return readTime;
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getShardId() {
+    return shardId;
+  }
+
+  public byte[] getDataAsBytes() {
+    return getData().array();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return EqualsBuilder.reflectionEquals(this, obj);
+  }
+
+  @Override
+  public int hashCode() {
+    return reflectionHashCode(this);
+  }
+
+  public long getSubSequenceNumber() {
+    return subSequenceNumber;
+  }
+
+  public String getSequenceNumber() {
+    return sequenceNumber;
+  }
+
+  public Instant getApproximateArrivalTimestamp() {
+    return approximateArrivalTimestamp;
+  }
+
+  public ByteBuffer getData() {
+    return data;
+  }
+
+  public String getPartitionKey() {
+    return partitionKey;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index f233e27..dcf564d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,40 +34,41 @@ import org.joda.time.Instant;
  * A {@link Coder} for {@link KinesisRecord}.
  */
 class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
-    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
-    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
-    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
-    private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
 
-    public static KinesisRecordCoder of() {
-        return new KinesisRecordCoder();
-    }
+  private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+  private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+  private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+  private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+  public static KinesisRecordCoder of() {
+    return new KinesisRecordCoder();
+  }
 
-    @Override
-    public void encode(KinesisRecord value, OutputStream outStream) throws
-            IOException {
-        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
-        STRING_CODER.encode(value.getSequenceNumber(), outStream);
-        STRING_CODER.encode(value.getPartitionKey(), outStream);
-        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
-        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
-        INSTANT_CODER.encode(value.getReadTime(), outStream);
-        STRING_CODER.encode(value.getStreamName(), outStream);
-        STRING_CODER.encode(value.getShardId(), outStream);
-    }
+  @Override
+  public void encode(KinesisRecord value, OutputStream outStream) throws
+      IOException {
+    BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+    STRING_CODER.encode(value.getSequenceNumber(), outStream);
+    STRING_CODER.encode(value.getPartitionKey(), outStream);
+    INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+    VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+    INSTANT_CODER.encode(value.getReadTime(), outStream);
+    STRING_CODER.encode(value.getStreamName(), outStream);
+    STRING_CODER.encode(value.getShardId(), outStream);
+  }
 
-    @Override
-    public KinesisRecord decode(InputStream inStream) throws IOException {
-        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
-        String sequenceNumber = STRING_CODER.decode(inStream);
-        String partitionKey = STRING_CODER.decode(inStream);
-        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
-        long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
-        Instant readTimestamp = INSTANT_CODER.decode(inStream);
-        String streamName = STRING_CODER.decode(inStream);
-        String shardId = STRING_CODER.decode(inStream);
-        return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
-                approximateArrivalTimestamp, readTimestamp, streamName, shardId
-        );
-    }
+  @Override
+  public KinesisRecord decode(InputStream inStream) throws IOException {
+    ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+    String sequenceNumber = STRING_CODER.decode(inStream);
+    String partitionKey = STRING_CODER.decode(inStream);
+    Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+    long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+    Instant readTimestamp = INSTANT_CODER.decode(inStream);
+    String streamName = STRING_CODER.decode(inStream);
+    String shardId = STRING_CODER.decode(inStream);
+    return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+        approximateArrivalTimestamp, readTimestamp, streamName, shardId
+    );
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7e67d07..362792b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -28,85 +29,85 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Represents source for single stream in Kinesis.
  */
 class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
-    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
-
-    private final KinesisClientProvider kinesis;
-    private CheckpointGenerator initialCheckpointGenerator;
 
-    public KinesisSource(KinesisClientProvider kinesis, String streamName,
-                         StartingPoint startingPoint) {
-        this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+  private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+  private final KinesisClientProvider kinesis;
+  private CheckpointGenerator initialCheckpointGenerator;
+
+  public KinesisSource(KinesisClientProvider kinesis, String streamName,
+      StartingPoint startingPoint) {
+    this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+  }
+
+  private KinesisSource(KinesisClientProvider kinesisClientProvider,
+      CheckpointGenerator initialCheckpoint) {
+    this.kinesis = kinesisClientProvider;
+    this.initialCheckpointGenerator = initialCheckpoint;
+    validate();
+  }
+
+  /**
+   * Generate splits for reading from the stream.
+   * Basically, it'll try to evenly split the set of shards in the stream into
+   * {@code desiredNumSplits} partitions. Each partition is then a split.
+   */
+  @Override
+  public List<KinesisSource> split(int desiredNumSplits,
+      PipelineOptions options) throws Exception {
+    KinesisReaderCheckpoint checkpoint =
+        initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+    List<KinesisSource> sources = newArrayList();
+
+    for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+      sources.add(new KinesisSource(
+          kinesis,
+          new StaticCheckpointGenerator(partition)));
     }
-
-    private KinesisSource(KinesisClientProvider kinesisClientProvider,
-                          CheckpointGenerator initialCheckpoint) {
-        this.kinesis = kinesisClientProvider;
-        this.initialCheckpointGenerator = initialCheckpoint;
-        validate();
+    return sources;
+  }
+
+  /**
+   * Creates reader based on given {@link KinesisReaderCheckpoint}.
+   * If {@link KinesisReaderCheckpoint} is not given, then we use
+   * {@code initialCheckpointGenerator} to generate new checkpoint.
+   */
+  @Override
+  public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+      KinesisReaderCheckpoint checkpointMark) {
+
+    CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+    if (checkpointMark != null) {
+      checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
     }
 
-    /**
-     * Generate splits for reading from the stream.
-     * Basically, it'll try to evenly split set of shards in the stream into
-     * {@code desiredNumSplits} partitions. Each partition is then a split.
-     */
-    @Override
-    public List<KinesisSource> split(int desiredNumSplits,
-                                                     PipelineOptions options) throws Exception {
-        KinesisReaderCheckpoint checkpoint =
-                initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
-
-        List<KinesisSource> sources = newArrayList();
-
-        for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
-            sources.add(new KinesisSource(
-                    kinesis,
-                    new StaticCheckpointGenerator(partition)));
-        }
-        return sources;
-    }
-
-    /**
-     * Creates reader based on given {@link KinesisReaderCheckpoint}.
-     * If {@link KinesisReaderCheckpoint} is not given, then we use
-     * {@code initialCheckpointGenerator} to generate new checkpoint.
-     */
-    @Override
-    public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
-                                                KinesisReaderCheckpoint checkpointMark) {
-
-        CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
-
-        if (checkpointMark != null) {
-            checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
-        }
-
-        LOG.info("Creating new reader using {}", checkpointGenerator);
-
-        return new KinesisReader(
-                SimplifiedKinesisClient.from(kinesis),
-                checkpointGenerator,
-                this);
-    }
-
-    @Override
-    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
-        return SerializableCoder.of(KinesisReaderCheckpoint.class);
-    }
-
-    @Override
-    public void validate() {
-        checkNotNull(kinesis);
-        checkNotNull(initialCheckpointGenerator);
-    }
-
-    @Override
-    public Coder<KinesisRecord> getDefaultOutputCoder() {
-        return KinesisRecordCoder.of();
-    }
+    LOG.info("Creating new reader using {}", checkpointGenerator);
+
+    return new KinesisReader(
+        SimplifiedKinesisClient.from(kinesis),
+        checkpointGenerator,
+        this);
+  }
+
+  @Override
+  public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+    return SerializableCoder.of(KinesisReaderCheckpoint.class);
+  }
+
+  @Override
+  public void validate() {
+    checkNotNull(kinesis);
+    checkNotNull(initialCheckpointGenerator);
+  }
+
+  @Override
+  public Coder<KinesisRecord> getDefaultOutputCoder() {
+    return KinesisRecordCoder.of();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
index 40e65fc..eca725c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.newArrayList;
 
 import java.util.List;
 
-
 /**
  * Filters out records, which were already processed and checkpointed.
  *
@@ -29,13 +28,14 @@ import java.util.List;
  * accuracy, not with "subSequenceNumber" accuracy.
  */
 class RecordFilter {
-    public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
-        List<KinesisRecord> filteredRecords = newArrayList();
-        for (KinesisRecord record : records) {
-            if (checkpoint.isBeforeOrAt(record)) {
-                filteredRecords.add(record);
-            }
-        }
-        return filteredRecords;
+
+  public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+    List<KinesisRecord> filteredRecords = newArrayList();
+    for (KinesisRecord record : records) {
+      if (checkpoint.isBeforeOrAt(record)) {
+        filteredRecords.add(record);
+      }
     }
+    return filteredRecords;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index e4ff541..806d982 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -27,27 +27,28 @@ import java.util.Iterator;
  * Very simple implementation of round robin algorithm.
  */
 class RoundRobin<T> implements Iterable<T> {
-    private final Deque<T> deque;
 
-    public RoundRobin(Iterable<T> collection) {
-        this.deque = newArrayDeque(collection);
-        checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
-    }
+  private final Deque<T> deque;
 
-    public T getCurrent() {
-        return deque.getFirst();
-    }
+  public RoundRobin(Iterable<T> collection) {
+    this.deque = newArrayDeque(collection);
+    checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+  }
 
-    public void moveForward() {
-        deque.addLast(deque.removeFirst());
-    }
+  public T getCurrent() {
+    return deque.getFirst();
+  }
 
-    public int size() {
-        return deque.size();
-    }
+  public void moveForward() {
+    deque.addLast(deque.removeFirst());
+  }
 
-    @Override
-    public Iterator<T> iterator() {
-        return deque.iterator();
-    }
+  public int size() {
+    return deque.size();
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return deque.iterator();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 6aa3504..95f97b8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
@@ -27,9 +26,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import java.io.Serializable;
-import org.joda.time.Instant;
 
+import org.joda.time.Instant;
 
 /**
  * Checkpoint mark for single shard in the stream.
@@ -45,131 +45,132 @@ import org.joda.time.Instant;
  * This class is immutable.
  */
 class ShardCheckpoint implements Serializable {
-    private final String streamName;
-    private final String shardId;
-    private final String sequenceNumber;
-    private final ShardIteratorType shardIteratorType;
-    private final Long subSequenceNumber;
-    private final Instant timestamp;
-
-    public ShardCheckpoint(String streamName, String shardId, StartingPoint
-            startingPoint) {
-        this(streamName, shardId,
-                ShardIteratorType.fromValue(startingPoint.getPositionName()),
-                startingPoint.getTimestamp());
-    }
-
-    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-            shardIteratorType, Instant timestamp) {
-        this(streamName, shardId, shardIteratorType, null, null, timestamp);
-    }
-
-    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
-            shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
-        this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
-    }
-
-    private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
-                            String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
-        this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
-        this.streamName = checkNotNull(streamName, "streamName");
-        this.shardId = checkNotNull(shardId, "shardId");
-        if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
-            checkNotNull(sequenceNumber,
-                    "You must provide sequence number for AT_SEQUENCE_NUMBER"
-                            + " or AFTER_SEQUENCE_NUMBER");
-        } else {
-            checkArgument(sequenceNumber == null,
-                    "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
-        }
-        if (shardIteratorType == AT_TIMESTAMP) {
-            checkNotNull(timestamp,
-                    "You must provide timestamp for AT_SEQUENCE_NUMBER"
-                            + " or AFTER_SEQUENCE_NUMBER");
-        } else {
-            checkArgument(timestamp == null,
-                    "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
-        }
-
-        this.subSequenceNumber = subSequenceNumber;
-        this.sequenceNumber = sequenceNumber;
-        this.timestamp = timestamp;
-    }
-
-    /**
-     * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
-     * on the the underlying shardIteratorType, it will either compare the timestamp or the
-     * {@link ExtendedSequenceNumber}.
-     *
-     * @param other
-     * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
-     */
-    public boolean isBeforeOrAt(KinesisRecord other) {
-        if (shardIteratorType == AT_TIMESTAMP) {
-            return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
-        }
-        int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
-        if (result == 0) {
-            return shardIteratorType == AT_SEQUENCE_NUMBER;
-        }
-        return result < 0;
-    }
-
-    private ExtendedSequenceNumber extendedSequenceNumber() {
-        String fullSequenceNumber = sequenceNumber;
-        if (fullSequenceNumber == null) {
-            fullSequenceNumber = shardIteratorType.toString();
-        }
-        return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
-    }
 
-    @Override
-    public String toString() {
-        return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
-                streamName, shardId,
-                sequenceNumber);
+  private final String streamName;
+  private final String shardId;
+  private final String sequenceNumber;
+  private final ShardIteratorType shardIteratorType;
+  private final Long subSequenceNumber;
+  private final Instant timestamp;
+
+  public ShardCheckpoint(String streamName, String shardId, StartingPoint
+      startingPoint) {
+    this(streamName, shardId,
+        ShardIteratorType.fromValue(startingPoint.getPositionName()),
+        startingPoint.getTimestamp());
+  }
+
+  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+      shardIteratorType, Instant timestamp) {
+    this(streamName, shardId, shardIteratorType, null, null, timestamp);
+  }
+
+  public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+      shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+    this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+  }
+
+  private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+      String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+    this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+    this.streamName = checkNotNull(streamName, "streamName");
+    this.shardId = checkNotNull(shardId, "shardId");
+    if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+      checkNotNull(sequenceNumber,
+          "You must provide sequence number for AT_SEQUENCE_NUMBER"
+              + " or AFTER_SEQUENCE_NUMBER");
+    } else {
+      checkArgument(sequenceNumber == null,
+          "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
     }
-
-    public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
-            throws TransientKinesisException {
-        return new ShardRecordsIterator(this, kinesis);
+    if (shardIteratorType == AT_TIMESTAMP) {
+      checkNotNull(timestamp,
+          "You must provide timestamp"
+              + " for AT_TIMESTAMP");
+    } else {
+      checkArgument(timestamp == null,
+          "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
     }
 
-    public String getShardIterator(SimplifiedKinesisClient kinesisClient)
-            throws TransientKinesisException {
-        if (checkpointIsInTheMiddleOfAUserRecord()) {
-            return kinesisClient.getShardIterator(streamName,
-                    shardId, AT_SEQUENCE_NUMBER,
-                    sequenceNumber, null);
-        }
-        return kinesisClient.getShardIterator(streamName,
-                shardId, shardIteratorType,
-                sequenceNumber, timestamp);
+    this.subSequenceNumber = subSequenceNumber;
+    this.sequenceNumber = sequenceNumber;
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+   * on the underlying shardIteratorType, it will either compare the timestamp or the
+   * {@link ExtendedSequenceNumber}.
+   *
+   * @param other record to compare this checkpoint against
+   * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
+   */
+  public boolean isBeforeOrAt(KinesisRecord other) {
+    if (shardIteratorType == AT_TIMESTAMP) {
+      return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
     }
-
-    private boolean checkpointIsInTheMiddleOfAUserRecord() {
-        return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+    int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+    if (result == 0) {
+      return shardIteratorType == AT_SEQUENCE_NUMBER;
     }
+    return result < 0;
+  }
 
-    /**
-     * Used to advance checkpoint mark to position after given {@link Record}.
-     *
-     * @param record
-     * @return new checkpoint object pointing directly after given {@link Record}
-     */
-    public ShardCheckpoint moveAfter(KinesisRecord record) {
-        return new ShardCheckpoint(
-                streamName, shardId,
-                AFTER_SEQUENCE_NUMBER,
-                record.getSequenceNumber(),
-                record.getSubSequenceNumber());
+  private ExtendedSequenceNumber extendedSequenceNumber() {
+    String fullSequenceNumber = sequenceNumber;
+    if (fullSequenceNumber == null) {
+      fullSequenceNumber = shardIteratorType.toString();
     }
-
-    public String getStreamName() {
-        return streamName;
-    }
-
-    public String getShardId() {
-        return shardId;
+    return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+        streamName, shardId,
+        sequenceNumber);
+  }
+
+  public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+      throws TransientKinesisException {
+    return new ShardRecordsIterator(this, kinesis);
+  }
+
+  public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+      throws TransientKinesisException {
+    if (checkpointIsInTheMiddleOfAUserRecord()) {
+      return kinesisClient.getShardIterator(streamName,
+          shardId, AT_SEQUENCE_NUMBER,
+          sequenceNumber, null);
     }
+    return kinesisClient.getShardIterator(streamName,
+        shardId, shardIteratorType,
+        sequenceNumber, timestamp);
+  }
+
+  private boolean checkpointIsInTheMiddleOfAUserRecord() {
+    return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+  }
+
+  /**
+   * Used to advance checkpoint mark to position after given {@link Record}.
+   *
+   * @param record record that the returned checkpoint should point directly after
+   * @return new checkpoint object pointing directly after given {@link Record}
+   */
+  public ShardCheckpoint moveAfter(KinesisRecord record) {
+    return new ShardCheckpoint(
+        streamName, shardId,
+        AFTER_SEQUENCE_NUMBER,
+        record.getSequenceNumber(),
+        record.getSubSequenceNumber());
+  }
+
+  public String getStreamName() {
+    return streamName;
+  }
+
+  public String getShardId() {
+    return shardId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 872f604..a69c6c1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Queues.newArrayDeque;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
 import java.util.Deque;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,68 +33,68 @@ import org.slf4j.LoggerFactory;
  * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
  */
 class ShardRecordsIterator {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
 
-    private final SimplifiedKinesisClient kinesis;
-    private final RecordFilter filter;
-    private ShardCheckpoint checkpoint;
-    private String shardIterator;
-    private Deque<KinesisRecord> data = newArrayDeque();
+  private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
 
-    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-                                SimplifiedKinesisClient simplifiedKinesisClient) throws
-            TransientKinesisException {
-        this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
-    }
+  private final SimplifiedKinesisClient kinesis;
+  private final RecordFilter filter;
+  private ShardCheckpoint checkpoint;
+  private String shardIterator;
+  private Deque<KinesisRecord> data = newArrayDeque();
 
-    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
-                                SimplifiedKinesisClient simplifiedKinesisClient,
-                                RecordFilter filter) throws
-            TransientKinesisException {
+  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+      SimplifiedKinesisClient simplifiedKinesisClient) throws
+      TransientKinesisException {
+    this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+  }
 
-        this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
-        this.filter = checkNotNull(filter, "filter");
-        this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
-        shardIterator = checkpoint.getShardIterator(kinesis);
-    }
+  public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+      SimplifiedKinesisClient simplifiedKinesisClient,
+      RecordFilter filter) throws
+      TransientKinesisException {
 
-    /**
-     * Returns record if there's any present.
-     * Returns absent() if there are no new records at this time in the shard.
-     */
-    public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
-        readMoreIfNecessary();
+    this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+    this.filter = checkNotNull(filter, "filter");
+    this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+    shardIterator = checkpoint.getShardIterator(kinesis);
+  }
 
-        if (data.isEmpty()) {
-            return CustomOptional.absent();
-        } else {
-            KinesisRecord record = data.removeFirst();
-            checkpoint = checkpoint.moveAfter(record);
-            return CustomOptional.of(record);
-        }
-    }
+  /**
+   * Returns record if there's any present.
+   * Returns absent() if there are no new records at this time in the shard.
+   */
+  public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+    readMoreIfNecessary();
 
-    private void readMoreIfNecessary() throws TransientKinesisException {
-        if (data.isEmpty()) {
-            GetKinesisRecordsResult response;
-            try {
-                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-                        checkpoint.getShardId());
-            } catch (ExpiredIteratorException e) {
-                LOG.info("Refreshing expired iterator", e);
-                shardIterator = checkpoint.getShardIterator(kinesis);
-                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
-                        checkpoint.getShardId());
-            }
-            LOG.debug("Fetched {} new records", response.getRecords().size());
-            shardIterator = response.getNextShardIterator();
-            data.addAll(filter.apply(response.getRecords(), checkpoint));
-        }
+    if (data.isEmpty()) {
+      return CustomOptional.absent();
+    } else {
+      KinesisRecord record = data.removeFirst();
+      checkpoint = checkpoint.moveAfter(record);
+      return CustomOptional.of(record);
     }
+  }
 
-    public ShardCheckpoint getCheckpoint() {
-        return checkpoint;
+  private void readMoreIfNecessary() throws TransientKinesisException {
+    if (data.isEmpty()) {
+      GetKinesisRecordsResult response;
+      try {
+        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+            checkpoint.getShardId());
+      } catch (ExpiredIteratorException e) {
+        LOG.info("Refreshing expired iterator", e);
+        shardIterator = checkpoint.getShardIterator(kinesis);
+        response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+            checkpoint.getShardId());
+      }
+      LOG.debug("Fetched {} new records", response.getRecords().size());
+      shardIterator = response.getNextShardIterator();
+      data.addAll(filter.apply(response.getRecords(), checkpoint));
     }
+  }
 
+  public ShardCheckpoint getCheckpoint() {
+    return checkpoint;
+  }
 
 }