You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 23:12:23 UTC
[1/5] incubator-beam git commit: Fix javadoc in Kinesis
Repository: incubator-beam
Updated Branches:
refs/heads/master 95ab43809 -> a17a99f58
Fix javadoc in Kinesis
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a5005fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a5005fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a5005fbe
Branch: refs/heads/master
Commit: a5005fbe7c0a43bdd2d825c2fc77556956bf0bfa
Parents: bed5056
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:20:44 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kinesis/TransientKinesisException.java | 2 +-
.../java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java | 2 +-
.../java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java | 2 +-
.../java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java | 2 +-
.../java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java | 2 +-
.../org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java | 2 +-
.../test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java | 2 +-
.../org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java | 2 +-
8 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index a1a974b..57ad8a8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.io.kinesis;
import com.amazonaws.AmazonServiceException;
/**
- * Created by p.pastuszka on 21.06.2016.
+ * A transient exception thrown by Kinesis.
*/
class TransientKinesisException extends Exception {
public TransientKinesisException(String s, AmazonServiceException e) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index b007fa4..046c9d9 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -74,7 +74,7 @@ import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
/**
- * Created by p.pastuszka on 21.07.2016.
+ * Mock implementation of {@link AmazonKinesis} for testing.
*/
class AmazonKinesisMock implements AmazonKinesis {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index cb0d0e2..20e8372 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -21,7 +21,7 @@ import java.util.NoSuchElementException;
import org.junit.Test;
/**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link CustomOptional}.
*/
public class CustomOptionalTest {
@Test(expected = NoSuchElementException.class)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 304220b..f0ab46c 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -32,7 +32,7 @@ import org.joda.time.DateTime;
import org.junit.Test;
/**
- * Created by p.pastuszka on 22.07.2016.
+ * Tests {@link AmazonKinesisMock}.
*/
public class KinesisMockReadTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 29a24821..3111029 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -30,7 +30,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link KinesisReader}.
*/
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderTest {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index d301f25..8771c86 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -23,7 +23,7 @@ import org.joda.time.Instant;
import org.junit.Test;
/**
- * Created by p.pastuszka on 20.07.2016.
+ * Tests {@link KinesisRecordCoder}.
*/
public class KinesisRecordCoderTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index aedc89e..f032eea 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -25,7 +25,7 @@ import java.util.List;
import org.junit.Test;
/**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link RoundRobin}.
*/
public class RoundRobinTest {
@Test(expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a5005fbe/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 585b884..49e806d 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -36,7 +36,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
/**
- * Created by ppastuszka on 12.12.15.
+ * Tests {@link ShardRecordsIterator}.
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardRecordsIteratorTest {
[5/5] incubator-beam git commit: Closes #687
Posted by dh...@apache.org.
Closes #687
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a17a99f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a17a99f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a17a99f5
Branch: refs/heads/master
Commit: a17a99f580d9818e11751b6996a15aa60b2e0c56
Parents: 95ab438 a5005fb
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 16:12:08 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:12:08 2016 -0700
----------------------------------------------------------------------
sdks/java/io/kinesis/pom.xml | 179 +++++++++
.../sdk/io/kinesis/CheckpointGenerator.java | 30 ++
.../beam/sdk/io/kinesis/CustomOptional.java | 85 +++++
.../io/kinesis/DynamicCheckpointGenerator.java | 56 +++
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 54 +++
.../sdk/io/kinesis/KinesisClientProvider.java | 31 ++
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 190 ++++++++++
.../beam/sdk/io/kinesis/KinesisReader.java | 145 +++++++
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 96 +++++
.../beam/sdk/io/kinesis/KinesisRecord.java | 121 ++++++
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 74 ++++
.../beam/sdk/io/kinesis/KinesisSource.java | 112 ++++++
.../beam/sdk/io/kinesis/RecordFilter.java | 41 ++
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 53 +++
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 175 +++++++++
.../sdk/io/kinesis/ShardRecordsIterator.java | 98 +++++
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 157 ++++++++
.../beam/sdk/io/kinesis/StartingPoint.java | 85 +++++
.../io/kinesis/StaticCheckpointGenerator.java | 42 +++
.../io/kinesis/TransientKinesisException.java | 29 ++
.../beam/sdk/io/kinesis/package-info.java | 22 ++
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 375 +++++++++++++++++++
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 31 ++
.../kinesis/DynamicCheckpointGeneratorTest.java | 57 +++
.../sdk/io/kinesis/KinesisMockReadTest.java | 91 +++++
.../io/kinesis/KinesisReaderCheckpointTest.java | 67 ++++
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 119 ++++++
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 120 ++++++
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 45 +++
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 47 +++
.../beam/sdk/io/kinesis/KinesisUploader.java | 84 +++++
.../beam/sdk/io/kinesis/RecordFilterTest.java | 66 ++++
.../beam/sdk/io/kinesis/RoundRobinTest.java | 57 +++
.../sdk/io/kinesis/ShardCheckpointTest.java | 149 ++++++++
.../io/kinesis/ShardRecordsIteratorTest.java | 151 ++++++++
.../io/kinesis/SimplifiedKinesisClientTest.java | 224 +++++++++++
.../beam/sdk/io/kinesis/package-info.java | 22 ++
sdks/java/io/pom.xml | 1 +
38 files changed, 3581 insertions(+)
----------------------------------------------------------------------
[2/5] incubator-beam git commit: Organize imports in Kinesis
Posted by dh...@apache.org.
Organize imports in Kinesis
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bed50565
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bed50565
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bed50565
Branch: refs/heads/master
Commit: bed50565cc5cd8892d1a4577cb5d6505f7bb0367
Parents: bed22de
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 26 15:17:05 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700
----------------------------------------------------------------------
.../io/kinesis/DynamicCheckpointGenerator.java | 4 +--
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 2 +-
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 2 +-
.../beam/sdk/io/kinesis/KinesisReader.java | 8 +++---
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 4 +--
.../beam/sdk/io/kinesis/KinesisRecord.java | 8 +++---
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 9 +++----
.../beam/sdk/io/kinesis/KinesisSource.java | 10 +++----
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 1 -
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 8 +++---
.../sdk/io/kinesis/ShardRecordsIterator.java | 2 +-
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 5 ++--
.../beam/sdk/io/kinesis/StartingPoint.java | 2 +-
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 12 ++++-----
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 2 +-
.../kinesis/DynamicCheckpointGeneratorTest.java | 7 ++---
.../sdk/io/kinesis/KinesisMockReadTest.java | 11 ++++----
.../io/kinesis/KinesisReaderCheckpointTest.java | 10 +++----
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 28 ++++++++++----------
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 7 ++---
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 3 +--
.../beam/sdk/io/kinesis/KinesisUploader.java | 4 +--
.../beam/sdk/io/kinesis/RecordFilterTest.java | 8 +++---
.../beam/sdk/io/kinesis/RoundRobinTest.java | 4 +--
.../sdk/io/kinesis/ShardCheckpointTest.java | 7 ++---
.../io/kinesis/ShardRecordsIteratorTest.java | 11 ++++----
.../io/kinesis/SimplifiedKinesisClientTest.java | 11 ++++----
27 files changed, 92 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index d86960f..2ec293c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -20,10 +20,8 @@ package org.apache.beam.sdk.io.kinesis;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.transform;
-
-import com.google.common.base.Function;
-
import com.amazonaws.services.kinesis.model.Shard;
+import com.google.common.base.Function;
/**
* Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index f48b9d5..c0f00de 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.collect.Lists.transform;
-import com.google.common.base.Function;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.google.common.base.Function;
import java.util.List;
import javax.annotation.Nullable;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b3cb464..811051c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.kinesis;
-import org.apache.beam.sdk.transforms.PTransform;
import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -28,6 +27,7 @@ import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.beam.sdk.transforms.PTransform;
import org.joda.time.Instant;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 38a0050..219a705 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -18,16 +18,16 @@
package org.apache.beam.sdk.io.kinesis;
-import org.apache.beam.sdk.io.UnboundedSource;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/***
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index 6ceb742..663ba44 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -17,20 +17,18 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import org.apache.beam.sdk.io.UnboundedSource;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.partition;
-
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
-
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.UnboundedSource;
/***
* Checkpoint representing a total progress in a set of shards in single stream.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index cdb495c..fe2a33d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -17,15 +17,15 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import com.google.common.base.Charsets;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
-import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.joda.time.Instant;
+import com.google.common.base.Charsets;
import java.io.Serializable;
import java.nio.ByteBuffer;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
/**
* {@link UserRecord} enhanced with utility methods.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index c383a4f..5b13e31 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -17,18 +17,17 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
-
import org.joda.time.Instant;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
/***
* A {@link Coder} for {@link KinesisRecord}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 38c9fa4..62cba08 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -17,18 +17,16 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
/***
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index 7257aa1..7adae4b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.kinesis;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Queues.newArrayDeque;
-
import java.util.Deque;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 1d8628b..9920aca 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -18,17 +18,17 @@
package org.apache.beam.sdk.io.kinesis;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.joda.time.Instant;
import java.io.Serializable;
+import org.joda.time.Instant;
/***
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 7dfe158..d17996a 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,9 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Queues.newArrayDeque;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import java.util.Deque;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Deque;
/***
* Iterates over records in a single shard.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index f9a1ea2..96267d1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.io.kinesis;
-import com.google.common.collect.Lists;
-
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -32,10 +30,11 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
-import org.joda.time.Instant;
+import com.google.common.collect.Lists;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
+import org.joda.time.Instant;
/***
* Wraps {@link AmazonKinesis} class providing much simpler interface and
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index 8140269..b7ee917 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-import org.joda.time.Instant;
import java.io.Serializable;
import java.util.Objects;
+import org.joda.time.Instant;
/***
* Denotes a point at which the reader should start reading from a Kinesis stream.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 7ca8e0b..b007fa4 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -19,7 +19,9 @@ package org.apache.beam.sdk.io.kinesis;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.transform;
-import com.google.common.base.Function;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
@@ -63,15 +65,13 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
import com.amazonaws.services.kinesis.model.StreamDescription;
-import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.joda.time.Instant;
-import static java.lang.Integer.parseInt;
-import static java.lang.Math.min;
+import com.google.common.base.Function;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
/**
* Created by p.pastuszka on 21.07.2016.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 152fd6d..cb0d0e2 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import org.junit.Test;
import java.util.NoSuchElementException;
+import org.junit.Test;
/**
* Created by ppastuszka on 12.12.15.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index a9e5a69..c92ac9a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -17,15 +17,16 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
-import com.amazonaws.services.kinesis.model.Shard;
+import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
/***
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 61a858f..304220b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -17,20 +17,19 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.google.common.collect.Iterables;
+import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import static com.google.common.collect.Lists.newArrayList;
-
-import com.google.common.collect.Iterables;
-
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.joda.time.DateTime;
import org.junit.Test;
-import java.util.List;
/**
* Created by p.pastuszka on 22.07.2016.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 205f050..8c8da64 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -18,17 +18,17 @@
package org.apache.beam.sdk.io.kinesis;
-import com.google.common.collect.Iterables;
-
+import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.util.Iterator;
+import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
-import java.util.Iterator;
-import java.util.List;
/***
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index fbc7c66..73a2455 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -17,6 +17,20 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.amazonaws.regions.Regions;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -25,25 +39,11 @@ import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
-
-import com.amazonaws.regions.Regions;
-import static org.assertj.core.api.Assertions.assertThat;
import org.apache.commons.lang.RandomStringUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Test;
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
/**
* Integration test, that reads from the real Kinesis.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 793fb57..29a24821 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -17,16 +17,17 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import static java.util.Arrays.asList;
-import java.io.IOException;
-import java.util.NoSuchElementException;
/**
* Created by ppastuszka on 12.12.15.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index b09b7eb..d301f25 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -17,11 +17,10 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import java.nio.ByteBuffer;
import org.apache.beam.sdk.testing.CoderProperties;
-
import org.joda.time.Instant;
import org.junit.Test;
-import java.nio.ByteBuffer;
/**
* Created by p.pastuszka on 20.07.2016.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 0dcede9..c98242b 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -18,8 +18,6 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.collect.Lists.newArrayList;
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.internal.StaticCredentialsProvider;
@@ -30,6 +28,8 @@ import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
import java.nio.ByteBuffer;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index 360106d..f979c01 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -17,16 +17,16 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import com.google.common.collect.Lists;
-
import static org.mockito.BDDMockito.given;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import java.util.Collections;
-import java.util.List;
/***
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index a508ddf..aedc89e 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -18,11 +18,11 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.collect.Lists.newArrayList;
-
import static org.assertj.core.api.Assertions.assertThat;
-import org.junit.Test;
+
import java.util.Collections;
import java.util.List;
+import org.junit.Test;
/**
* Created by ppastuszka on 12.12.15.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index 2227cef..39ab36f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -22,8 +22,6 @@ import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPos
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
-import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Matchers.anyString;
@@ -31,6 +29,10 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import java.io.IOException;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Before;
@@ -38,7 +40,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import java.io.IOException;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index e2a3ccc..585b884 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import java.io.IOException;
+import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -29,10 +34,6 @@ import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
-import static java.util.Arrays.asList;
-import static java.util.Collections.singletonList;
-import java.io.IOException;
-import java.util.Collections;
/**
* Created by ppastuszka on 12.12.15.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed50565/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
index 44d29d6..96434fd 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -17,6 +17,11 @@
*/
package org.apache.beam.sdk.io.kinesis;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -29,17 +34,13 @@ import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededExcepti
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
-import static org.mockito.BDDMockito.given;
-import static org.mockito.Mockito.reset;
+import java.util.List;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
-import java.util.List;
/***
*/
[3/5] incubator-beam git commit: kinesis: a connector for Amazon
Kinesis
Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
new file mode 100644
index 0000000..7ca8e0b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
+import com.amazonaws.services.kinesis.model.CreateStreamRequest;
+import com.amazonaws.services.kinesis.model.CreateStreamResult;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
+import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.ListStreamsRequest;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
+import com.amazonaws.services.kinesis.model.MergeShardsRequest;
+import com.amazonaws.services.kinesis.model.MergeShardsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.SplitShardRequest;
+import com.amazonaws.services.kinesis.model.SplitShardResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Created by p.pastuszka on 21.07.2016.
+ */
+class AmazonKinesisMock implements AmazonKinesis {
+
+ /**
+  * Serializable value object describing one record served by the mock.
+  *
+  * <p>The payload is held as a {@link String}; conversions to and from bytes use UTF-8
+  * explicitly so that results do not depend on the JVM's default charset.
+  */
+ static class TestData implements Serializable {
+   private final String data;
+   private final Instant arrivalTimestamp;
+   private final String sequenceNumber;
+
+   /**
+    * Copies payload, arrival timestamp and sequence number out of {@code record}.
+    *
+    * @param record record to mirror; the full array returned by {@code getData().array()}
+    *     is decoded as UTF-8
+    */
+   public TestData(KinesisRecord record) {
+     // StandardCharsets is fully qualified so that this file's import block stays untouched.
+     this(new String(record.getData().array(), java.nio.charset.StandardCharsets.UTF_8),
+         record.getApproximateArrivalTimestamp(),
+         record.getSequenceNumber());
+   }
+
+   /**
+    * @param data record payload as text
+    * @param arrivalTimestamp approximate arrival timestamp of the record
+    * @param sequenceNumber sequence number of the record
+    */
+   public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+     this.data = data;
+     this.arrivalTimestamp = arrivalTimestamp;
+     this.sequenceNumber = sequenceNumber;
+   }
+
+   /**
+    * Converts this test datum into an SDK {@link Record}.
+    *
+    * @return a new record carrying the UTF-8 encoded payload, the arrival timestamp, the
+    *     sequence number and an empty partition key
+    */
+   public Record convertToRecord() {
+     return new Record()
+         .withApproximateArrivalTimestamp(arrivalTimestamp.toDate())
+         .withData(ByteBuffer.wrap(data.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
+         .withSequenceNumber(sequenceNumber)
+         .withPartitionKey("");
+   }
+
+   /** Reflection-based comparison delegated to {@link EqualsBuilder#reflectionEquals}. */
+   @Override
+   public boolean equals(Object obj) {
+     return EqualsBuilder.reflectionEquals(this, obj);
+   }
+
+   /** Reflection-based hash code, the counterpart of {@link #equals(Object)}. */
+   @Override
+   public int hashCode() {
+     return reflectionHashCode(this);
+   }
+ }
+
+ static class Provider implements KinesisClientProvider {
+
+ private final List<List<TestData>> shardedData;
+ private final int numberOfRecordsPerGet;
+
+ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ return new AmazonKinesisMock(transform(shardedData,
+ new Function<List<TestData>, List<Record>>() {
+ @Override
+ public List<Record> apply(@Nullable List<TestData> testDatas) {
+ return transform(testDatas, new Function<TestData, Record>() {
+ @Override
+ public Record apply(@Nullable TestData testData) {
+ return testData.convertToRecord();
+ }
+ });
+ }
+ }), numberOfRecordsPerGet);
+ }
+ }
+
+ private final List<List<Record>> shardedData; // one inner list per shard, looked up by index
+ private final int numberOfRecordsPerGet; // upper bound on records returned by one getRecords call
+
+ public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData; // kept by reference, no defensive copy
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
+
+ @Override
+ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+ String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+ int shardId = parseInt(shardIteratorParts[0]);
+ int startingRecord = parseInt(shardIteratorParts[1]);
+ List<Record> shardData = shardedData.get(shardId);
+
+ int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+ int fromIndex = min(startingRecord, toIndex);
+ return new GetRecordsResult().
+ withRecords(shardData.subList(fromIndex, toIndex)).
+ withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+ }
+
+ /**
+  * Returns an iterator positioned at the first record of the requested shard.
+  *
+  * <p>Only {@link ShardIteratorType#TRIM_HORIZON} is handled; any other iterator type
+  * results in a {@link RuntimeException}.
+  */
+ @Override
+ public GetShardIteratorResult getShardIterator(
+     GetShardIteratorRequest getShardIteratorRequest) {
+   ShardIteratorType requestedType = ShardIteratorType.fromValue(
+       getShardIteratorRequest.getShardIteratorType());
+
+   if (requestedType != ShardIteratorType.TRIM_HORIZON) {
+     throw new RuntimeException("Not implemented");
+   }
+
+   // Same "<shard id>:<offset>" format that getRecords parses; offset 0 is the shard start.
+   String iterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+   return new GetShardIteratorResult().withShardIterator(iterator);
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+ int nextShardId = 0;
+ if (exclusiveStartShardId != null) {
+ nextShardId = parseInt(exclusiveStartShardId) + 1;
+ }
+ boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+ List<Shard> shards = newArrayList();
+ if (nextShardId < shardedData.size()) {
+ shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+ }
+
+ return new DescribeStreamResult().withStreamDescription(
+ new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+ );
+ }
+
+ @Override
+ public void setEndpoint(String endpoint) { // no-op: the argument is ignored by this mock
+
+ }
+
+ @Override
+ public void setRegion(Region region) { // no-op: the argument is ignored by this mock
+
+ }
+
+ @Override
+ public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+ throw new RuntimeException("Not implemented"); // unsupported, like every stub below that throws
+ }
+
+ @Override
+ public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(String streamName, Integer shardCount) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+ DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(String streamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ // Of the describeStream overloads, only (streamName, exclusiveStartShardId) is implemented.
+ @Override
+ public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName) {
+
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName,
+ Integer limit, String exclusiveStartShardId) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+ DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+ EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ // Of the getShardIterator overloads, only the GetShardIteratorRequest one is implemented.
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType,
+ String startingSequenceNumber) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+ IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListTagsForStreamResult listTagsForStream(
+ ListTagsForStreamRequest listTagsForStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(String streamName,
+ String shardToMerge, String adjacentShardToMerge) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ // Writing is unsupported: the mock only serves the records passed to its constructor.
+ @Override
+ public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data,
+ String partitionKey, String sequenceNumberForOrdering) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public RemoveTagsFromStreamResult removeTagsFromStream(
+ RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(String streamName,
+ String shardToSplit, String newStartingHashKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void shutdown() { // no-op: empty body, nothing is released here
+
+ }
+
+ @Override
+ public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new RuntimeException("Not implemented"); // unsupported by this mock
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
new file mode 100644
index 0000000..152fd6d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.junit.Test;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for {@link CustomOptional}.
+ */
+public class CustomOptionalTest {
+ @Test(expected = NoSuchElementException.class)
+ public void absentThrowsNoSuchElementExceptionOnGet() {
+ CustomOptional.absent().get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
new file mode 100644
index 0000000..a9e5a69
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+
+
+/**
+ * Tests for {@link DynamicCheckpointGenerator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DynamicCheckpointGeneratorTest {
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private Shard shard1, shard2, shard3;
+
+ @Test
+ public void shouldMapAllShardsToCheckpoints() throws Exception {
+ // The mocked client reports three shards for "stream".
+ given(shard1.getShardId()).willReturn("shard-01");
+ given(shard2.getShardId()).willReturn("shard-02");
+ given(shard3.getShardId()).willReturn("shard-03");
+ given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+
+ StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+ DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+ startingPoint);
+
+ KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+
+ // Only the number of generated entries is verified, not their contents.
+ assertThat(checkpoint).hasSize(3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
new file mode 100644
index 0000000..61a858f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.google.common.collect.Iterables;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import java.util.List;
+
+/**
+ * Tests reading with {@link KinesisIO.Read} from data served by
+ * {@link AmazonKinesisMock.Provider}.
+ */
+public class KinesisMockReadTest {
+ @Test
+ public void readsDataFromMockKinesis() {
+ int noOfShards = 3;
+ int noOfEventsPerShard = 100;
+ List<List<AmazonKinesisMock.TestData>> testData =
+ provideTestData(noOfShards, noOfEventsPerShard);
+
+ final Pipeline p = TestPipeline.create();
+ // The read is capped at exactly the number of generated records.
+ // NOTE(review): the meaning of the second Provider argument (10) is not visible here.
+ PCollection<AmazonKinesisMock.TestData> result = p.
+ apply(
+ KinesisIO.Read.
+ from("stream", InitialPositionInStream.TRIM_HORIZON).
+ using(new AmazonKinesisMock.Provider(testData, 10)).
+ withMaxNumRecords(noOfShards * noOfEventsPerShard)).
+ apply(ParDo.of(new KinesisRecordToTestData()));
+ // Order is not checked: the output only has to contain every generated entry.
+ PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+ p.run();
+ }
+
+ /**
+ * Wraps each {@link KinesisRecord} in an {@link AmazonKinesisMock.TestData} so the pipeline
+ * output can be compared with the generated input.
+ */
+ private static class KinesisRecordToTestData extends
+ DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(new AmazonKinesisMock.TestData(c.element()));
+ }
+ }
+
+ /**
+ * Builds {@code noOfShards} lists holding {@code noOfEventsPerShard} entries each.
+ *
+ * <p>The sequence counter keeps increasing across shards, so every entry gets a distinct
+ * value. Arrival times restart at the current time for each shard and advance by one second
+ * per entry.
+ */
+ private List<List<AmazonKinesisMock.TestData>> provideTestData(
+ int noOfShards,
+ int noOfEventsPerShard) {
+
+ int seqNumber = 0;
+
+ List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+ for (int i = 0; i < noOfShards; ++i) {
+ List<AmazonKinesisMock.TestData> shardData = newArrayList();
+ shardedData.add(shardData);
+
+ DateTime arrival = DateTime.now();
+ for (int j = 0; j < noOfEventsPerShard; ++j) {
+ arrival = arrival.plusSeconds(1);
+
+ seqNumber++;
+ // The same counter value is passed as both the first and the third argument.
+ shardData.add(new AmazonKinesisMock.TestData(
+ Integer.toString(seqNumber),
+ arrival.toInstant(),
+ Integer.toString(seqNumber))
+ );
+ }
+ }
+
+ return shardedData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
new file mode 100644
index 0000000..205f050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Iterables;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for {@link KinesisReaderCheckpoint}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderCheckpointTest {
+ @Mock
+ private ShardCheckpoint a, b, c;
+
+ private KinesisReaderCheckpoint checkpoint;
+
+ @Before
+ public void setUp() {
+ checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+ }
+
+ @Test
+ public void splitsCheckpointAccordingly() {
+ // Covers fewer, as many and more requested parts than the three shard checkpoints.
+ verifySplitInto(1);
+ verifySplitInto(2);
+ verifySplitInto(3);
+ verifySplitInto(4);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void isImmutable() {
+ Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+ iterator.remove();
+ }
+
+ /**
+ * Splits the checkpoint into {@code size} parts and checks that the parts together contain
+ * exactly a, b and c, and that the number of parts is {@code min(size, 3)}.
+ */
+ private void verifySplitInto(int size) {
+ List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+ assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+ assertThat(split).hasSize(Math.min(size, 3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
new file mode 100644
index 0000000..fbc7c66
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.regions.Regions;
+import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test that reads from a real Kinesis stream.
+ * All {@link KinesisTestOptions} must be provided in order to run it.
+ */
+public class KinesisReaderIT {
+ private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+ // NOTE(review): this executor is never shut down in this class.
+ private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+
+ @Ignore
+ @Test
+ public void readsDataFromRealKinesisStream()
+ throws IOException, InterruptedException, ExecutionException {
+ KinesisTestOptions options = readKinesisOptions();
+ List<String> testData = prepareTestData(1000);
+
+ // The reading pipeline is started before the upload; get() then waits for the background
+ // run and surfaces any failure from it as an ExecutionException.
+ Future<?> future = startTestPipeline(testData, options);
+ KinesisUploader.uploadAll(testData, options);
+ future.get();
+ }
+
+ /** Returns {@code count} random alphabetic strings of 32 characters each. */
+ private List<String> prepareTestData(int count) {
+ List<String> data = newArrayList();
+ for (int i = 0; i < count; ++i) {
+ data.add(RandomStringUtils.randomAlphabetic(32));
+ }
+ return data;
+ }
+
+ /**
+ * Builds a pipeline that reads the configured stream from the current instant for at most
+ * three minutes and asserts that the output equals {@code testData} in any order, then runs
+ * it on the single-thread executor.
+ *
+ * <p>Sleeps for {@link #PIPELINE_STARTUP_TIME} before returning, presumably so that the
+ * pipeline is already reading when the caller starts uploading.
+ */
+ private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+ throws InterruptedException {
+ final Pipeline p = TestPipeline.create();
+ PCollection<String> result = p.
+ apply(KinesisIO.Read.
+ from(options.getAwsKinesisStream(), Instant.now()).
+ using(options.getAwsAccessKey(), options.getAwsSecretKey(),
+ Regions.fromName(options.getAwsKinesisRegion())).
+ withMaxReadTime(Duration.standardMinutes(3))
+ ).
+ apply(ParDo.of(new RecordDataToString()));
+ PAssert.that(result).containsInAnyOrder(testData);
+
+ Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ PipelineResult result = p.run();
+ PipelineResult.State state = result.getState();
+ // Poll once per second until the pipeline reaches DONE or FAILED.
+ while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+ Thread.sleep(1000);
+ state = result.getState();
+ }
+ assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ return null;
+ }
+ });
+ Thread.sleep(PIPELINE_STARTUP_TIME);
+ return future;
+ }
+
+ /** Registers {@link KinesisTestOptions} and reads them from the testing pipeline options. */
+ private KinesisTestOptions readKinesisOptions() {
+ PipelineOptionsFactory.register(KinesisTestOptions.class);
+ return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ }
+
+ /** Decodes the payload of each record as a UTF-8 string. */
+ private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ checkNotNull(c.element(), "Null record given");
+ // NOTE(review): assuming getData() returns a java.nio.ByteBuffer, array() yields the whole
+ // backing array regardless of position/limit — confirm the buffer wraps only the payload.
+ c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
new file mode 100644
index 0000000..793fb57
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for {@link KinesisReader}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderTest {
+ @Mock
+ private SimplifiedKinesisClient kinesis;
+ @Mock
+ private CheckpointGenerator generator;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+ @Mock
+ private ShardRecordsIterator firstIterator, secondIterator;
+ @Mock
+ private KinesisRecord a, b, c, d;
+
+ private KinesisReader reader;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+ asList(firstCheckpoint, secondCheckpoint)
+ ));
+ when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+ when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+ // By default both shard iterators report no data; individual tests override this.
+ when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+ when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ reader = new KinesisReader(kinesis, generator, null);
+ }
+
+ @Test
+ public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+ assertThat(reader.start()).isFalse();
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+ reader.start();
+ reader.getCurrent();
+ }
+
+ @Test
+ public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+ TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ }
+
+ @Test
+ public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+ throws IOException, TransientKinesisException {
+ reader.start();
+
+ // Stubbed after start(), so the exception is first raised during advance().
+ when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+ assertThat(reader.advance()).isFalse();
+ }
+
+ @Test
+ public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(b)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ when(secondIterator.next()).
+ thenReturn(CustomOptional.of(c)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(d)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ // Records from the two iterators are expected interleaved: c, a, d, b.
+ assertThat(reader.start()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(c);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(a);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(d);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(b);
+ assertThat(reader.advance()).isFalse();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
new file mode 100644
index 0000000..b09b7eb
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+
+/**
+ * Tests for {@link KinesisRecordCoder}.
+ */
+public class KinesisRecordCoderTest {
+ @Test
+ public void encodingAndDecodingWorks() throws Exception {
+ // NOTE(review): getBytes() without a charset uses the platform default encoding.
+ KinesisRecord record = new KinesisRecord(
+ ByteBuffer.wrap("data".getBytes()),
+ "sequence",
+ 128L,
+ "partition",
+ Instant.now(),
+ Instant.now(),
+ "stream",
+ "shard"
+ );
+ CoderProperties.coderDecodeEncodeEqual(
+ new KinesisRecordCoder(), record
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
new file mode 100644
index 0000000..65a7605
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Options for Kinesis integration tests.
+ *
+ * <p>The defaults are literal placeholder-looking strings (for example
+ * {@code "aws-kinesis-region"}); real values have to be supplied to talk to AWS.
+ */
+public interface KinesisTestOptions extends TestPipelineOptions {
+ @Description("AWS region where the Kinesis stream resides")
+ @Default.String("aws-kinesis-region")
+ String getAwsKinesisRegion();
+ void setAwsKinesisRegion(String value);
+
+ @Description("Kinesis stream name")
+ @Default.String("aws-kinesis-stream")
+ String getAwsKinesisStream();
+ void setAwsKinesisStream(String value);
+
+ @Description("AWS secret key")
+ @Default.String("aws-secret-key")
+ String getAwsSecretKey();
+ void setAwsSecretKey(String value);
+
+ @Description("AWS access key")
+ @Default.String("aws-access-key")
+ String getAwsAccessKey();
+ void setAwsAccessKey(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
new file mode 100644
index 0000000..0dcede9
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Uploads records to Kinesis, resending rejected records until all of them are accepted.
+ */
+public class KinesisUploader {
+
+ // NOTE(review): presumably chosen to stay below the PutRecords per-request limit — confirm.
+ public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+
+ /**
+ * Sends every string in {@code data} (UTF-8 encoded) to the stream named in {@code options}.
+ *
+ * <p>The data is sent in batches of at most {@link #MAX_NUMBER_OF_RECORDS_IN_BATCH} records.
+ * Records reported as failed are sent again, with no delay and no retry limit, until a
+ * request reports zero failures.
+ *
+ * @param data payloads to upload; the partition key of each is its {@code hashCode()}
+ * @param options source of the AWS credentials, region and stream name
+ */
+ public static void uploadAll(List<String> data, KinesisTestOptions options) {
+ AmazonKinesis client = new AmazonKinesisClient(
+ new StaticCredentialsProvider(
+ new BasicAWSCredentials(
+ options.getAwsAccessKey(), options.getAwsSecretKey()))
+ ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+
+ List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+
+
+ for (List<String> partition : partitions) {
+ List<PutRecordsRequestEntry> allRecords = newArrayList();
+ for (String row : partition) {
+ allRecords.add(new PutRecordsRequestEntry().
+ withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+ withPartitionKey(Integer.toString(row.hashCode()))
+
+ );
+ }
+
+ PutRecordsResult result;
+ do {
+ result = client.putRecords(
+ new PutRecordsRequest().
+ withStreamName(options.getAwsKinesisStream()).
+ withRecords(allRecords));
+ // Result entries are matched to request entries by position; an entry with an error
+ // code marks the request record at the same index as failed.
+ List<PutRecordsRequestEntry> failedRecords = newArrayList();
+ int i = 0;
+ for (PutRecordsResultEntry row : result.getRecords()) {
+ if (row.getErrorCode() != null) {
+ failedRecords.add(allRecords.get(i));
+ }
+ ++i;
+ }
+ // The next iteration resends only the records that failed this time.
+ allRecords = failedRecords;
+ }
+
+ while (result.getFailedRecordCount() > 0);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
new file mode 100644
index 0000000..360106d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.collect.Lists;
+
+import static org.mockito.BDDMockito.given;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Tests for {@link RecordFilter}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RecordFilterTest {
+ @Mock
+ private ShardCheckpoint checkpoint;
+ @Mock
+ private KinesisRecord record1, record2, record3, record4, record5;
+
+ @Test
+ public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+ given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+ List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+ record3, record4, record5);
+ RecordFilter underTest = new RecordFilter();
+
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+ // Exactly the records for which isBeforeOrAt(...) was stubbed to return true are kept.
+ Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+ }
+
+ @Test
+ public void shouldNotFailOnEmptyList() {
+ List<KinesisRecord> records = Collections.emptyList();
+ RecordFilter underTest = new RecordFilter();
+
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+ Assertions.assertThat(retainedRecords).isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
new file mode 100644
index 0000000..a508ddf
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests {@link RoundRobin}; created by ppastuszka on 12.12.15.
+ */
+public class RoundRobinTest {
+  /** Building a {@link RoundRobin} from an empty collection is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void doesNotAllowCreationWithEmptyCollection() {
+    new RoundRobin<>(Collections.emptyList());
+  }
+
+  /**
+   * Walks two full laps over the elements, checking that the cycle wraps around after the
+   * last element.
+   */
+  @Test
+  public void goesThroughElementsInCycle() {
+    List<String> elements = newArrayList("a", "b", "c");
+    RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(elements));
+
+    for (int lap = 0; lap < 2; lap++) {
+      for (String expected : elements) {
+        // Read twice: the current element must stay the same until moveForward() is called.
+        assertThat(roundRobin.getCurrent()).isEqualTo(expected);
+        assertThat(roundRobin.getCurrent()).isEqualTo(expected);
+        roundRobin.moveForward();
+      }
+    }
+  }
+
+  /** Plain iteration over the round-robin is expected to visit each element exactly once. */
+  @Test
+  public void usualIteratorGoesThroughElementsOnce() {
+    List<String> elements = newArrayList("a", "b", "c");
+
+    RoundRobin<String> roundRobin = new RoundRobin<>(elements);
+
+    assertThat(roundRobin).hasSize(3).containsOnly("a", "b", "c");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
new file mode 100644
index 0000000..2227cef
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.io.IOException;
+
+/**
+ * Tests {@link ShardCheckpoint}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardCheckpointTest {
+ private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+ private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+ private static final String STREAM_NAME = "STREAM";
+ private static final String SHARD_ID = "SHARD_ID";
+ @Mock
+ private SimplifiedKinesisClient client;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AT_SEQUENCE_SHARD_IT);
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AFTER_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+ (AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testComparisonWithExtendedSequenceNumber() {
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("99", 1L))
+ )).isFalse();
+ }
+
+ @Test
+ public void testComparisonWithTimestamp() {
+ DateTime referenceTimestamp = DateTime.now();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+ ).isFalse();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+ ).isTrue();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+ ).isTrue();
+ }
+
+ private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+ Long subSequenceNumber) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+ subSequenceNumber);
+ }
+
+ private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
new file mode 100644
index 0000000..e2a3ccc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Tests {@link ShardRecordsIterator}; created by ppastuszka on 12.12.15.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardRecordsIteratorTest {
+ private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+ private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+ private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+ private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+ private static final String STREAM_NAME = "STREAM_NAME";
+ private static final String SHARD_ID = "SHARD_ID";
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+ @Mock
+ private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+ @Mock
+ private KinesisRecord a, b, c, d;
+ @Mock
+ private RecordFilter recordFilter;
+
+ private ShardRecordsIterator iterator;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+ when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+ when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+ when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+ when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+ when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(firstResult);
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+ when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(thirdResult);
+
+ when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+ when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+ when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+ when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+ when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+ .class))).thenAnswer(new IdentityAnswer());
+
+ iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+ }
+
+ @Test
+ public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ @Test
+ public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+ when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+ assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+ assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ }
+
+ @Test
+ public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(singletonList(a));
+ when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenThrow(ExpiredIteratorException.class);
+ when(aCheckpoint.getShardIterator(kinesisClient))
+ .thenReturn(SECOND_REFRESHED_ITERATOR);
+ when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ private static class IdentityAnswer implements Answer<Object> {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return invocation.getArguments()[0];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
new file mode 100644
index 0000000..44d29d6
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.List;
+
+/** Tests {@link SimplifiedKinesisClient}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedKinesisClientTest {
+  private static final String STREAM = "stream";
+  private static final String SHARD_1 = "shard-01";
+  private static final String SHARD_2 = "shard-02";
+  private static final String SHARD_3 = "shard-03";
+  private static final String SHARD_ITERATOR = "iterator";
+  private static final String SEQUENCE_NUMBER = "abc123";
+
+  @Mock
+  private AmazonKinesis kinesis;
+  @InjectMocks
+  private SimplifiedKinesisClient underTest;
+
+  /** A sequence-number based request yields the shard iterator returned by Kinesis. */
+  @Test
+  public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+        .withStartingSequenceNumber(SEQUENCE_NUMBER)
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String shardIterator = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+    assertThat(shardIterator).isEqualTo(SHARD_ITERATOR);
+  }
+
+  /** A timestamp based request yields the shard iterator returned by Kinesis. */
+  @Test
+  public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+    Instant timestamp = Instant.now();
+    // A timestamp belongs with the AT_TIMESTAMP iterator type, not AT_SEQUENCE_NUMBER.
+    given(kinesis.getShardIterator(new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
+        .withTimestamp(timestamp.toDate())
+    )).willReturn(new GetShardIteratorResult()
+        .withShardIterator(SHARD_ITERATOR));
+
+    String shardIterator = underTest.getShardIterator(STREAM, SHARD_1,
+        ShardIteratorType.AT_TIMESTAMP, null, timestamp);
+
+    assertThat(shardIterator).isEqualTo(SHARD_ITERATOR);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForGetShardIterator() {
+    shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+    shouldHandleGetShardIteratorError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  /**
+   * Makes the mocked Kinesis client throw {@code thrownException} for a LATEST shard
+   * iterator request and asserts that the client under test surfaces an exception of
+   * exactly {@code expectedExceptionClass}.
+   *
+   * @param thrownException exception the mocked {@code getShardIterator} call throws
+   * @param expectedExceptionClass exact class of the exception expected from the client
+   */
+  private void shouldHandleGetShardIteratorError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    GetShardIteratorRequest request = new GetShardIteratorRequest()
+        .withStreamName(STREAM)
+        .withShardId(SHARD_1)
+        .withShardIteratorType(ShardIteratorType.LATEST);
+
+    given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+    try {
+      underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+      // Signals failure with an AssertionError, which the catch below does not intercept.
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  /** Shards spread over two describeStream pages are all returned by listShards. */
+  @Test
+  public void shouldListAllShards() throws Exception {
+    Shard shard1 = new Shard().withShardId(SHARD_1);
+    Shard shard2 = new Shard().withShardId(SHARD_2);
+    Shard shard3 = new Shard().withShardId(SHARD_3);
+    // First page: two shards and a flag saying more are available.
+    given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard1, shard2)
+            .withHasMoreShards(true)));
+    // Second page: requested with the id of the last shard seen so far.
+    given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+        .withStreamDescription(new StreamDescription()
+            .withShards(shard3)
+            .withHasMoreShards(false)));
+
+    List<Shard> shards = underTest.listShards(STREAM);
+
+    assertThat(shards).containsOnly(shard1, shard2, shard3);
+  }
+
+  @Test
+  public void shouldHandleExpiredIterationExceptionForShardListing() {
+    shouldHandleShardListingError(new ExpiredIteratorException(""),
+        ExpiredIteratorException.class);
+  }
+
+  @Test
+  public void shouldHandleLimitExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new LimitExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+    shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleServiceErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+        TransientKinesisException.class);
+  }
+
+  @Test
+  public void shouldHandleClientErrorForShardListing() {
+    shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+        RuntimeException.class);
+  }
+
+  @Test
+  public void shouldHandleUnexpectedExceptionForShardListing() {
+    shouldHandleShardListingError(new NullPointerException(),
+        RuntimeException.class);
+  }
+
+  /**
+   * Makes the mocked Kinesis client throw {@code thrownException} from the first
+   * {@code describeStream} call and asserts that {@code listShards} surfaces an exception
+   * of exactly {@code expectedExceptionClass}.
+   *
+   * @param thrownException exception the mocked {@code describeStream} call throws
+   * @param expectedExceptionClass exact class of the exception expected from the client
+   */
+  private void shouldHandleShardListingError(
+      Exception thrownException,
+      Class<? extends Exception> expectedExceptionClass) {
+    given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+    try {
+      underTest.listShards(STREAM);
+      // Signals failure with an AssertionError, which the catch below does not intercept.
+      failBecauseExceptionWasNotThrown(expectedExceptionClass);
+    } catch (Exception e) {
+      assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+    } finally {
+      reset(kinesis);
+    }
+  }
+
+  /** Creates an {@link AmazonServiceException} with an empty message and the given type. */
+  private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+    AmazonServiceException exception = new AmazonServiceException("");
+    exception.setErrorType(errorType);
+    return exception;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..44dbf4a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to Amazon Kinesis.
+ */
+package org.apache.beam.sdk.io.kinesis;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4198499..6cbd615 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -37,6 +37,7 @@
<module>hdfs</module>
<module>jms</module>
<module>kafka</module>
+ <module>kinesis</module>
</modules>
</project>
[4/5] incubator-beam git commit: kinesis: a connector for Amazon
Kinesis
Posted by dh...@apache.org.
kinesis: a connector for Amazon Kinesis
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bed22de6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bed22de6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bed22de6
Branch: refs/heads/master
Commit: bed22de6880339465637a339b2d8b8527b6cacc6
Parents: 95ab438
Author: Pastuszka Przemysław <pa...@gmail.com>
Authored: Mon Jul 18 20:03:37 2016 +0200
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 26 16:11:23 2016 -0700
----------------------------------------------------------------------
sdks/java/io/kinesis/pom.xml | 179 +++++++++
.../sdk/io/kinesis/CheckpointGenerator.java | 30 ++
.../beam/sdk/io/kinesis/CustomOptional.java | 85 +++++
.../io/kinesis/DynamicCheckpointGenerator.java | 58 +++
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 54 +++
.../sdk/io/kinesis/KinesisClientProvider.java | 31 ++
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 190 ++++++++++
.../beam/sdk/io/kinesis/KinesisReader.java | 145 +++++++
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 98 +++++
.../beam/sdk/io/kinesis/KinesisRecord.java | 121 ++++++
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 75 ++++
.../beam/sdk/io/kinesis/KinesisSource.java | 114 ++++++
.../beam/sdk/io/kinesis/RecordFilter.java | 41 ++
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 54 +++
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 175 +++++++++
.../sdk/io/kinesis/ShardRecordsIterator.java | 98 +++++
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 158 ++++++++
.../beam/sdk/io/kinesis/StartingPoint.java | 85 +++++
.../io/kinesis/StaticCheckpointGenerator.java | 42 +++
.../io/kinesis/TransientKinesisException.java | 29 ++
.../beam/sdk/io/kinesis/package-info.java | 22 ++
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 375 +++++++++++++++++++
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 31 ++
.../kinesis/DynamicCheckpointGeneratorTest.java | 56 +++
.../sdk/io/kinesis/KinesisMockReadTest.java | 92 +++++
.../io/kinesis/KinesisReaderCheckpointTest.java | 67 ++++
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 119 ++++++
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 119 ++++++
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 46 +++
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 47 +++
.../beam/sdk/io/kinesis/KinesisUploader.java | 84 +++++
.../beam/sdk/io/kinesis/RecordFilterTest.java | 66 ++++
.../beam/sdk/io/kinesis/RoundRobinTest.java | 57 +++
.../sdk/io/kinesis/ShardCheckpointTest.java | 148 ++++++++
.../io/kinesis/ShardRecordsIteratorTest.java | 150 ++++++++
.../io/kinesis/SimplifiedKinesisClientTest.java | 223 +++++++++++
.../beam/sdk/io/kinesis/package-info.java | 22 ++
sdks/java/io/pom.xml | 1 +
38 files changed, 3587 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/pom.xml b/sdks/java/io/kinesis/pom.xml
new file mode 100644
index 0000000..aec1786
--- /dev/null
+++ b/sdks/java/io/kinesis/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-io-parent</artifactId>
+ <version>0.3.0-incubating-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>beam-sdks-java-io-kinesis</artifactId>
+ <name>Apache Beam :: SDKs :: Java :: IO :: Kinesis</name>
+ <description>Library to read Kinesis streams.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <beamUseDummyRunner>false</beamUseDummyRunner>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <additionalparam>-Xdoclint:missing</additionalparam>
+ </configuration>
+ </plugin>
+ <!-- Integration Tests -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <useManifestOnlyJar>false</useManifestOnlyJar>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>${integrationTestPipelineOptions}</beamTestPipelineOptions>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <aws.version>1.11.18</aws.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-kinesis</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-kinesis-client</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-core</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.5.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
new file mode 100644
index 0000000..919d85a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import java.io.Serializable;
+
+/**
+ * Used to generate a checkpoint object on demand.
+ * How exactly the checkpoint is generated is up to the implementing class.
+ */
+interface CheckpointGenerator extends Serializable {
+ KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+ throws TransientKinesisException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
new file mode 100644
index 0000000..804d6cc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import java.util.NoSuchElementException;
+
+/***
+ * Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
+ */
+abstract class CustomOptional<T> {
+ public static <T> CustomOptional<T> absent() {
+ return Absent.INSTANCE;
+ }
+
+ public static <T> CustomOptional<T> of(T v) {
+ return new Present<>(v);
+ }
+
+ public abstract boolean isPresent();
+
+ public abstract T get();
+
+ private static class Present<T> extends CustomOptional<T> {
+ private final T value;
+
+ private Present(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean isPresent() {
+ return true;
+ }
+
+ @Override
+ public T get() {
+ return value;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ Present<?> present = (Present<?>) o;
+
+ return value != null ? value.equals(present.value) : present.value == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return value != null ? value.hashCode() : 0;
+ }
+ }
+
+ private static class Absent<T> extends CustomOptional<T> {
+ public static final Absent INSTANCE = new Absent();
+
+ private Absent() {
+ }
+
+ @Override
+ public boolean isPresent() {
+ return false;
+ }
+
+ @Override
+ public T get() {
+ throw new NoSuchElementException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
new file mode 100644
index 0000000..d86960f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.transform;
+
+
+import com.google.common.base.Function;
+
+import com.amazonaws.services.kinesis.model.Shard;
+
+/**
+ * Creates {@link KinesisReaderCheckpoint}, which spans over all shards in given stream.
+ * List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
+ */
+class DynamicCheckpointGenerator implements CheckpointGenerator {
+ private final String streamName;
+ private final StartingPoint startingPoint;
+
+ public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+ }
+
+ @Override
+ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new KinesisReaderCheckpoint(
+ transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+ @Override
+ public ShardCheckpoint apply(Shard shard) {
+ return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+ }
+ })
+ );
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
new file mode 100644
index 0000000..f48b9d5
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/***
+ * Represents the output of 'get' operation on Kinesis stream.
+ */
+class GetKinesisRecordsResult {
+ private final List<KinesisRecord> records;
+ private final String nextShardIterator;
+
+ public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+ final String streamName, final String shardId) {
+ this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+ @Nullable
+ @Override
+ public KinesisRecord apply(@Nullable UserRecord input) {
+ assert input != null; // to make FindBugs happy
+ return new KinesisRecord(input, streamName, shardId);
+ }
+ });
+ this.nextShardIterator = nextShardIterator;
+ }
+
+ public List<KinesisRecord> getRecords() {
+ return records;
+ }
+
+ public String getNextShardIterator() {
+ return nextShardIterator;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
new file mode 100644
index 0000000..36c8953
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import java.io.Serializable;
+
+/**
+ * Provides instances of {@link AmazonKinesis} interface.
+ *
+ * <p>Please note that any instance of {@link KinesisClientProvider} must be
+ * {@link Serializable} to ensure it can be sent to worker machines.
+ */
+interface KinesisClientProvider extends Serializable {
+ AmazonKinesis get();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
new file mode 100644
index 0000000..b3cb464
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import org.apache.beam.sdk.transforms.PTransform;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.Instant;
+
+/**
+ * {@link PTransform}s for reading from
+ * <a href="https://aws.amazon.com/kinesis/">Kinesis</a> streams.
+ *
+ * <h3>Usage</h3>
+ *
+ * <p>The main class you are going to work with is {@link KinesisIO}.
+ * It follows the usage conventions laid out by other *IO classes like
+ * BigQueryIO or PubsubIO. Let's see how you can set up a simple Pipeline, which reads from Kinesis:
+ *
+ * <pre>{@code
+ * p.
+ * apply(KinesisIO.Read.
+ * from("streamName", InitialPositionInStream.LATEST).
+ * using("AWS_KEY", "AWS_SECRET", STREAM_REGION)).
+ * apply( ... ) // other transformations
+ * }</pre>
+ * </p>
+ *
+ * <p>
+ * As you can see you need to provide 3 things:
+ * <ul>
+ * <li>name of the stream you're going to read</li>
+ * <li>position in the stream where reading should start. There are two options:</li>
+ * <ul>
+ * <li>{@link InitialPositionInStream#LATEST} - reading will begin from end of the stream</li>
+ * <li>{@link InitialPositionInStream#TRIM_HORIZON} - reading will begin at
+ * the very beginning of the stream</li>
+ * </ul>
+ * <li>data used to initialize {@link AmazonKinesis} client</li>
+ * <ul>
+ * <li>credentials (aws key, aws secret)</li>
+ * <li>region where the stream is located</li>
+ * </ul>
+ * </ul>
+ * </p>
+ *
+ * <p>If you want to set up the {@link AmazonKinesis} client on your own
+ * (for example if you're using more sophisticated authorization methods like Amazon STS, etc.)
+ * you can do it by implementing the {@link KinesisClientProvider} interface:
+ *
+ * <pre>{@code
+ * public class MyCustomKinesisClientProvider implements KinesisClientProvider {
+ * @Override
+ * public AmazonKinesis get() {
+ * // set up your client here
+ * }
+ * }
+ * }</pre>
+ *
+ * Usage is pretty straightforward:
+ *
+ * <pre>{@code
+ * p.
+ * apply(KinesisIO.Read.
+ * from("streamName", InitialPositionInStream.LATEST).
+ * using(new MyCustomKinesisClientProvider())).
+ * apply( ... ) // other transformations
+ * }</pre>
+ * </p>
+ *
+ * <p>There is also a possibility to start reading from an arbitrary point in time -
+ * in this case you need to provide an {@link Instant} object:
+ *
+ * <pre>{@code
+ * p.
+ * apply(KinesisIO.Read.
+ * from("streamName", instant).
+ * using(new MyCustomKinesisClientProvider())).
+ * apply( ... ) // other transformations
+ * }</pre>
+ * </p>
+ *
+ */
+public final class KinesisIO {
+ /***
+ * A {@link PTransform} that reads from a Kinesis stream.
+ */
+ public static final class Read {
+
+ private final String streamName;
+ private final StartingPoint initialPosition;
+
+ private Read(String streamName, StartingPoint initialPosition) {
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.initialPosition = checkNotNull(initialPosition, "initialPosition");
+ }
+
+ /***
+ * Specify reading from streamName at some initial position.
+ */
+ public static Read from(String streamName, InitialPositionInStream initialPosition) {
+ return new Read(streamName, new StartingPoint(
+ checkNotNull(initialPosition, "initialPosition")));
+ }
+
+ /***
+ * Specify reading from streamName beginning at given {@link Instant}.
+ * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+ */
+ public static Read from(String streamName, Instant initialTimestamp) {
+ return new Read(streamName, new StartingPoint(
+ checkNotNull(initialTimestamp, "initialTimestamp")));
+ }
+
+ /***
+ * Allows specifying a custom {@link KinesisClientProvider}.
+ * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+ * used for communication with Kinesis.
+ * You should use this method if {@link Read#using(String, String, Regions)} does not
+ * suit your needs.
+ */
+ public org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> using
+ (KinesisClientProvider kinesisClientProvider) {
+ return org.apache.beam.sdk.io.Read.from(
+ new KinesisSource(kinesisClientProvider, streamName,
+ initialPosition));
+ }
+
+ /***
+ * Specify credential details and region to be used to read from Kinesis.
+ * If you need more sophisticated credential protocol, then you should look at
+ * {@link Read#using(KinesisClientProvider)}.
+ */
+ public org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> using(String awsAccessKey,
+ String awsSecretKey,
+ Regions region) {
+ return using(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+ }
+
+ private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+ private final String accessKey;
+ private final String secretKey;
+ private final Regions region;
+
+ private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+ this.accessKey = checkNotNull(accessKey, "accessKey");
+ this.secretKey = checkNotNull(secretKey, "secretKey");
+ this.region = checkNotNull(region, "region");
+ }
+
+
+ private AWSCredentialsProvider getCredentialsProvider() {
+ return new StaticCredentialsProvider(new BasicAWSCredentials(
+ accessKey,
+ secretKey
+ ));
+
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ return new AmazonKinesisClient(getCredentialsProvider()).withRegion(region);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
new file mode 100644
index 0000000..38a0050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+
+/***
+ * Reads data from multiple kinesis shards in a single thread.
+ * It uses a simple round-robin algorithm when fetching data from shards.
+ */
+class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+ private final SimplifiedKinesisClient kinesis;
+ private final UnboundedSource<KinesisRecord, ?> source;
+ private final CheckpointGenerator initialCheckpointGenerator;
+ private RoundRobin<ShardRecordsIterator> shardIterators;
+ private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+ public KinesisReader(SimplifiedKinesisClient kinesis,
+ CheckpointGenerator initialCheckpointGenerator,
+ UnboundedSource<KinesisRecord, ?> source) {
+ this.kinesis = checkNotNull(kinesis, "kinesis");
+ this.initialCheckpointGenerator =
+ checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+ this.source = source;
+ }
+
+ /***
+ * Generates initial checkpoint and instantiates iterators for shards.
+ */
+ @Override
+ public boolean start() throws IOException {
+ LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+ try {
+ KinesisReaderCheckpoint initialCheckpoint =
+ initialCheckpointGenerator.generate(kinesis);
+ List<ShardRecordsIterator> iterators = newArrayList();
+ for (ShardCheckpoint checkpoint : initialCheckpoint) {
+ iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+ }
+ shardIterators = new RoundRobin<>(iterators);
+ } catch (TransientKinesisException e) {
+ throw new IOException(e);
+ }
+
+ return advance();
+ }
+
+ /***
+ * Moves to the next record in one of the shards.
+ * If the current shard iterator can be moved forward (i.e. there's a record present) then we do it.
+ * If not, we iterate over shards in a round-robin manner.
+ */
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ for (int i = 0; i < shardIterators.size(); ++i) {
+ currentRecord = shardIterators.getCurrent().next();
+ if (currentRecord.isPresent()) {
+ return true;
+ } else {
+ shardIterators.moveForward();
+ }
+ }
+ } catch (TransientKinesisException e) {
+ LOG.warn("Transient exception occurred", e);
+ }
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return currentRecord.get().getUniqueId();
+ }
+
+ @Override
+ public KinesisRecord getCurrent() throws NoSuchElementException {
+ return currentRecord.get();
+ }
+
+ /***
+ * When {@link KinesisReader} was advanced to the current record.
+ * We cannot use the approximate arrival timestamp given for each record by Kinesis as it
+ * is not guaranteed to be accurate - this could lead to marking some records as "late"
+ * even if they were not.
+ */
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return currentRecord.get().getReadTime();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ /***
+ * Current time.
+ * We cannot give a better approximation of the watermark with the current semantics of
+ * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+ * {@link KinesisReader#advance()} will be called.
+ */
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public UnboundedSource.CheckpointMark getCheckpointMark() {
+ return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+ }
+
+ @Override
+ public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+ return source;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
new file mode 100644
index 0000000..6ceb742
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.io.UnboundedSource;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.partition;
+
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/***
+ * Checkpoint representing a total progress in a set of shards in single stream.
+ * The set of shards covered by {@link KinesisReaderCheckpoint} may or may not be equal to set of
+ * all shards present in the stream.
+ * This class is immutable.
+ */
+class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
+ .CheckpointMark, Serializable {
+ private final List<ShardCheckpoint> shardCheckpoints;
+
+ public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+ this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+ }
+
+ public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+ iterators) {
+ return new KinesisReaderCheckpoint(transform(iterators,
+ new Function<ShardRecordsIterator, ShardCheckpoint>() {
+
+ @Nullable
+ @Override
+ public ShardCheckpoint apply(@Nullable
+ ShardRecordsIterator shardRecordsIterator) {
+ assert shardRecordsIterator != null;
+ return shardRecordsIterator.getCheckpoint();
+ }
+ }));
+ }
+
+ /***
+ * Splits given multi-shard checkpoint into partitions of approximately equal size.
+ *
+ * @param desiredNumSplits - upper limit for number of partitions to generate.
+ * @return list of checkpoints covering consecutive partitions of current checkpoint.
+ */
+ public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+ int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+
+ List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+ for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+ checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
+ }
+ return checkpoints;
+ }
+
+ private int divideAndRoundUp(int nominator, int denominator) {
+ return (nominator + denominator - 1) / denominator;
+ }
+
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+
+ }
+
+ @Override
+ public String toString() {
+ return shardCheckpoints.toString();
+ }
+
+ @Override
+ public Iterator<ShardCheckpoint> iterator() {
+ return shardCheckpoints.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
new file mode 100644
index 0000000..cdb495c
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.base.Charsets;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link UserRecord} enhanced with utility methods.
+ */
+public class KinesisRecord implements Serializable {
+ private Instant readTime;
+ private String streamName;
+ private String shardId;
+ private long subSequenceNumber;
+ private String sequenceNumber;
+ private Instant approximateArrivalTimestamp;
+ private ByteBuffer data;
+ private String partitionKey;
+
+ public KinesisRecord(UserRecord record, String streamName, String shardId) {
+ this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+ record.getPartitionKey(),
+ new Instant(record.getApproximateArrivalTimestamp()),
+ Instant.now(),
+ streamName, shardId);
+ }
+
+ public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+ String partitionKey, Instant approximateArrivalTimestamp,
+ Instant readTime,
+ String streamName, String shardId) {
+ this.data = data;
+ this.sequenceNumber = sequenceNumber;
+ this.subSequenceNumber = subSequenceNumber;
+ this.partitionKey = partitionKey;
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ this.readTime = readTime;
+ this.streamName = streamName;
+ this.shardId = shardId;
+ }
+
+ public ExtendedSequenceNumber getExtendedSequenceNumber() {
+ return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+ }
+
+ /***
+ * @return unique id of the record based on its position in the stream
+ */
+ public byte[] getUniqueId() {
+ return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+ }
+
+ public Instant getReadTime() {
+ return readTime;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+
+ public byte[] getDataAsBytes() {
+ return getData().array();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return reflectionHashCode(this);
+ }
+
+ public long getSubSequenceNumber() {
+ return subSequenceNumber;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public Instant getApproximateArrivalTimestamp() {
+ return approximateArrivalTimestamp;
+ }
+
+ public ByteBuffer getData() {
+ return data;
+ }
+
+ public String getPartitionKey() {
+ return partitionKey;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
new file mode 100644
index 0000000..c383a4f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+
+import org.joda.time.Instant;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/***
+ * A {@link Coder} for {@link KinesisRecord}.
+ */
+class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+ private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+ private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+ public static KinesisRecordCoder of() {
+ return new KinesisRecordCoder();
+ }
+
+ @Override
+ public void encode(KinesisRecord value, OutputStream outStream, Context context) throws
+ IOException {
+ Context nested = context.nested();
+ BYTE_ARRAY_CODER.encode(value.getData().array(), outStream, nested);
+ STRING_CODER.encode(value.getSequenceNumber(), outStream, nested);
+ STRING_CODER.encode(value.getPartitionKey(), outStream, nested);
+ INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream, nested);
+ VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream, nested);
+ INSTANT_CODER.encode(value.getReadTime(), outStream, nested);
+ STRING_CODER.encode(value.getStreamName(), outStream, nested);
+ STRING_CODER.encode(value.getShardId(), outStream, nested);
+ }
+
+ @Override
+ public KinesisRecord decode(InputStream inStream, Context context) throws IOException {
+ Context nested = context.nested();
+ ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream, nested));
+ String sequenceNumber = STRING_CODER.decode(inStream, nested);
+ String partitionKey = STRING_CODER.decode(inStream, nested);
+ Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream, nested);
+ long subSequenceNumber = VAR_LONG_CODER.decode(inStream, nested);
+ Instant readTimestamp = INSTANT_CODER.decode(inStream, nested);
+ String streamName = STRING_CODER.decode(inStream, nested);
+ String shardId = STRING_CODER.decode(inStream, nested);
+ return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+ approximateArrivalTimestamp, readTimestamp, streamName, shardId
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
new file mode 100644
index 0000000..38c9fa4
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.List;
+
+
+/***
+ * Represents source for single stream in Kinesis.
+ */
+class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+ private final KinesisClientProvider kinesis;
+ private CheckpointGenerator initialCheckpointGenerator;
+
+ public KinesisSource(KinesisClientProvider kinesis, String streamName,
+ StartingPoint startingPoint) {
+ this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+ }
+
+ private KinesisSource(KinesisClientProvider kinesisClientProvider,
+ CheckpointGenerator initialCheckpoint) {
+ this.kinesis = kinesisClientProvider;
+ this.initialCheckpointGenerator = initialCheckpoint;
+ validate();
+ }
+
+ /***
+ * Generate splits for reading from the stream.
+ * Basically, it'll try to evenly split set of shards in the stream into
+ * {@code desiredNumSplits} partitions. Each partition is then a split.
+ */
+ @Override
+ public List<KinesisSource> generateInitialSplits(int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ KinesisReaderCheckpoint checkpoint =
+ initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+ List<KinesisSource> sources = newArrayList();
+
+ for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+ sources.add(new KinesisSource(
+ kinesis,
+ new StaticCheckpointGenerator(partition)));
+ }
+ return sources;
+ }
+
+ /***
+ * Creates reader based on given {@link KinesisReaderCheckpoint}.
+ * If {@link KinesisReaderCheckpoint} is not given, then we use
+ * {@code initialCheckpointGenerator} to generate new checkpoint.
+ */
+ @Override
+ public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+ KinesisReaderCheckpoint checkpointMark) {
+
+ CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+ if (checkpointMark != null) {
+ checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
+ }
+
+ LOG.info("Creating new reader using {}", checkpointGenerator);
+
+ return new KinesisReader(
+ SimplifiedKinesisClient.from(kinesis),
+ checkpointGenerator,
+ this);
+ }
+
+ @Override
+ public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+ return SerializableCoder.of(KinesisReaderCheckpoint.class);
+ }
+
+ @Override
+ public void validate() {
+ checkNotNull(kinesis);
+ checkNotNull(initialCheckpointGenerator);
+ }
+
+ @Override
+ public Coder<KinesisRecord> getDefaultOutputCoder() {
+ return KinesisRecordCoder.of();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
new file mode 100644
index 0000000..4c7f39a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import java.util.List;
+
+
+/**
+ * Filters out records, which were already processed and checkpointed.
+ * <p>
+ * We need this step, because we can get iterators from Kinesis only with "sequenceNumber" accuracy,
+ * not with "subSequenceNumber" accuracy.
+ */
+class RecordFilter {
+ public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+ List<KinesisRecord> filteredRecords = newArrayList();
+ for (KinesisRecord record : records) {
+ if (checkpoint.isBeforeOrAt(record)) {
+ filteredRecords.add(record);
+ }
+ }
+ return filteredRecords;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
new file mode 100644
index 0000000..7257aa1
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Queues.newArrayDeque;
+
+
+import java.util.Deque;
+import java.util.Iterator;
+
+/***
+ * Very simple implementation of round robin algorithm.
+ */
+class RoundRobin<T> implements Iterable<T> {
+ private final Deque<T> deque;
+
+ public RoundRobin(Iterable<T> collection) {
+ this.deque = newArrayDeque(collection);
+ checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+ }
+
+ public T getCurrent() {
+ return deque.getFirst();
+ }
+
+ public void moveForward() {
+ deque.addLast(deque.removeFirst());
+ }
+
+ public int size() {
+ return deque.size();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return deque.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
new file mode 100644
index 0000000..1d8628b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.joda.time.Instant;
+import java.io.Serializable;
+
+
+/***
+ * Checkpoint mark for single shard in the stream.
+ * Current position in the shard is determined by either:
+ * <ul>
+ * <li>{@link #shardIteratorType} if it is equal to {@link ShardIteratorType#LATEST} or
+ * {@link ShardIteratorType#TRIM_HORIZON}</li>
+ * <li>combination of
+ * {@link #sequenceNumber} and {@link #subSequenceNumber} if
+ * {@link ShardIteratorType#AFTER_SEQUENCE_NUMBER} or
+ * {@link ShardIteratorType#AT_SEQUENCE_NUMBER}</li>
+ * </ul>
+ * This class is immutable.
+ */
+class ShardCheckpoint implements Serializable {
+ private final String streamName;
+ private final String shardId;
+ private final String sequenceNumber;
+ private final ShardIteratorType shardIteratorType;
+ private final Long subSequenceNumber;
+ private final Instant timestamp;
+
+ public ShardCheckpoint(String streamName, String shardId, StartingPoint
+ startingPoint) {
+ this(streamName, shardId,
+ ShardIteratorType.fromValue(startingPoint.getPositionName()),
+ startingPoint.getTimestamp());
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, Instant timestamp) {
+ this(streamName, shardId, shardIteratorType, null, null, timestamp);
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+ this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+ }
+
+ private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+ String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+ this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.shardId = checkNotNull(shardId, "shardId");
+ if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+ checkNotNull(sequenceNumber,
+ "You must provide sequence number for AT_SEQUENCE_NUMBER"
+ + " or AFTER_SEQUENCE_NUMBER");
+ } else {
+ checkArgument(sequenceNumber == null,
+ "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
+ }
+ if (shardIteratorType == AT_TIMESTAMP) {
+ checkNotNull(timestamp,
+ "You must provide timestamp for AT_SEQUENCE_NUMBER"
+ + " or AFTER_SEQUENCE_NUMBER");
+ } else {
+ checkArgument(timestamp == null,
+ "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
+ }
+
+ this.subSequenceNumber = subSequenceNumber;
+ this.sequenceNumber = sequenceNumber;
+ this.timestamp = timestamp;
+ }
+
+ /***
+ * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+ * on the the underlying shardIteratorType, it will either compare the timestamp or the
+ * {@link ExtendedSequenceNumber}.
+ *
+ * @param other
+ * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
+ */
+ public boolean isBeforeOrAt(KinesisRecord other) {
+ if (shardIteratorType == AT_TIMESTAMP) {
+ return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
+ }
+ int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+ if (result == 0) {
+ return shardIteratorType == AT_SEQUENCE_NUMBER;
+ }
+ return result < 0;
+ }
+
+ private ExtendedSequenceNumber extendedSequenceNumber() {
+ String fullSequenceNumber = sequenceNumber;
+ if (fullSequenceNumber == null) {
+ fullSequenceNumber = shardIteratorType.toString();
+ }
+ return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+ streamName, shardId,
+ sequenceNumber);
+ }
+
+ public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new ShardRecordsIterator(this, kinesis);
+ }
+
+ public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+ throws TransientKinesisException {
+ if (checkpointIsInTheMiddleOfAUserRecord()) {
+ return kinesisClient.getShardIterator(streamName,
+ shardId, AT_SEQUENCE_NUMBER,
+ sequenceNumber, null);
+ }
+ return kinesisClient.getShardIterator(streamName,
+ shardId, shardIteratorType,
+ sequenceNumber, timestamp);
+ }
+
+ private boolean checkpointIsInTheMiddleOfAUserRecord() {
+ return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+ }
+
+ /***
+ * Used to advance checkpoint mark to position after given {@link Record}.
+ *
+ * @param record
+ * @return new checkpoint object pointing directly after given {@link Record}
+ */
+ public ShardCheckpoint moveAfter(KinesisRecord record) {
+ return new ShardCheckpoint(
+ streamName, shardId,
+ AFTER_SEQUENCE_NUMBER,
+ record.getSequenceNumber(),
+ record.getSubSequenceNumber());
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
new file mode 100644
index 0000000..7dfe158
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Queues.newArrayDeque;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Deque;
+
+/***
+ * Iterates over records in a single shard.
+ * Under the hood records are retrieved from Kinesis in batches and stored in the in-memory queue.
+ * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
+ */
+class ShardRecordsIterator {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
+
+ private final SimplifiedKinesisClient kinesis;
+ private final RecordFilter filter;
+ private ShardCheckpoint checkpoint;
+ private String shardIterator;
+ private Deque<KinesisRecord> data = newArrayDeque();
+
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient) throws
+ TransientKinesisException {
+ this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+ }
+
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient,
+ RecordFilter filter) throws
+ TransientKinesisException {
+
+ this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+ this.filter = checkNotNull(filter, "filter");
+ this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+ shardIterator = checkpoint.getShardIterator(kinesis);
+ }
+
+ /***
+ * Returns record if there's any present.
+ * Returns absent() if there are no new records at this time in the shard.
+ */
+ public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+ readMoreIfNecessary();
+
+ if (data.isEmpty()) {
+ return CustomOptional.absent();
+ } else {
+ KinesisRecord record = data.removeFirst();
+ checkpoint = checkpoint.moveAfter(record);
+ return CustomOptional.of(record);
+ }
+ }
+
+ private void readMoreIfNecessary() throws TransientKinesisException {
+ if (data.isEmpty()) {
+ GetKinesisRecordsResult response;
+ try {
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ } catch (ExpiredIteratorException e) {
+ LOG.info("Refreshing expired iterator", e);
+ shardIterator = checkpoint.getShardIterator(kinesis);
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ }
+ LOG.debug("Fetched {} new records", response.getRecords().size());
+ shardIterator = response.getNextShardIterator();
+ data.addAll(filter.apply(response.getRecords(), checkpoint));
+ }
+ }
+
+ public ShardCheckpoint getCheckpoint() {
+ return checkpoint;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
new file mode 100644
index 0000000..f9a1ea2
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Lists;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import org.joda.time.Instant;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/***
+ * Wraps {@link AmazonKinesis} class providing much simpler interface and
+ * proper error handling.
+ */
+class SimplifiedKinesisClient {
+ private final AmazonKinesis kinesis;
+
+ public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+ this.kinesis = kinesis;
+ }
+
+ public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+ return new SimplifiedKinesisClient(provider.get());
+ }
+
+ public String getShardIterator(final String streamName, final String shardId,
+ final ShardIteratorType shardIteratorType,
+ final String startingSequenceNumber, final Instant timestamp)
+ throws TransientKinesisException {
+ final Date date = timestamp != null ? timestamp.toDate() : null;
+ return wrapExceptions(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(streamName)
+ .withShardId(shardId)
+ .withShardIteratorType(shardIteratorType)
+ .withStartingSequenceNumber(startingSequenceNumber)
+ .withTimestamp(date)
+ ).getShardIterator();
+ }
+ });
+ }
+
+ public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+ return wrapExceptions(new Callable<List<Shard>>() {
+ @Override
+ public List<Shard> call() throws Exception {
+ List<Shard> shards = Lists.newArrayList();
+ String lastShardId = null;
+
+ StreamDescription description;
+ do {
+ description = kinesis.describeStream(streamName, lastShardId)
+ .getStreamDescription();
+
+ shards.addAll(description.getShards());
+ lastShardId = shards.get(shards.size() - 1).getShardId();
+ } while (description.getHasMoreShards());
+
+ return shards;
+ }
+ });
+ }
+
+ /***
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+ String shardId) throws TransientKinesisException {
+ return getRecords(shardIterator, streamName, shardId, null);
+ }
+
+ /***
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+ final String shardId, final Integer limit)
+ throws
+ TransientKinesisException {
+ return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+ @Override
+ public GetKinesisRecordsResult call() throws Exception {
+ GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+ .withShardIterator(shardIterator)
+ .withLimit(limit));
+ return new GetKinesisRecordsResult(
+ UserRecord.deaggregate(response.getRecords()),
+ response.getNextShardIterator(),
+ streamName, shardId);
+ }
+ });
+ }
+
+ /***
+ * Wraps Amazon specific exceptions into more friendly format.
+ *
+ * @throws TransientKinesisException - in case of recoverable situation, i.e.
+ * the request rate is too high, Kinesis remote service
+ * failed, network issue, etc.
+ * @throws ExpiredIteratorException - if iterator needs to be refreshed
+ * @throws RuntimeException - in all other cases
+ */
+ private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+ try {
+ return callable.call();
+ } catch (ExpiredIteratorException e) {
+ throw e;
+ } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+ throw new TransientKinesisException(
+ "Too many requests to Kinesis. Wait some time and retry.", e);
+ } catch (AmazonServiceException e) {
+ if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+ throw new TransientKinesisException(
+ "Kinesis backend failed. Wait some time and retry.", e);
+ }
+ throw new RuntimeException("Kinesis client side failure", e);
+ } catch (Exception e) {
+ throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
new file mode 100644
index 0000000..8140269
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.joda.time.Instant;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Denotes a point at which the reader should start reading from a Kinesis stream.
+ * It can be expressed either as an {@link InitialPositionInStream} enum constant or a timestamp,
+ * in which case the reader will start reading at the specified point in time.
+ * Exactly one of the two is set; the accessor of the other one returns {@code null}.
+ */
+class StartingPoint implements Serializable {
+  private final InitialPositionInStream position;
+  private final Instant timestamp;
+
+  public StartingPoint(InitialPositionInStream position) {
+    this.position = checkNotNull(position, "position");
+    this.timestamp = null;
+  }
+
+  public StartingPoint(Instant timestamp) {
+    this.timestamp = checkNotNull(timestamp, "timestamp");
+    this.position = null;
+  }
+
+  /** Returns the stream position, or {@code null} for a timestamp-based starting point. */
+  public InitialPositionInStream getPosition() {
+    return position;
+  }
+
+  /** Returns the position name, or {@code AT_TIMESTAMP} for a timestamp-based starting point. */
+  public String getPositionName() {
+    return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+  }
+
+  /** Returns the timestamp, or {@code null} for a position-based starting point. */
+  public Instant getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StartingPoint that = (StartingPoint) o;
+    return position == that.position && Objects.equals(timestamp, that.timestamp);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(position, timestamp);
+  }
+
+  @Override
+  public String toString() {
+    return timestamp == null ? position.toString() : "Starting at timestamp " + timestamp;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
new file mode 100644
index 0000000..22dc973
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link CheckpointGenerator} that always returns the same checkpoint instance.
+ */
+class StaticCheckpointGenerator implements CheckpointGenerator {
+  private final KinesisReaderCheckpoint checkpoint;
+
+  /** Stores the non-null checkpoint that every {@link #generate} call will return. */
+  public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+    this.checkpoint = checkNotNull(checkpoint, "checkpoint");
+  }
+
+  @Override
+  public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+    return checkpoint;
+  }
+
+  @Override
+  public String toString() {
+    return checkpoint.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
new file mode 100644
index 0000000..a1a974b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * Signals a recoverable Kinesis failure caused by an {@link AmazonServiceException}.
+ */
+class TransientKinesisException extends Exception {
+  public TransientKinesisException(String message, AmazonServiceException cause) {
+    super(message, cause);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..5e37ef1
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * KinesisIO and its supporting classes for reading from Amazon Kinesis streams.
+ */
+package org.apache.beam.sdk.io.kinesis;