You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 23:12:23 UTC

[1/5] incubator-beam git commit: Fix javadoc in Kinesis

Repository: incubator-beam
Updated Branches:
  refs/heads/master 95ab43809 -> a17a99f58


Fix javadoc in Kinesis


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5005fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5005fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5005fbe

Branch: refs/heads/master
Commit: a5005fbe7c0a43bdd2d825c2fc77556956bf0bfa
Parents: bed5056
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:20:44 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kinesis/TransientKinesisException.java  | 2 +-
 .../java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java     | 2 +-
 .../java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java    | 2 +-
 .../java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java   | 2 +-
 .../java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java     | 2 +-
 .../org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java     | 2 +-
 .../test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java   | 2 +-
 .../org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java   | 2 +-
 8 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index a1a974b..57ad8a8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.io.kinesis;
 import com.amazonaws.AmazonServiceException;
 
 /**
- * Created by p.pastuszka on 21.06.2016.
+ * A transient exception thrown by Kinesis.
  */
 class TransientKinesisException extends Exception {
     public TransientKinesisException(String s, AmazonServiceException e) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index b007fa4..046c9d9 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -74,7 +74,7 @@ import org.apache.commons.lang.builder.EqualsBuilder;
 import org.joda.time.Instant;
 
 /**
- * Created by p.pastuszka on 21.07.2016.
+ * Mock implemenation of {@link AmazonKinesis} for testing.
  */
 class AmazonKinesisMock implements AmazonKinesis {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index cb0d0e2..20e8372 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException;
 import org.junit.Test;
 
 /**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link CustomOptional}.
  */
 public class CustomOptionalTest {
     @Test(expected = NoSuchElementException.class)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 304220b..f0ab46c 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -32,7 +32,7 @@ import org.joda.time.DateTime;
 import org.junit.Test;
 
 /**
- * Created by p.pastuszka on 22.07.2016.
+ * Tests {@link AmazonKinesisMock}.
  */
 public class KinesisMockReadTest {
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 29a24821..3111029 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -30,7 +30,7 @@ import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
 
 /**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link KinesisReader}.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class KinesisReaderTest {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index d301f25..8771c86 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -23,7 +23,7 @@ import org.joda.time.Instant;
 import org.junit.Test;
 
 /**
- * Created by p.pastuszka on 20.07.2016.
+ * Tests {@link KinesisRecordCoder}.
  */
 public class KinesisRecordCoderTest {
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index aedc89e..f032eea 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -25,7 +25,7 @@ import java.util.List;
 import org.junit.Test;
 
 /**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link RoundRobin}.
  */
 public class RoundRobinTest {
     @Test(expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 585b884..49e806d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -36,7 +36,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
 /**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link ShardRecordsIterator}.
  */
 @RunWith(MockitoJUnitRunner.class)
 public class ShardRecordsIteratorTest {


[5/5] incubator-beam git commit: Closes #687

Posted by dh...@apache.org.
Closes #687


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a17a99f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a17a99f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a17a99f5

Branch: refs/heads/master
Commit: a17a99f580d9818e11751b6996a15aa60b2e0c56
Parents: 95ab438 a5005fb
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 16:12:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:12:08 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/kinesis/pom.xml                    | 179 +++++++++
 .../sdk/io/kinesis/CheckpointGenerator.java     |  30 ++
 .../beam/sdk/io/kinesis/CustomOptional.java     |  85 +++++
 .../io/kinesis/DynamicCheckpointGenerator.java  |  56 +++
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  54 +++
 .../sdk/io/kinesis/KinesisClientProvider.java   |  31 ++
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 190 ++++++++++
 .../beam/sdk/io/kinesis/KinesisReader.java      | 145 +++++++
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  96 +++++
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 121 ++++++
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  74 ++++
 .../beam/sdk/io/kinesis/KinesisSource.java      | 112 ++++++
 .../beam/sdk/io/kinesis/RecordFilter.java       |  41 ++
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  53 +++
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 175 +++++++++
 .../sdk/io/kinesis/ShardRecordsIterator.java    |  98 +++++
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 157 ++++++++
 .../beam/sdk/io/kinesis/StartingPoint.java      |  85 +++++
 .../io/kinesis/StaticCheckpointGenerator.java   |  42 +++
 .../io/kinesis/TransientKinesisException.java   |  29 ++
 .../beam/sdk/io/kinesis/package-info.java       |  22 ++
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 375 +++++++++++++++++++
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  31 ++
 .../kinesis/DynamicCheckpointGeneratorTest.java |  57 +++
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  91 +++++
 .../io/kinesis/KinesisReaderCheckpointTest.java |  67 ++++
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 119 ++++++
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 120 ++++++
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  45 +++
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  47 +++
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  84 +++++
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  66 ++++
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  57 +++
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 149 ++++++++
 .../io/kinesis/ShardRecordsIteratorTest.java    | 151 ++++++++
 .../io/kinesis/SimplifiedKinesisClientTest.java | 224 +++++++++++
 .../beam/sdk/io/kinesis/package-info.java       |  22 ++
 sdks/java/io/pom.xml                            |   1 +
 38 files changed, 3581 insertions(+)
----------------------------------------------------------------------



[2/5] incubator-beam git commit: Organize imports in Kinesis

Posted by dh...@apache.org.
Organize imports in Kinesis


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bed50565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bed50565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bed50565

Branch: refs/heads/master
Commit: bed50565cc5cd8892d1a4577cb5d6505f7bb0367
Parents: bed22de
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:17:05 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700

----------------------------------------------------------------------
 .../io/kinesis/DynamicCheckpointGenerator.java  |  4 +--
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  2 +-
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   |  2 +-
 .../beam/sdk/io/kinesis/KinesisReader.java      |  8 +++---
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  4 +--
 .../beam/sdk/io/kinesis/KinesisRecord.java      |  8 +++---
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  9 +++----
 .../beam/sdk/io/kinesis/KinesisSource.java      | 10 +++----
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  1 -
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    |  8 +++---
 .../sdk/io/kinesis/ShardRecordsIterator.java    |  2 +-
 .../sdk/io/kinesis/SimplifiedKinesisClient.java |  5 ++--
 .../beam/sdk/io/kinesis/StartingPoint.java      |  2 +-
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 12 ++++-----
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  2 +-
 .../kinesis/DynamicCheckpointGeneratorTest.java |  7 ++---
 .../sdk/io/kinesis/KinesisMockReadTest.java     | 11 ++++----
 .../io/kinesis/KinesisReaderCheckpointTest.java | 10 +++----
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 28 ++++++++++----------
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  |  7 ++---
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  3 +--
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  4 +--
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  8 +++---
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  4 +--
 .../sdk/io/kinesis/ShardCheckpointTest.java     |  7 ++---
 .../io/kinesis/ShardRecordsIteratorTest.java    | 11 ++++----
 .../io/kinesis/SimplifiedKinesisClientTest.java | 11 ++++----
 27 files changed, 92 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index d86960f..2ec293c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -20,10 +20,8 @@ package org.apache.beam.sdk.io.kinesis;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.transform;
 
-
-import com.google.common.base.Function;
-
 import com.amazonaws.services.kinesis.model.Shard;
+import com.google.common.base.Function;
 
 /**
  * Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index f48b9d5..c0f00de 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -18,9 +18,9 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.collect.Lists.transform;
-import com.google.common.base.Function;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.google.common.base.Function;
 import java.util.List;
 import javax.annotation.Nullable;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b3cb464..811051c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 
-import org.apache.beam.sdk.transforms.PTransform;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -28,6 +27,7 @@ import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.beam.sdk.transforms.PTransform;
 import org.joda.time.Instant;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 38a0050..219a705 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -18,16 +18,16 @@
 package org.apache.beam.sdk.io.kinesis;
 
 
-import org.apache.beam.sdk.io.UnboundedSource;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
 
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /***

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index 6ceb742..663ba44 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -17,20 +17,18 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import org.apache.beam.sdk.io.UnboundedSource;
 import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.partition;
 
-
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
-
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.UnboundedSource;
 
 /***
  * Checkpoint representing a total progress in a set of shards in single stream.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index cdb495c..fe2a33d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -17,15 +17,15 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import com.google.common.base.Charsets;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
 
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.joda.time.Instant;
+import com.google.common.base.Charsets;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
 
 /**
  * {@link UserRecord} enhanced with utility methods.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index c383a4f..5b13e31 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -17,18 +17,17 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
-
 import org.joda.time.Instant;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 
 /***
  * A {@link Coder} for {@link KinesisRecord}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 38c9fa4..62cba08 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -17,18 +17,16 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.List;
 
 
 /***

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index 7257aa1..7adae4b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.kinesis;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.collect.Queues.newArrayDeque;
 
-
 import java.util.Deque;
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 1d8628b..9920aca 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -18,17 +18,17 @@
 package org.apache.beam.sdk.io.kinesis;
 
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.joda.time.Instant;
 import java.io.Serializable;
+import org.joda.time.Instant;
 
 
 /***

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 7dfe158..d17996a 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,9 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Queues.newArrayDeque;
 
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import java.util.Deque;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.Deque;
 
 /***
  * Iterates over records in a single shard.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index f9a1ea2..96267d1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 
-import com.google.common.collect.Lists;
-
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -32,10 +30,11 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
-import org.joda.time.Instant;
+import com.google.common.collect.Lists;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.Callable;
+import org.joda.time.Instant;
 
 /***
  * Wraps {@link AmazonKinesis} class providing much simpler interface and

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index 8140269..b7ee917 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.joda.time.Instant;
 import java.io.Serializable;
 import java.util.Objects;
+import org.joda.time.Instant;
 
 /***
  * Denotes a point at which the reader should start reading from a Kinesis stream.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 7ca8e0b..b007fa4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.transform;
-import com.google.common.base.Function;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
 
 import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.ResponseMetadata;
@@ -63,15 +65,13 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.SplitShardRequest;
 import com.amazonaws.services.kinesis.model.SplitShardResult;
 import com.amazonaws.services.kinesis.model.StreamDescription;
-import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.joda.time.Instant;
-import static java.lang.Integer.parseInt;
-import static java.lang.Math.min;
+import com.google.common.base.Function;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
 
 /**
  * Created by p.pastuszka on 21.07.2016.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 152fd6d..cb0d0e2 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import org.junit.Test;
 import java.util.NoSuchElementException;
+import org.junit.Test;
 
 /**
  * Created by ppastuszka on 12.12.15.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index a9e5a69..c92ac9a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -17,15 +17,16 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.amazonaws.services.kinesis.model.Shard;
+import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.BDDMockito.given;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
 
 
 /***

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 61a858f..304220b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -17,20 +17,19 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Iterables;
+import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import static com.google.common.collect.Lists.newArrayList;
-
-import com.google.common.collect.Iterables;
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import org.joda.time.DateTime;
 import org.junit.Test;
-import java.util.List;
 
 /**
  * Created by p.pastuszka on 22.07.2016.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 205f050..8c8da64 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -18,17 +18,17 @@
 package org.apache.beam.sdk.io.kinesis;
 
 
-import com.google.common.collect.Iterables;
-
+import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.Iterator;
+import java.util.List;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
-import java.util.Iterator;
-import java.util.List;
 
 /***
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index fbc7c66..73a2455 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -17,6 +17,20 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.amazonaws.regions.Regions;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -25,25 +39,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-
-import com.amazonaws.regions.Regions;
-import static org.assertj.core.api.Assertions.assertThat;
 import org.apache.commons.lang.RandomStringUtils;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Ignore;
 import org.junit.Test;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Integration test, that reads from the real Kinesis.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 793fb57..29a24821 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -17,16 +17,17 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static java.util.Arrays.asList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
-import java.io.IOException;
-import java.util.NoSuchElementException;
 
 /**
  * Created by ppastuszka on 12.12.15.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index b09b7eb..d301f25 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -17,11 +17,10 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import java.nio.ByteBuffer;
 import org.apache.beam.sdk.testing.CoderProperties;
-
 import org.joda.time.Instant;
 import org.junit.Test;
-import java.nio.ByteBuffer;
 
 /**
  * Created by p.pastuszka on 20.07.2016.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 0dcede9..c98242b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -18,8 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.collect.Lists.newArrayList;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
 
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.internal.StaticCredentialsProvider;
@@ -30,6 +28,8 @@ import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import java.nio.ByteBuffer;
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index 360106d..f979c01 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -17,16 +17,16 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import com.google.common.collect.Lists;
-
 import static org.mockito.BDDMockito.given;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import java.util.Collections;
-import java.util.List;
 
 
 /***

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index a508ddf..aedc89e 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -18,11 +18,11 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.collect.Lists.newArrayList;
-
 import static org.assertj.core.api.Assertions.assertThat;
-import org.junit.Test;
+
 import java.util.Collections;
 import java.util.List;
+import org.junit.Test;
 
 /**
  * Created by ppastuszka on 12.12.15.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index 2227cef..39ab36f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -22,8 +22,6 @@ import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPos
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
 import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
-import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Matchers.anyString;
@@ -31,6 +29,10 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import java.io.IOException;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Before;
@@ -38,7 +40,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import java.io.IOException;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index e2a3ccc..585b884 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import java.io.IOException;
+import java.util.Collections;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -29,10 +34,6 @@ import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
-import java.io.IOException;
-import java.util.Collections;
 
 /**
  * Created by ppastuszka on 12.12.15.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 44d29d6..96434fd 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.beam.sdk.io.kinesis;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.AmazonServiceException.ErrorType;
 import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -29,17 +34,13 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.reset;
+import java.util.List;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
 import org.mockito.runners.MockitoJUnitRunner;
-import java.util.List;
 
 /***
  */



[3/5] incubator-beam git commit: kinesis: a connector for Amazon Kinesis

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
new file mode 100644
index 0000000..7ca8e0b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
+import com.amazonaws.services.kinesis.model.CreateStreamRequest;
+import com.amazonaws.services.kinesis.model.CreateStreamResult;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
+import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.ListStreamsRequest;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
+import com.amazonaws.services.kinesis.model.MergeShardsRequest;
+import com.amazonaws.services.kinesis.model.MergeShardsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.SplitShardRequest;
+import com.amazonaws.services.kinesis.model.SplitShardResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Created by p.pastuszka on 21.07.2016.
+ */
+class AmazonKinesisMock implements AmazonKinesis {
+
+    static class TestData implements Serializable {
+        private final String data;
+        private final Instant arrivalTimestamp;
+        private final String sequenceNumber;
+
+        public TestData(KinesisRecord record) {
+            this(new String(record.getData().array()),
+                    record.getApproximateArrivalTimestamp(),
+                    record.getSequenceNumber());
+        }
+
+        public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+            this.data = data;
+            this.arrivalTimestamp = arrivalTimestamp;
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        public Record convertToRecord() {
+            return new Record().
+                    withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+                    withData(ByteBuffer.wrap(data.getBytes())).
+                    withSequenceNumber(sequenceNumber).
+                    withPartitionKey("");
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return EqualsBuilder.reflectionEquals(this, obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return reflectionHashCode(this);
+        }
+    }
+
+    static class Provider implements KinesisClientProvider {
+
+        private final List<List<TestData>> shardedData;
+        private final int numberOfRecordsPerGet;
+
+        public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+            this.shardedData = shardedData;
+            this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+        }
+
+        @Override
+        public AmazonKinesis get() {
+            return new AmazonKinesisMock(transform(shardedData,
+                    new Function<List<TestData>, List<Record>>() {
+                        @Override
+                        public List<Record> apply(@Nullable List<TestData> testDatas) {
+                            return transform(testDatas, new Function<TestData, Record>() {
+                                @Override
+                                public Record apply(@Nullable TestData testData) {
+                                    return testData.convertToRecord();
+                                }
+                            });
+                        }
+                    }), numberOfRecordsPerGet);
+        }
+    }
+
+    private final List<List<Record>> shardedData;
+    private final int numberOfRecordsPerGet;
+
+    public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+        this.shardedData = shardedData;
+        this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+    }
+
+    @Override
+    public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+        String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+        int shardId = parseInt(shardIteratorParts[0]);
+        int startingRecord = parseInt(shardIteratorParts[1]);
+        List<Record> shardData = shardedData.get(shardId);
+
+        int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+        int fromIndex = min(startingRecord, toIndex);
+        return new GetRecordsResult().
+                withRecords(shardData.subList(fromIndex, toIndex)).
+                withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(
+            GetShardIteratorRequest getShardIteratorRequest) {
+        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+                getShardIteratorRequest.getShardIteratorType());
+
+        String shardIterator;
+        if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+            shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+        } else {
+            throw new RuntimeException("Not implemented");
+        }
+
+        return new GetShardIteratorResult().withShardIterator(shardIterator);
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+        int nextShardId = 0;
+        if (exclusiveStartShardId != null) {
+            nextShardId = parseInt(exclusiveStartShardId) + 1;
+        }
+        boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+        List<Shard> shards = newArrayList();
+        if (nextShardId < shardedData.size()) {
+            shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+        }
+
+        return new DescribeStreamResult().withStreamDescription(
+                new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+        );
+    }
+
+    @Override
+    public void setEndpoint(String endpoint) {
+
+    }
+
+    @Override
+    public void setRegion(Region region) {
+
+    }
+
+    @Override
+    public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(String streamName, Integer shardCount) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+            DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(String streamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName) {
+
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName,
+                                               Integer limit, String exclusiveStartShardId) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+            DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+            EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType,
+                                                   String startingSequenceNumber) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+            IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams() {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public ListTagsForStreamResult listTagsForStream(
+            ListTagsForStreamRequest listTagsForStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public MergeShardsResult mergeShards(String streamName,
+                                         String shardToMerge, String adjacentShardToMerge) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data,
+                                     String partitionKey, String sequenceNumberForOrdering) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public RemoveTagsFromStreamResult removeTagsFromStream(
+            RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public SplitShardResult splitShard(String streamName,
+                                       String shardToSplit, String newStartingHashKey) {
+        throw new RuntimeException("Not implemented");
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    @Override
+    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+        throw new RuntimeException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
new file mode 100644
index 0000000..152fd6d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.junit.Test;
+import java.util.NoSuchElementException;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+public class CustomOptionalTest {
+    @Test(expected = NoSuchElementException.class)
+    public void absentThrowsNoSuchElementExceptionOnGet() {
+        CustomOptional.absent().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
new file mode 100644
index 0000000..a9e5a69
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DynamicCheckpointGeneratorTest {
+
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private Shard shard1, shard2, shard3;
+
+    @Test
+    public void shouldMapAllShardsToCheckpoints() throws Exception {
+        given(shard1.getShardId()).willReturn("shard-01");
+        given(shard2.getShardId()).willReturn("shard-02");
+        given(shard3.getShardId()).willReturn("shard-03");
+        given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+
+        StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+        DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+                startingPoint);
+
+        KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+
+        assertThat(checkpoint).hasSize(3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
new file mode 100644
index 0000000..61a858f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.google.common.collect.Iterables;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import java.util.List;
+
+/**
+ * Created by p.pastuszka on 22.07.2016.
+ */
+public class KinesisMockReadTest {
+    @Test
+    public void readsDataFromMockKinesis() {
+        int noOfShards = 3;
+        int noOfEventsPerShard = 100;
+        List<List<AmazonKinesisMock.TestData>> testData =
+                provideTestData(noOfShards, noOfEventsPerShard);
+
+        final Pipeline p = TestPipeline.create();
+        PCollection<AmazonKinesisMock.TestData> result = p.
+                apply(
+                        KinesisIO.Read.
+                                from("stream", InitialPositionInStream.TRIM_HORIZON).
+                                using(new AmazonKinesisMock.Provider(testData, 10)).
+                                withMaxNumRecords(noOfShards * noOfEventsPerShard)).
+                apply(ParDo.of(new KinesisRecordToTestData()));
+        PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+        p.run();
+    }
+
+    private static class KinesisRecordToTestData extends
+            DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            c.output(new AmazonKinesisMock.TestData(c.element()));
+        }
+    }
+
+    private List<List<AmazonKinesisMock.TestData>> provideTestData(
+            int noOfShards,
+            int noOfEventsPerShard) {
+
+        int seqNumber = 0;
+
+        List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+        for (int i = 0; i < noOfShards; ++i) {
+            List<AmazonKinesisMock.TestData> shardData = newArrayList();
+            shardedData.add(shardData);
+
+            DateTime arrival = DateTime.now();
+            for (int j = 0; j < noOfEventsPerShard; ++j) {
+                arrival = arrival.plusSeconds(1);
+
+                seqNumber++;
+                shardData.add(new AmazonKinesisMock.TestData(
+                        Integer.toString(seqNumber),
+                        arrival.toInstant(),
+                        Integer.toString(seqNumber))
+                );
+            }
+        }
+
+        return shardedData;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
new file mode 100644
index 0000000..205f050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Iterables;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.util.Iterator;
+import java.util.List;
+
+/***
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderCheckpointTest {
+    @Mock
+    private ShardCheckpoint a, b, c;
+
+    private KinesisReaderCheckpoint checkpoint;
+
+    @Before
+    public void setUp() {
+        checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+    }
+
+    @Test
+    public void splitsCheckpointAccordingly() {
+        verifySplitInto(1);
+        verifySplitInto(2);
+        verifySplitInto(3);
+        verifySplitInto(4);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void isImmutable() {
+        Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+        iterator.remove();
+    }
+
+    private void verifySplitInto(int size) {
+        List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+        assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+        assertThat(split).hasSize(Math.min(size, 3));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
new file mode 100644
index 0000000..fbc7c66
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.regions.Regions;
+import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test, that reads from the real Kinesis.
+ * You need to provide all {@link KinesisTestOptions} in order to run this.
+ */
+public class KinesisReaderIT {
+    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+    private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+
+    @Ignore
+    @Test
+    public void readsDataFromRealKinesisStream()
+            throws IOException, InterruptedException, ExecutionException {
+        KinesisTestOptions options = readKinesisOptions();
+        List<String> testData = prepareTestData(1000);
+
+        Future<?> future = startTestPipeline(testData, options);
+        KinesisUploader.uploadAll(testData, options);
+        future.get();
+    }
+
+    private List<String> prepareTestData(int count) {
+        List<String> data = newArrayList();
+        for (int i = 0; i < count; ++i) {
+            data.add(RandomStringUtils.randomAlphabetic(32));
+        }
+        return data;
+    }
+
+    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+            throws InterruptedException {
+        final Pipeline p = TestPipeline.create();
+        PCollection<String> result = p.
+                apply(KinesisIO.Read.
+                        from(options.getAwsKinesisStream(), Instant.now()).
+                        using(options.getAwsAccessKey(), options.getAwsSecretKey(),
+                                Regions.fromName(options.getAwsKinesisRegion())).
+                        withMaxReadTime(Duration.standardMinutes(3))
+                ).
+                apply(ParDo.of(new RecordDataToString()));
+        PAssert.that(result).containsInAnyOrder(testData);
+
+        Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                PipelineResult result = p.run();
+                PipelineResult.State state = result.getState();
+                while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+                    Thread.sleep(1000);
+                    state = result.getState();
+                }
+                assertThat(state).isEqualTo(PipelineResult.State.DONE);
+                return null;
+            }
+        });
+        Thread.sleep(PIPELINE_STARTUP_TIME);
+        return future;
+    }
+
+    private KinesisTestOptions readKinesisOptions() {
+        PipelineOptionsFactory.register(KinesisTestOptions.class);
+        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    }
+
+    private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            checkNotNull(c.element(), "Null record given");
+            c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
new file mode 100644
index 0000000..793fb57
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderTest {
+    @Mock
+    private SimplifiedKinesisClient kinesis;
+    @Mock
+    private CheckpointGenerator generator;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+    @Mock
+    private ShardRecordsIterator firstIterator, secondIterator;
+    @Mock
+    private KinesisRecord a, b, c, d;
+
+    private KinesisReader reader;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+                asList(firstCheckpoint, secondCheckpoint)
+        ));
+        when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+        when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+        when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+        when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        reader = new KinesisReader(kinesis, generator, null);
+    }
+
+    @Test
+    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+        assertThat(reader.start()).isFalse();
+    }
+
+    @Test(expected = NoSuchElementException.class)
+    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+        reader.start();
+        reader.getCurrent();
+    }
+
+    @Test
+    public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+            TransientKinesisException {
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+    }
+
+    @Test
+    public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+            throws IOException, TransientKinesisException {
+        reader.start();
+
+        when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+        assertThat(reader.advance()).isFalse();
+    }
+
+    @Test
+    public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(b)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        when(secondIterator.next()).
+                thenReturn(CustomOptional.of(c)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(d)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(c);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(a);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(d);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(b);
+        assertThat(reader.advance()).isFalse();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
new file mode 100644
index 0000000..b09b7eb
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+
+/**
+ * Created by p.pastuszka on 20.07.2016.
+ */
+public class KinesisRecordCoderTest {
+    @Test
+    public void encodingAndDecodingWorks() throws Exception {
+        KinesisRecord record = new KinesisRecord(
+                ByteBuffer.wrap("data".getBytes()),
+                "sequence",
+                128L,
+                "partition",
+                Instant.now(),
+                Instant.now(),
+                "stream",
+                "shard"
+        );
+        CoderProperties.coderDecodeEncodeEqual(
+                new KinesisRecordCoder(), record
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
new file mode 100644
index 0000000..65a7605
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/***
+ * Options for Kinesis integration tests.
+ */
+public interface KinesisTestOptions extends TestPipelineOptions {
+    @Description("AWS region where Kinesis stream resided")
+    @Default.String("aws-kinesis-region")
+    String getAwsKinesisRegion();
+    void setAwsKinesisRegion(String value);
+
+    @Description("Kinesis stream name")
+    @Default.String("aws-kinesis-stream")
+    String getAwsKinesisStream();
+    void setAwsKinesisStream(String value);
+
+    @Description("AWS secret key")
+    @Default.String("aws-secret-key")
+    String getAwsSecretKey();
+    void setAwsSecretKey(String value);
+
+    @Description("AWS access key")
+    @Default.String("aws-access-key")
+    String getAwsAccessKey();
+    void setAwsAccessKey(String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
new file mode 100644
index 0000000..0dcede9
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/***
+ * Sends records to Kinesis in reliable way.
+ */
+public class KinesisUploader {
+
+    public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+
+    public static void uploadAll(List<String> data, KinesisTestOptions options) {
+        AmazonKinesis client = new AmazonKinesisClient(
+                new StaticCredentialsProvider(
+                        new BasicAWSCredentials(
+                                options.getAwsAccessKey(), options.getAwsSecretKey()))
+        ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+
+        List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+
+
+        for (List<String> partition : partitions) {
+            List<PutRecordsRequestEntry> allRecords = newArrayList();
+            for (String row : partition) {
+                allRecords.add(new PutRecordsRequestEntry().
+                        withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+                        withPartitionKey(Integer.toString(row.hashCode()))
+
+                );
+            }
+
+            PutRecordsResult result;
+            do {
+                result = client.putRecords(
+                        new PutRecordsRequest().
+                                withStreamName(options.getAwsKinesisStream()).
+                                withRecords(allRecords));
+                List<PutRecordsRequestEntry> failedRecords = newArrayList();
+                int i = 0;
+                for (PutRecordsResultEntry row : result.getRecords()) {
+                    if (row.getErrorCode() != null) {
+                        failedRecords.add(allRecords.get(i));
+                    }
+                    ++i;
+                }
+                allRecords = failedRecords;
+            }
+
+            while (result.getFailedRecordCount() > 0);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
new file mode 100644
index 0000000..360106d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.collect.Lists;
+
+import static org.mockito.BDDMockito.given;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.Collections;
+import java.util.List;
+
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RecordFilterTest {
+    @Mock
+    private ShardCheckpoint checkpoint;
+    @Mock
+    private KinesisRecord record1, record2, record3, record4, record5;
+
+    @Test
+    public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+        given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+        given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+        given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+        List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+                record3, record4, record5);
+        RecordFilter underTest = new RecordFilter();
+
+        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+        Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+    }
+
+    @Test
+    public void shouldNotFailOnEmptyList() {
+        List<KinesisRecord> records = Collections.emptyList();
+        RecordFilter underTest = new RecordFilter();
+
+        List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+        Assertions.assertThat(retainedRecords).isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
new file mode 100644
index 0000000..a508ddf
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+public class RoundRobinTest {
+    @Test(expected = IllegalArgumentException.class)
+    public void doesNotAllowCreationWithEmptyCollection() {
+        new RoundRobin<>(Collections.emptyList());
+    }
+
+    @Test
+    public void goesThroughElementsInCycle() {
+        List<String> input = newArrayList("a", "b", "c");
+
+        RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+        input.addAll(input);  // duplicate the input
+        for (String element : input) {
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            roundRobin.moveForward();
+        }
+    }
+
+    @Test
+    public void usualIteratorGoesThroughElementsOnce() {
+        List<String> input = newArrayList("a", "b", "c");
+
+        RoundRobin<String> roundRobin = new RoundRobin<>(input);
+        assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
new file mode 100644
index 0000000..2227cef
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.io.IOException;
+
+/**
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardCheckpointTest {
+    private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+    private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+    private static final String STREAM_NAME = "STREAM";
+    private static final String SHARD_ID = "SHARD_ID";
+    @Mock
+    private SimplifiedKinesisClient client;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(client.getShardIterator(
+                eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+                anyString(), isNull(Instant.class))).
+                thenReturn(AT_SEQUENCE_SHARD_IT);
+        when(client.getShardIterator(
+                eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+                anyString(), isNull(Instant.class))).
+                thenReturn(AFTER_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+                .isEqualTo(AT_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+                .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+                (AT_SEQUENCE_SHARD_IT);
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+                .isEqualTo(AT_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testComparisonWithExtendedSequenceNumber() {
+        assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isTrue();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isFalse();
+
+        assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("100", 0L))
+        )).isFalse();
+
+        assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+                recordWith(new ExtendedSequenceNumber("99", 1L))
+        )).isFalse();
+    }
+
+    @Test
+    public void testComparisonWithTimestamp() {
+        DateTime referenceTimestamp = DateTime.now();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+        ).isFalse();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+        ).isTrue();
+
+        assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+                .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+        ).isTrue();
+    }
+
+    private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+        KinesisRecord record = mock(KinesisRecord.class);
+        given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+        return record;
+    }
+
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+                                       Long subSequenceNumber) {
+        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+                subSequenceNumber);
+    }
+
+    private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+        KinesisRecord record = mock(KinesisRecord.class);
+        given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+        return record;
+    }
+
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
new file mode 100644
index 0000000..e2a3ccc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardRecordsIteratorTest {
+    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+    private static final String STREAM_NAME = "STREAM_NAME";
+    private static final String SHARD_ID = "SHARD_ID";
+
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+    @Mock
+    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+    @Mock
+    private KinesisRecord a, b, c, d;
+    @Mock
+    private RecordFilter recordFilter;
+
+    private ShardRecordsIterator iterator;
+
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(firstResult);
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(thirdResult);
+
+        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+                .class))).thenAnswer(new IdentityAnswer());
+
+        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+    }
+
+    @Test
+    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
+
+    @Test
+    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+        when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    }
+
+    @Test
+    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(singletonList(a));
+        when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenThrow(ExpiredIteratorException.class);
+        when(aCheckpoint.getShardIterator(kinesisClient))
+                .thenReturn(SECOND_REFRESHED_ITERATOR);
+        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
+
+    private static class IdentityAnswer implements Answer<Object> {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            return invocation.getArguments()[0];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
new file mode 100644
index 0000000..44d29d6
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.List;
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedKinesisClientTest {
+    private static final String STREAM = "stream";
+    private static final String SHARD_1 = "shard-01";
+    private static final String SHARD_2 = "shard-02";
+    private static final String SHARD_3 = "shard-03";
+    private static final String SHARD_ITERATOR = "iterator";
+    private static final String SEQUENCE_NUMBER = "abc123";
+
+    @Mock
+    private AmazonKinesis kinesis;
+    @InjectMocks
+    private SimplifiedKinesisClient underTest;
+
+    @Test
+    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+                .withStartingSequenceNumber(SEQUENCE_NUMBER)
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String stream = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+        assertThat(stream).isEqualTo(SHARD_ITERATOR);
+    }
+
+    @Test
+    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+        Instant timestamp = Instant.now();
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+                .withTimestamp(timestamp.toDate())
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String stream = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+        assertThat(stream).isEqualTo(SHARD_ITERATOR);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleServiceErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    private void shouldHandleGetShardIteratorError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        GetShardIteratorRequest request = new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.LATEST);
+
+        given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+        try {
+            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    @Test
+    public void shouldListAllShards() throws Exception {
+        Shard shard1 = new Shard().withShardId(SHARD_1);
+        Shard shard2 = new Shard().withShardId(SHARD_2);
+        Shard shard3 = new Shard().withShardId(SHARD_3);
+        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard1, shard2)
+                        .withHasMoreShards(true)));
+        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard3)
+                        .withHasMoreShards(false)));
+
+        List<Shard> shards = underTest.listShards(STREAM);
+
+        assertThat(shards).containsOnly(shard1, shard2, shard3);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForShardListing() {
+        shouldHandleShardListingError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleServiceErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForShardListing() {
+        shouldHandleShardListingError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    private void shouldHandleShardListingError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+        try {
+            underTest.listShards(STREAM);
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+        AmazonServiceException exception = new AmazonServiceException("");
+        exception.setErrorType(errorType);
+        return exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..44dbf4a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Amazon Kinesis.
+ */
+package org.apache.beam.sdk.io.kinesis;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4198499..6cbd615 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -37,6 +37,7 @@
     <module>hdfs</module>
     <module>jms</module>
     <module>kafka</module>
+    <module>kinesis</module>
   </modules>
 
 </project>


[4/5] incubator-beam git commit: kinesis: a connector for Amazon Kinesis

Posted by dh...@apache.org.
kinesis: a connector for Amazon Kinesis


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bed22de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bed22de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bed22de6

Branch: refs/heads/master
Commit: bed22de6880339465637a339b2d8b8527b6cacc6
Parents: 95ab438
Author: Pastuszka Przemys\u0142aw <pa...@gmail.com>
Authored: Mon Jul 18 20:03:37 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700

----------------------------------------------------------------------
 sdks/java/io/kinesis/pom.xml                    | 179 +++++++++
 .../sdk/io/kinesis/CheckpointGenerator.java     |  30 ++
 .../beam/sdk/io/kinesis/CustomOptional.java     |  85 +++++
 .../io/kinesis/DynamicCheckpointGenerator.java  |  58 +++
 .../sdk/io/kinesis/GetKinesisRecordsResult.java |  54 +++
 .../sdk/io/kinesis/KinesisClientProvider.java   |  31 ++
 .../apache/beam/sdk/io/kinesis/KinesisIO.java   | 190 ++++++++++
 .../beam/sdk/io/kinesis/KinesisReader.java      | 145 +++++++
 .../sdk/io/kinesis/KinesisReaderCheckpoint.java |  98 +++++
 .../beam/sdk/io/kinesis/KinesisRecord.java      | 121 ++++++
 .../beam/sdk/io/kinesis/KinesisRecordCoder.java |  75 ++++
 .../beam/sdk/io/kinesis/KinesisSource.java      | 114 ++++++
 .../beam/sdk/io/kinesis/RecordFilter.java       |  41 ++
 .../apache/beam/sdk/io/kinesis/RoundRobin.java  |  54 +++
 .../beam/sdk/io/kinesis/ShardCheckpoint.java    | 175 +++++++++
 .../sdk/io/kinesis/ShardRecordsIterator.java    |  98 +++++
 .../sdk/io/kinesis/SimplifiedKinesisClient.java | 158 ++++++++
 .../beam/sdk/io/kinesis/StartingPoint.java      |  85 +++++
 .../io/kinesis/StaticCheckpointGenerator.java   |  42 +++
 .../io/kinesis/TransientKinesisException.java   |  29 ++
 .../beam/sdk/io/kinesis/package-info.java       |  22 ++
 .../beam/sdk/io/kinesis/AmazonKinesisMock.java  | 375 +++++++++++++++++++
 .../beam/sdk/io/kinesis/CustomOptionalTest.java |  31 ++
 .../kinesis/DynamicCheckpointGeneratorTest.java |  56 +++
 .../sdk/io/kinesis/KinesisMockReadTest.java     |  92 +++++
 .../io/kinesis/KinesisReaderCheckpointTest.java |  67 ++++
 .../beam/sdk/io/kinesis/KinesisReaderIT.java    | 119 ++++++
 .../beam/sdk/io/kinesis/KinesisReaderTest.java  | 119 ++++++
 .../sdk/io/kinesis/KinesisRecordCoderTest.java  |  46 +++
 .../beam/sdk/io/kinesis/KinesisTestOptions.java |  47 +++
 .../beam/sdk/io/kinesis/KinesisUploader.java    |  84 +++++
 .../beam/sdk/io/kinesis/RecordFilterTest.java   |  66 ++++
 .../beam/sdk/io/kinesis/RoundRobinTest.java     |  57 +++
 .../sdk/io/kinesis/ShardCheckpointTest.java     | 148 ++++++++
 .../io/kinesis/ShardRecordsIteratorTest.java    | 150 ++++++++
 .../io/kinesis/SimplifiedKinesisClientTest.java | 223 +++++++++++
 .../beam/sdk/io/kinesis/package-info.java       |  22 ++
 sdks/java/io/pom.xml                            |   1 +
 38 files changed, 3587 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
new file mode 100644
index 0000000..aec1786
--- /dev/null
+++ b/sdks/java/io/kinesis/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-parent</artifactId>
+    <version>0.3.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-kinesis</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Kinesis</name>
+  <description>Library to read Kinesis streams.</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>false</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <configuration>
+          <additionalparam>-Xdoclint:missing</additionalparam>
+        </configuration>
+      </plugin>
+      <!-- Integration Tests -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+        <configuration>
+          <useManifestOnlyJar>false</useManifestOnlyJar>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>integration-test</goal>
+              <goal>verify</goal>
+            </goals>
+            <configuration>
+              <systemPropertyVariables>
+                <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+              </systemPropertyVariables>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <properties>
+    <aws.version>1.11.18</aws.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-kinesis</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-kinesis-client</artifactId>
+      <version>1.6.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>annotations</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.assertj</groupId>
+      <artifactId>assertj-core</artifactId>
+      <version>2.5.0</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
new file mode 100644
index 0000000..919d85a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import java.io.Serializable;
+
+/**
+ * Used to generate checkpoint object on demand.
+ * How exactly the checkpoint is generated is up to implementing class.
+ */
+interface CheckpointGenerator extends Serializable {
+    KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+            throws TransientKinesisException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
new file mode 100644
index 0000000..804d6cc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import java.util.NoSuchElementException;
+
+/***
+ * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
+ */
+abstract class CustomOptional<T> {
+    public static <T> CustomOptional<T> absent() {
+        return Absent.INSTANCE;
+    }
+
+    public static <T> CustomOptional<T> of(T v) {
+        return new Present<>(v);
+    }
+
+    public abstract boolean isPresent();
+
+    public abstract T get();
+
+    private static class Present<T> extends CustomOptional<T> {
+        private final T value;
+
+        private Present(T value) {
+            this.value = value;
+        }
+
+        @Override
+        public boolean isPresent() {
+            return true;
+        }
+
+        @Override
+        public T get() {
+            return value;
+        }
+
+
+        @Override
+        public boolean equals(Object o) {
+            Present<?> present = (Present<?>) o;
+
+            return value != null ? value.equals(present.value) : present.value == null;
+        }
+
+        @Override
+        public int hashCode() {
+            return value != null ? value.hashCode() : 0;
+        }
+    }
+
+    private static class Absent<T> extends CustomOptional<T> {
+        public static final Absent INSTANCE = new Absent();
+
+        private Absent() {
+        }
+
+        @Override
+        public boolean isPresent() {
+            return false;
+        }
+
+        @Override
+        public T get() {
+            throw new NoSuchElementException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
new file mode 100644
index 0000000..d86960f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.transform;
+
+
+import com.google.common.base.Function;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+/**
+ * Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream.
+ * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
+ */
+class DynamicCheckpointGenerator implements CheckpointGenerator {
+    private final String streamName;
+    private final StartingPoint startingPoint;
+
+    public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+        this.streamName = checkNotNull(streamName, "streamName");
+        this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+    }
+
+    @Override
+    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+            throws TransientKinesisException {
+        return new KinesisReaderCheckpoint(
+                transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+                    @Override
+                    public ShardCheckpoint apply(Shard shard) {
+                        return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+                    }
+                })
+        );
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
new file mode 100644
index 0000000..f48b9d5
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/***
+ * Represents the output of 'get' operation on Kinesis stream.
+ */
+class GetKinesisRecordsResult {
+    private final List<KinesisRecord> records;
+    private final String nextShardIterator;
+
+    public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+                                   final String streamName, final String shardId) {
+        this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+            @Nullable
+            @Override
+            public KinesisRecord apply(@Nullable UserRecord input) {
+                assert input != null;  // to make FindBugs happy
+                return new KinesisRecord(input, streamName, shardId);
+            }
+        });
+        this.nextShardIterator = nextShardIterator;
+    }
+
+    public List<KinesisRecord> getRecords() {
+        return records;
+    }
+
+    public String getNextShardIterator() {
+        return nextShardIterator;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
new file mode 100644
index 0000000..36c8953
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import java.io.Serializable;
+
+/**
+ * Provides instances of {@link AmazonKinesis} interface.
+ *
+ * Please note, that any instance of {@link KinesisClientProvider} must be
+ * {@link Serializable} to ensure it can be sent to worker machines.
+ */
+interface KinesisClientProvider extends Serializable {
+    AmazonKinesis get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
new file mode 100644
index 0000000..b3cb464
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.Instant;
+
+/**
+ * {@link PTransform}s for reading from
+ * <a href="https://aws.amazon.com/kinesis/">Kinesis</a> streams.
+ *
+ * <h3>Usage</h3>
+ *
+ * <p>Main class you're going to operate is called {@link KinesisIO}.
+ * It follows the usage conventions laid out by other *IO classes like
+ * BigQueryIO or PubsubIOLet's see how you can set up a simple Pipeline, which reads from Kinesis:
+ *
+ * <pre>{@code}
+ * p.
+ *   apply(KinesisIO.Read.
+ *     from("streamName", InitialPositionInStream.LATEST).
+ *     using("AWS_KEY", _"AWS_SECRET", STREAM_REGION).
+ *     apply( ... ) // other transformations
+ *</pre>
+ * </p>
+ *
+ * <p>
+ * As you can see you need to provide 3 things:
+ * <ul>
+ *   <li>name of the stream you're going to read</li>
+ *   <li>position in the stream where reading should start. There are two options:</li>
+ *   <ul>
+ *     <li>{@link InitialPositionInStream#LATEST} - reading will begin from end of the stream</li>
+ *     <li>{@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at
+ *        the very beginning of the stream</li>
+ *   </ul>
+ *   <li>data used to initialize {@link AmazonKinesis} client</li>
+ *   <ul>
+ *     <li>credentials (aws key, aws secret)</li>
+ *    <li>region where the stream is located</li>
+ *   </ul>
+ * </ul>
+ * </p>
+ *
+ * <p>In case when you want to set up {@link AmazonKinesis} client by your own
+ * (for example if you're using more sophisticated authorization methods like Amazon STS, etc.)
+ * you can do it by implementing {@link KinesisClientProvider} class:
+ *
+ * <pre>{@code}
+ * public class MyCustomKinesisClientProvider implements KinesisClientProvider {
+ *   @Override
+ *   public AmazonKinesis get() {
+ *     // set up your client here
+ *   }
+ * }
+ * </pre>
+ *
+ * Usage is pretty straightforward:
+ *
+ * <pre>{@code}
+ * p.
+ *   apply(KinesisIO.Read.
+ *    from("streamName", InitialPositionInStream.LATEST).
+ *    using(MyCustomKinesisClientProvider()).
+ *    apply( ... ) // other transformations
+ * </pre>
+ * </p>
+ *
+ * <p>There\u2019s also possibility to start reading using arbitrary point in time -
+ * in this case you need to provide {@link Instant} object:
+ *
+ * <pre>{@code}
+ * p.
+ *   apply(KinesisIO.Read.
+ *     from("streamName", instant).
+ *     using(MyCustomKinesisClientProvider()).
+ *     apply( ... ) // other transformations
+ * </pre>
+ * </p>
+ *
+ */
+public final class KinesisIO {
+    /***
+     * A {@link PTransform} that reads from a Kinesis stream.
+     */
+    public static final class Read {
+
+        private final String streamName;
+        private final StartingPoint initialPosition;
+
+        private Read(String streamName, StartingPoint initialPosition) {
+            this.streamName = checkNotNull(streamName, "streamName");
+            this.initialPosition = checkNotNull(initialPosition, "initialPosition");
+        }
+
+        /***
+         * Specify reading from streamName at some initial position.
+         */
+        public static Read from(String streamName, InitialPositionInStream initialPosition) {
+            return new Read(streamName, new StartingPoint(
+                    checkNotNull(initialPosition, "initialPosition")));
+        }
+
+        /***
+         * Specify reading from streamName beginning at given {@link Instant}.
+         * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+         */
+        public static Read from(String streamName, Instant initialTimestamp) {
+            return new Read(streamName, new StartingPoint(
+                    checkNotNull(initialTimestamp, "initialTimestamp")));
+        }
+
+        /***
+         * Allows to specify custom {@link KinesisClientProvider}.
+         * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+         * used for communication with Kinesis.
+         * You should use this method if {@link Read#using(String, String, Regions)} does not
+         * suite your needs.
+         */
+        public org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> using
+        (KinesisClientProvider kinesisClientProvider) {
+            return org.apache.beam.sdk.io.Read.from(
+                    new KinesisSource(kinesisClientProvider, streamName,
+                            initialPosition));
+        }
+
+        /***
+         * Specify credential details and region to be used to read from Kinesis.
+         * If you need more sophisticated credential protocol, then you should look at
+         * {@link Read#using(KinesisClientProvider)}.
+         */
+        public org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> using(String awsAccessKey,
+                                                                          String awsSecretKey,
+                                                                          Regions region) {
+            return using(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+        }
+
+        private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+            private final String accessKey;
+            private final String secretKey;
+            private final Regions region;
+
+            private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+                this.accessKey = checkNotNull(accessKey, "accessKey");
+                this.secretKey = checkNotNull(secretKey, "secretKey");
+                this.region = checkNotNull(region, "region");
+            }
+
+
+            private AWSCredentialsProvider getCredentialsProvider() {
+                return new StaticCredentialsProvider(new BasicAWSCredentials(
+                        accessKey,
+                        secretKey
+                ));
+
+            }
+
+            @Override
+            public AmazonKinesis get() {
+                return new AmazonKinesisClient(getCredentialsProvider()).withRegion(region);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
new file mode 100644
index 0000000..38a0050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/***
+ * Reads data from multiple kinesis shards in a single thread.
+ * It uses simple round robin algorithm when fetching data from shards.
+ */
+class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+    private final SimplifiedKinesisClient kinesis;
+    private final UnboundedSource<KinesisRecord, ?> source;
+    private final CheckpointGenerator initialCheckpointGenerator;
+    private RoundRobin<ShardRecordsIterator> shardIterators;
+    private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+    public KinesisReader(SimplifiedKinesisClient kinesis,
+                         CheckpointGenerator initialCheckpointGenerator,
+                         UnboundedSource<KinesisRecord, ?> source) {
+        this.kinesis = checkNotNull(kinesis, "kinesis");
+        this.initialCheckpointGenerator =
+                checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+        this.source = source;
+    }
+
+    /***
+     * Generates initial checkpoint and instantiates iterators for shards.
+     */
+    @Override
+    public boolean start() throws IOException {
+        LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+        try {
+            KinesisReaderCheckpoint initialCheckpoint =
+                    initialCheckpointGenerator.generate(kinesis);
+            List<ShardRecordsIterator> iterators = newArrayList();
+            for (ShardCheckpoint checkpoint : initialCheckpoint) {
+                iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+            }
+            shardIterators = new RoundRobin<>(iterators);
+        } catch (TransientKinesisException e) {
+            throw new IOException(e);
+        }
+
+        return advance();
+    }
+
+    /***
+     * Moves to the next record in one of the shards.
+     * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
+     * If not, we iterate over shards in a round-robin manner.
+     */
+    @Override
+    public boolean advance() throws IOException {
+        try {
+            for (int i = 0; i < shardIterators.size(); ++i) {
+                currentRecord = shardIterators.getCurrent().next();
+                if (currentRecord.isPresent()) {
+                    return true;
+                } else {
+                    shardIterators.moveForward();
+                }
+            }
+        } catch (TransientKinesisException e) {
+            LOG.warn("Transient exception occurred", e);
+        }
+        return false;
+    }
+
+    @Override
+    public byte[] getCurrentRecordId() throws NoSuchElementException {
+        return currentRecord.get().getUniqueId();
+    }
+
+    @Override
+    public KinesisRecord getCurrent() throws NoSuchElementException {
+        return currentRecord.get();
+    }
+
+    /***
+     * When {@link KinesisReader} was advanced to the current record.
+     * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+     * is not guaranteed to be accurate - this could lead to mark some records as "late"
+     * even if they were not.
+     */
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return currentRecord.get().getReadTime();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    /***
+     * Current time.
+     * We cannot give better approximation of the watermark with current semantics of
+     * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+     * {@link KinesisReader#advance()} will be called.
+     */
+    @Override
+    public Instant getWatermark() {
+        return Instant.now();
+    }
+
+    @Override
+    public UnboundedSource.CheckpointMark getCheckpointMark() {
+        return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+    }
+
+    @Override
+    public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+        return source;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
new file mode 100644
index 0000000..6ceb742
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
+
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/***
+ * Checkpoint representing a total progress in a set of shards in single stream.
+ * The set of shards covered by {@link KinesisReaderCheckpoint} may or may not be equal to set of
+ * all shards present in the stream.
+ * This class is immutable.
+ */
+class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
+        .CheckpointMark, Serializable {
+    private final List<ShardCheckpoint> shardCheckpoints;
+
+    public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+        this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+    }
+
+    public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+                                                                   iterators) {
+        return new KinesisReaderCheckpoint(transform(iterators,
+                new Function<ShardRecordsIterator, ShardCheckpoint>() {
+
+                    @Nullable
+                    @Override
+                    public ShardCheckpoint apply(@Nullable
+                                                 ShardRecordsIterator shardRecordsIterator) {
+                        assert shardRecordsIterator != null;
+                        return shardRecordsIterator.getCheckpoint();
+                    }
+                }));
+    }
+
+    /***
+     * Splits given multi-shard checkpoint into partitions of approximately equal size.
+     *
+     * @param desiredNumSplits - upper limit for number of partitions to generate.
+     * @return list of checkpoints covering consecutive partitions of current checkpoint.
+     */
+    public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+        int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+
+        List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+        for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+            checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
+        }
+        return checkpoints;
+    }
+
+    private int divideAndRoundUp(int nominator, int denominator) {
+        return (nominator + denominator - 1) / denominator;
+    }
+
+    @Override
+    public void finalizeCheckpoint() throws IOException {
+
+    }
+
+    @Override
+    public String toString() {
+        return shardCheckpoints.toString();
+    }
+
+    @Override
+    public Iterator<ShardCheckpoint> iterator() {
+        return shardCheckpoints.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
new file mode 100644
index 0000000..cdb495c
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.base.Charsets;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link UserRecord} enhanced with utility methods.
+ */
+public class KinesisRecord implements Serializable {
+    private Instant readTime;
+    private String streamName;
+    private String shardId;
+    private long subSequenceNumber;
+    private String sequenceNumber;
+    private Instant approximateArrivalTimestamp;
+    private ByteBuffer data;
+    private String partitionKey;
+
+    public KinesisRecord(UserRecord record, String streamName, String shardId) {
+        this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+                record.getPartitionKey(),
+                new Instant(record.getApproximateArrivalTimestamp()),
+                Instant.now(),
+                streamName, shardId);
+    }
+
+    public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+                         String partitionKey, Instant approximateArrivalTimestamp,
+                         Instant readTime,
+                         String streamName, String shardId) {
+        this.data = data;
+        this.sequenceNumber = sequenceNumber;
+        this.subSequenceNumber = subSequenceNumber;
+        this.partitionKey = partitionKey;
+        this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+        this.readTime = readTime;
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+
+    public ExtendedSequenceNumber getExtendedSequenceNumber() {
+        return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+    }
+
+    /***
+     * @return unique id of the record based on its position in the stream
+     */
+    public byte[] getUniqueId() {
+        return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+    }
+
+    public Instant getReadTime() {
+        return readTime;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public String getShardId() {
+        return shardId;
+    }
+
+    public byte[] getDataAsBytes() {
+        return getData().array();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return EqualsBuilder.reflectionEquals(this, obj);
+    }
+
+    @Override
+    public int hashCode() {
+        return reflectionHashCode(this);
+    }
+
+    public long getSubSequenceNumber() {
+        return subSequenceNumber;
+    }
+
+    public String getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public Instant getApproximateArrivalTimestamp() {
+        return approximateArrivalTimestamp;
+    }
+
+    public ByteBuffer getData() {
+        return data;
+    }
+
+    public String getPartitionKey() {
+        return partitionKey;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
new file mode 100644
index 0000000..c383a4f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import org.joda.time.Instant;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/***
+ * A {@link Coder} for {@link KinesisRecord}.
+ */
+class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+    private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+    public static KinesisRecordCoder of() {
+        return new KinesisRecordCoder();
+    }
+
+    @Override
+    public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+            IOException {
+        Context nested = context.nested();
+        BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
+        STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
+        STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
+        INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
+        VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
+        INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
+        STRING_CODER.encode(value.getStreamName(), outStream, nested);
+        STRING_CODER.encode(value.getShardId(), outStream, nested);
+    }
+
+    @Override
+    public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
+        Context nested = context.nested();
+        ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
+        String sequenceNumber = STRING_CODER.decode(inStream, nested);
+        String partitionKey = STRING_CODER.decode(inStream, nested);
+        Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
+        long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
+        Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
+        String streamName = STRING_CODER.decode(inStream, nested);
+        String shardId = STRING_CODER.decode(inStream, nested);
+        return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+                approximateArrivalTimestamp, readTimestamp, streamName, shardId
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
new file mode 100644
index 0000000..38c9fa4
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+
+
+/***
+ * Represents source for single stream in Kinesis.
+ */
+class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+    private final KinesisClientProvider kinesis;
+    private CheckpointGenerator initialCheckpointGenerator;
+
+    public KinesisSource(KinesisClientProvider kinesis, String streamName,
+                         StartingPoint startingPoint) {
+        this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+    }
+
+    private KinesisSource(KinesisClientProvider kinesisClientProvider,
+                          CheckpointGenerator initialCheckpoint) {
+        this.kinesis = kinesisClientProvider;
+        this.initialCheckpointGenerator = initialCheckpoint;
+        validate();
+    }
+
+    /***
+     * Generate splits for reading from the stream.
+     * Basically, it'll try to evenly split set of shards in the stream into
+     * {@code desiredNumSplits} partitions. Each partition is then a split.
+     */
+    @Override
+    public List<KinesisSource> generateInitialSplits(int desiredNumSplits,
+                                                     PipelineOptions options) throws Exception {
+        KinesisReaderCheckpoint checkpoint =
+                initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+        List<KinesisSource> sources = newArrayList();
+
+        for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+            sources.add(new KinesisSource(
+                    kinesis,
+                    new StaticCheckpointGenerator(partition)));
+        }
+        return sources;
+    }
+
+    /***
+     * Creates reader based on given {@link KinesisReaderCheckpoint}.
+     * If {@link KinesisReaderCheckpoint} is not given, then we use
+     * {@code initialCheckpointGenerator} to generate new checkpoint.
+     */
+    @Override
+    public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+                                                KinesisReaderCheckpoint checkpointMark) {
+
+        CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+        if (checkpointMark != null) {
+            checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
+        }
+
+        LOG.info("Creating new reader using {}", checkpointGenerator);
+
+        return new KinesisReader(
+                SimplifiedKinesisClient.from(kinesis),
+                checkpointGenerator,
+                this);
+    }
+
+    @Override
+    public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+        return SerializableCoder.of(KinesisReaderCheckpoint.class);
+    }
+
+    @Override
+    public void validate() {
+        checkNotNull(kinesis);
+        checkNotNull(initialCheckpointGenerator);
+    }
+
+    @Override
+    public Coder<KinesisRecord> getDefaultOutputCoder() {
+        return KinesisRecordCoder.of();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
new file mode 100644
index 0000000..4c7f39a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
+
+
+/**
+ * Filters out records, which were already processed and checkpointed.
+ * <p>
+ * We need this step, because we can get iterators from Kinesis only with "sequenceNumber" accuracy,
+ * not with "subSequenceNumber" accuracy.
+ */
+class RecordFilter {
+    public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+        List<KinesisRecord> filteredRecords = newArrayList();
+        for (KinesisRecord record : records) {
+            if (checkpoint.isBeforeOrAt(record)) {
+                filteredRecords.add(record);
+            }
+        }
+        return filteredRecords;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
new file mode 100644
index 0000000..7257aa1
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Queues.newArrayDeque;
+
+
+import java.util.Deque;
+import java.util.Iterator;
+
+/***
+ * Very simple implementation of round robin algorithm.
+ */
+class RoundRobin<T> implements Iterable<T> {
+    private final Deque<T> deque;
+
+    public RoundRobin(Iterable<T> collection) {
+        this.deque = newArrayDeque(collection);
+        checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+    }
+
+    public T getCurrent() {
+        return deque.getFirst();
+    }
+
+    public void moveForward() {
+        deque.addLast(deque.removeFirst());
+    }
+
+    public int size() {
+        return deque.size();
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return deque.iterator();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
new file mode 100644
index 0000000..1d8628b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.joda.time.Instant;
+import java.io.Serializable;
+
+
+/***
+ * Checkpoint mark for single shard in the stream.
+ * Current position in the shard is determined by either:
+ * <ul>
+ * <li>{@link #shardIteratorType} if it is equal to {@link ShardIteratorType#LATEST} or
+ * {@link ShardIteratorType#TRIM_HORIZON}</li>
+ * <li>combination of
+ * {@link #sequenceNumber} and {@link #subSequenceNumber} if
+ * {@link ShardIteratorType#AFTER_SEQUENCE_NUMBER} or
+ * {@link ShardIteratorType#AT_SEQUENCE_NUMBER}</li>
+ * </ul>
+ * This class is immutable.
+ */
+class ShardCheckpoint implements Serializable {
+    private final String streamName;
+    private final String shardId;
+    private final String sequenceNumber;
+    private final ShardIteratorType shardIteratorType;
+    private final Long subSequenceNumber;
+    private final Instant timestamp;
+
+    public ShardCheckpoint(String streamName, String shardId, StartingPoint
+            startingPoint) {
+        this(streamName, shardId,
+                ShardIteratorType.fromValue(startingPoint.getPositionName()),
+                startingPoint.getTimestamp());
+    }
+
+    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+            shardIteratorType, Instant timestamp) {
+        this(streamName, shardId, shardIteratorType, null, null, timestamp);
+    }
+
+    public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+            shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+        this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+    }
+
+    private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+                            String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+        this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+        this.streamName = checkNotNull(streamName, "streamName");
+        this.shardId = checkNotNull(shardId, "shardId");
+        if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+            checkNotNull(sequenceNumber,
+                    "You must provide sequence number for AT_SEQUENCE_NUMBER"
+                            + " or AFTER_SEQUENCE_NUMBER");
+        } else {
+            checkArgument(sequenceNumber == null,
+                    "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
+        }
+        if (shardIteratorType == AT_TIMESTAMP) {
+            checkNotNull(timestamp,
+                    "You must provide timestamp for AT_SEQUENCE_NUMBER"
+                            + " or AFTER_SEQUENCE_NUMBER");
+        } else {
+            checkArgument(timestamp == null,
+                    "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
+        }
+
+        this.subSequenceNumber = subSequenceNumber;
+        this.sequenceNumber = sequenceNumber;
+        this.timestamp = timestamp;
+    }
+
+    /***
+     * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+     * on the the underlying shardIteratorType, it will either compare the timestamp or the
+     * {@link ExtendedSequenceNumber}.
+     *
+     * @param other
+     * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
+     */
+    public boolean isBeforeOrAt(KinesisRecord other) {
+        if (shardIteratorType == AT_TIMESTAMP) {
+            return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
+        }
+        int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+        if (result == 0) {
+            return shardIteratorType == AT_SEQUENCE_NUMBER;
+        }
+        return result < 0;
+    }
+
+    private ExtendedSequenceNumber extendedSequenceNumber() {
+        String fullSequenceNumber = sequenceNumber;
+        if (fullSequenceNumber == null) {
+            fullSequenceNumber = shardIteratorType.toString();
+        }
+        return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+                streamName, shardId,
+                sequenceNumber);
+    }
+
+    public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+            throws TransientKinesisException {
+        return new ShardRecordsIterator(this, kinesis);
+    }
+
+    public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+            throws TransientKinesisException {
+        if (checkpointIsInTheMiddleOfAUserRecord()) {
+            return kinesisClient.getShardIterator(streamName,
+                    shardId, AT_SEQUENCE_NUMBER,
+                    sequenceNumber, null);
+        }
+        return kinesisClient.getShardIterator(streamName,
+                shardId, shardIteratorType,
+                sequenceNumber, timestamp);
+    }
+
+    private boolean checkpointIsInTheMiddleOfAUserRecord() {
+        return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+    }
+
+    /***
+     * Used to advance checkpoint mark to position after given {@link Record}.
+     *
+     * @param record
+     * @return new checkpoint object pointing directly after given {@link Record}
+     */
+    public ShardCheckpoint moveAfter(KinesisRecord record) {
+        return new ShardCheckpoint(
+                streamName, shardId,
+                AFTER_SEQUENCE_NUMBER,
+                record.getSequenceNumber(),
+                record.getSubSequenceNumber());
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public String getShardId() {
+        return shardId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
new file mode 100644
index 0000000..7dfe158
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Queues.newArrayDeque;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Deque;
+
+/***
+ * Iterates over records in a single shard.
+ * Under the hood records are retrieved from Kinesis in batches and stored in the in-memory queue.
+ * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
+ */
+class ShardRecordsIterator {
+    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
+
+    private final SimplifiedKinesisClient kinesis;
+    private final RecordFilter filter;
+    private ShardCheckpoint checkpoint;
+    private String shardIterator;
+    private Deque<KinesisRecord> data = newArrayDeque();
+
+    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+                                SimplifiedKinesisClient simplifiedKinesisClient) throws
+            TransientKinesisException {
+        this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+    }
+
+    public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+                                SimplifiedKinesisClient simplifiedKinesisClient,
+                                RecordFilter filter) throws
+            TransientKinesisException {
+
+        this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+        this.filter = checkNotNull(filter, "filter");
+        this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+        shardIterator = checkpoint.getShardIterator(kinesis);
+    }
+
+    /***
+     * Returns record if there's any present.
+     * Returns absent() if there are no new records at this time in the shard.
+     */
+    public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+        readMoreIfNecessary();
+
+        if (data.isEmpty()) {
+            return CustomOptional.absent();
+        } else {
+            KinesisRecord record = data.removeFirst();
+            checkpoint = checkpoint.moveAfter(record);
+            return CustomOptional.of(record);
+        }
+    }
+
+    private void readMoreIfNecessary() throws TransientKinesisException {
+        if (data.isEmpty()) {
+            GetKinesisRecordsResult response;
+            try {
+                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+                        checkpoint.getShardId());
+            } catch (ExpiredIteratorException e) {
+                LOG.info("Refreshing expired iterator", e);
+                shardIterator = checkpoint.getShardIterator(kinesis);
+                response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+                        checkpoint.getShardId());
+            }
+            LOG.debug("Fetched {} new records", response.getRecords().size());
+            shardIterator = response.getNextShardIterator();
+            data.addAll(filter.apply(response.getRecords(), checkpoint));
+        }
+    }
+
+    public ShardCheckpoint getCheckpoint() {
+        return checkpoint;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
new file mode 100644
index 0000000..f9a1ea2
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Lists;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import org.joda.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/***
+ * Wraps {@link AmazonKinesis} class providing much simpler interface and
+ * proper error handling.
+ */
+class SimplifiedKinesisClient {
+    private final AmazonKinesis kinesis;
+
+    public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+        this.kinesis = kinesis;
+    }
+
+    public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+        return new SimplifiedKinesisClient(provider.get());
+    }
+
+    public String getShardIterator(final String streamName, final String shardId,
+                                   final ShardIteratorType shardIteratorType,
+                                   final String startingSequenceNumber, final Instant timestamp)
+            throws TransientKinesisException {
+        final Date date = timestamp != null ? timestamp.toDate() : null;
+        return wrapExceptions(new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                return kinesis.getShardIterator(new GetShardIteratorRequest()
+                        .withStreamName(streamName)
+                        .withShardId(shardId)
+                        .withShardIteratorType(shardIteratorType)
+                        .withStartingSequenceNumber(startingSequenceNumber)
+                        .withTimestamp(date)
+                ).getShardIterator();
+            }
+        });
+    }
+
+    public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+        return wrapExceptions(new Callable<List<Shard>>() {
+            @Override
+            public List<Shard> call() throws Exception {
+                List<Shard> shards = Lists.newArrayList();
+                String lastShardId = null;
+
+                StreamDescription description;
+                do {
+                    description = kinesis.describeStream(streamName, lastShardId)
+                            .getStreamDescription();
+
+                    shards.addAll(description.getShards());
+                    lastShardId = shards.get(shards.size() - 1).getShardId();
+                } while (description.getHasMoreShards());
+
+                return shards;
+            }
+        });
+    }
+
+    /***
+     * Gets records from Kinesis and deaggregates them if needed.
+     *
+     * @return list of deaggregated records
+     * @throws TransientKinesisException - in case of recoverable situation
+     */
+    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+                                              String shardId) throws TransientKinesisException {
+        return getRecords(shardIterator, streamName, shardId, null);
+    }
+
+    /***
+     * Gets records from Kinesis and deaggregates them if needed.
+     *
+     * @return list of deaggregated records
+     * @throws TransientKinesisException - in case of recoverable situation
+     */
+    public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+                                              final String shardId, final Integer limit)
+            throws
+            TransientKinesisException {
+        return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+            @Override
+            public GetKinesisRecordsResult call() throws Exception {
+                GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+                        .withShardIterator(shardIterator)
+                        .withLimit(limit));
+                return new GetKinesisRecordsResult(
+                        UserRecord.deaggregate(response.getRecords()),
+                        response.getNextShardIterator(),
+                        streamName, shardId);
+            }
+        });
+    }
+
+    /***
+     * Wraps Amazon specific exceptions into more friendly format.
+     *
+     * @throws TransientKinesisException              - in case of recoverable situation, i.e.
+     *                                  the request rate is too high, Kinesis remote service
+     *                                  failed, network issue, etc.
+     * @throws ExpiredIteratorException - if iterator needs to be refreshed
+     * @throws RuntimeException         - in all other cases
+     */
+    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+        try {
+            return callable.call();
+        } catch (ExpiredIteratorException e) {
+            throw e;
+        } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+            throw new TransientKinesisException(
+                    "Too many requests to Kinesis. Wait some time and retry.", e);
+        } catch (AmazonServiceException e) {
+            if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+                throw new TransientKinesisException(
+                        "Kinesis backend failed. Wait some time and retry.", e);
+            }
+            throw new RuntimeException("Kinesis client side failure", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
new file mode 100644
index 0000000..8140269
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.joda.time.Instant;
+import java.io.Serializable;
+import java.util.Objects;
+
+/***
+ * Denotes a point at which the reader should start reading from a Kinesis stream.
+ * It can be expressed either as an {@link InitialPositionInStream} enum constant or a timestamp,
+ * in which case the reader will start reading at the specified point in time.
+ */
+class StartingPoint implements Serializable {
+    private final InitialPositionInStream position;
+    private final Instant timestamp;
+
+    public StartingPoint(InitialPositionInStream position) {
+        this.position = checkNotNull(position, "position");
+        this.timestamp = null;
+    }
+
+    public StartingPoint(Instant timestamp) {
+        this.timestamp = checkNotNull(timestamp, "timestamp");
+        this.position = null;
+    }
+
+    public InitialPositionInStream getPosition() {
+        return position;
+    }
+
+    public String getPositionName() {
+        return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+    }
+
+    public Instant getTimestamp() {
+        return timestamp != null ? timestamp : null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StartingPoint that = (StartingPoint) o;
+        return position == that.position && Objects.equals(timestamp, that.timestamp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(position, timestamp);
+    }
+
+    @Override
+    public String toString() {
+        if (timestamp == null) {
+            return position.toString();
+        } else {
+            return "Starting at timestamp " + timestamp;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
new file mode 100644
index 0000000..22dc973
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Always returns the same instance of checkpoint.
+ */
+class StaticCheckpointGenerator implements CheckpointGenerator {
+    private final KinesisReaderCheckpoint checkpoint;
+
+    public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+        checkNotNull(checkpoint, "checkpoint");
+        this.checkpoint = checkpoint;
+    }
+
+    @Override
+    public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+        return checkpoint;
+    }
+
+    @Override
+    public String toString() {
+        return checkpoint.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
new file mode 100644
index 0000000..a1a974b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * Created by p.pastuszka on 21.06.2016.
+ */
+class TransientKinesisException extends Exception {
+    public TransientKinesisException(String s, AmazonServiceException e) {
+        super(s, e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..5e37ef1
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tests for KinesisIO.
+ */
+package org.apache.beam.sdk.io.kinesis;