You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 23:12:25 UTC

[3/5] incubator-beam git commit: kinesis: a connector for Amazon Kinesis

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
new file mode 100644
index 0000000..7ca8e0b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
+import com.amazonaws.services.kinesis.model.CreateStreamRequest;
+import com.amazonaws.services.kinesis.model.CreateStreamResult;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
+import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.ListStreamsRequest;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
+import com.amazonaws.services.kinesis.model.MergeShardsRequest;
+import com.amazonaws.services.kinesis.model.MergeShardsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.SplitShardRequest;
+import com.amazonaws.services.kinesis.model.SplitShardResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * In-memory {@link AmazonKinesis} mock for tests. Created by p.pastuszka on 21.07.2016.
+ */
+class AmazonKinesisMock implements AmazonKinesis {
+
+    static class TestData implements Serializable {
+        private final String data;
+        private final Instant arrivalTimestamp;
+        private final String sequenceNumber;
+
+        /** Rebuilds test data from a record read back; the payload is decoded as UTF-8. */
+        public TestData(KinesisRecord record) {
+            this(new String(record.getData().array(), java.nio.charset.StandardCharsets.UTF_8),
+                    record.getApproximateArrivalTimestamp(), record.getSequenceNumber());
+        }
+
+        public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+            this.data = data;
+            this.arrivalTimestamp = arrivalTimestamp;
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        /** Converts to a {@link Record} with a UTF-8 payload and an empty partition key. */
+        public Record convertToRecord() {
+            byte[] payload = data.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            return new Record().withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+                    withData(ByteBuffer.wrap(payload)).withSequenceNumber(sequenceNumber).
+                    withPartitionKey("");
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return EqualsBuilder.reflectionEquals(this, obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return reflectionHashCode(this);
+        }
+    }
+
+    static class Provider implements KinesisClientProvider {
+        private final List<List<TestData>> shardedData;
+        private final int numberOfRecordsPerGet;
+        public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+            this.shardedData = shardedData;
+            this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+        }
+
+        /** Creates a mock client serving the test data converted to {@link Record}s. */
+        @Override
+        public AmazonKinesis get() {
+            final Function<TestData, Record> toRecord = new Function<TestData, Record>() {
+                @Override
+                public Record apply(@Nullable TestData testData) {
+                    return testData.convertToRecord();
+                }
+            };
+            return new AmazonKinesisMock(transform(shardedData,
+                    new Function<List<TestData>, List<Record>>() {
+                        @Override
+                        public List<Record> apply(@Nullable List<TestData> testDatas) {
+                            return transform(testDatas, toRecord);
+                        }
+                    }), numberOfRecordsPerGet);
+        }
+    }
+
+    private final List<List<Record>> shardedData;
+    private final int numberOfRecordsPerGet;
+    /** Serves {@code shardedData}, returning at most {@code numberOfRecordsPerGet} per call. */
+    public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+        this.shardedData = shardedData;
+        this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+    }
+    /** Returns the next page of a shard; iterators have the form {@code shardIndex:offset}. */
+    @Override
+    public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+        String[] parts = getRecordsRequest.getShardIterator().split(":");
+        int shardIndex = parseInt(parts[0]);
+        int offset = parseInt(parts[1]);
+        List<Record> records = shardedData.get(shardIndex);
+        // Clamp both bounds so an iterator at or past the end yields an empty page.
+        int end = min(offset + numberOfRecordsPerGet, records.size());
+        int start = min(offset, end);
+        List<Record> page = records.subList(start, end);
+        String nextIterator = String.format("%s:%s", shardIndex, end);
+        return new GetRecordsResult().withRecords(page).withNextShardIterator(nextIterator);
+    }
+    /** Returns an iterator at the first record of the shard; only TRIM_HORIZON is supported. */
+    @Override
+    public GetShardIteratorResult getShardIterator(
+            GetShardIteratorRequest getShardIteratorRequest) {
+        ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+                getShardIteratorRequest.getShardIteratorType());
+
+        // Fail fast, naming the rejected type, instead of a bare "Not implemented".
+        if (shardIteratorType != ShardIteratorType.TRIM_HORIZON) {
+            throw new UnsupportedOperationException(
+                    "Shard iterator type not implemented: " + shardIteratorType);
+        }
+
+        String shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+        return new GetShardIteratorResult().withShardIterator(shardIterator);
+    }
+    /** Describes the stream one shard per call, paging by {@code exclusiveStartShardId}. */
+    @Override
+    public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+        // A null start id selects the first shard; otherwise continue after the given one.
+        int shardIndex = exclusiveStartShardId == null ? 0 : parseInt(exclusiveStartShardId) + 1;
+        int shardCount = shardedData.size();
+
+        List<Shard> page = newArrayList();
+        if (shardIndex < shardCount) {
+            page.add(new Shard().withShardId(Integer.toString(shardIndex)));
+        }
+
+        StreamDescription description = new StreamDescription().
+                withHasMoreShards(shardIndex + 1 < shardCount).
+                withShards(page);
+
+        return new DescribeStreamResult().withStreamDescription(description);
+    }
+    // The remaining AmazonKinesis operations are either no-ops or unsupported by this mock.
+    @Override
+    public void setEndpoint(String endpoint) {
+        // Intentionally a no-op.
+    }
+
+    @Override
+    public void setRegion(Region region) {
+        // Intentionally a no-op.
+    }
+
+    @Override
+    public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public CreateStreamResult createStream(String streamName, Integer shardCount) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+            DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DeleteStreamResult deleteStream(String streamName) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName) {
+
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DescribeStreamResult describeStream(String streamName,
+                                               Integer limit, String exclusiveStartShardId) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+            DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+            EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public GetShardIteratorResult getShardIterator(String streamName,
+                                                   String shardId,
+                                                   String shardIteratorType,
+                                                   String startingSequenceNumber) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+            IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams() {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public ListTagsForStreamResult listTagsForStream(
+            ListTagsForStreamRequest listTagsForStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public MergeShardsResult mergeShards(String streamName,
+                                         String shardToMerge, String adjacentShardToMerge) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public PutRecordResult putRecord(String streamName, ByteBuffer data,
+                                     String partitionKey, String sequenceNumberForOrdering) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public RemoveTagsFromStreamResult removeTagsFromStream(
+            RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public SplitShardResult splitShard(String streamName,
+                                       String shardToSplit, String newStartingHashKey) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void shutdown() {
+        // Intentionally a no-op.
+    }
+
+    @Override
+    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+        throw new UnsupportedOperationException("Not implemented");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
new file mode 100644
index 0000000..152fd6d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.junit.Test;
+import java.util.NoSuchElementException;
+
+/**
+ * Tests for {@link CustomOptional}. Created by ppastuszka on 12.12.15.
+ */
+public class CustomOptionalTest {
+    @Test(expected = NoSuchElementException.class)
+    public void absentThrowsNoSuchElementExceptionOnGet() {
+        CustomOptional.absent().get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
new file mode 100644
index 0000000..a9e5a69
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+
+/**
+ * Tests for {@link DynamicCheckpointGenerator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DynamicCheckpointGeneratorTest {
+
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private Shard shard1, shard2, shard3;
+
+    /** Every shard listed for the stream must yield one checkpoint entry. */
+    @Test
+    public void shouldMapAllShardsToCheckpoints() throws Exception {
+        given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+        given(shard1.getShardId()).willReturn("shard-01");
+        given(shard2.getShardId()).willReturn("shard-02");
+        given(shard3.getShardId()).willReturn("shard-03");
+
+        DynamicCheckpointGenerator generator = new DynamicCheckpointGenerator("stream",
+                new StartingPoint(InitialPositionInStream.LATEST));
+
+        KinesisReaderCheckpoint generated = generator.generate(kinesisClient);
+
+        assertThat(generated).hasSize(3);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
new file mode 100644
index 0000000..61a858f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.google.common.collect.Iterables;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import java.util.List;
+
+/**
+ * Reads from {@link AmazonKinesisMock} via KinesisIO. Created by p.pastuszka on 22.07.2016.
+ */
+public class KinesisMockReadTest {
+    /** Reads every mock record back through a pipeline and compares it with the input. */
+    @Test
+    public void readsDataFromMockKinesis() {
+        int shardCount = 3;
+        int eventsPerShard = 100;
+        List<List<AmazonKinesisMock.TestData>> expected =
+                provideTestData(shardCount, eventsPerShard);
+        AmazonKinesisMock.Provider mockProvider = new AmazonKinesisMock.Provider(expected, 10);
+
+        final Pipeline pipeline = TestPipeline.create();
+        PCollection<AmazonKinesisMock.TestData> actual = pipeline.
+                apply(KinesisIO.Read.
+                        from("stream", InitialPositionInStream.TRIM_HORIZON).
+                        using(mockProvider).
+                        withMaxNumRecords(shardCount * eventsPerShard)).
+                apply(ParDo.of(new KinesisRecordToTestData()));
+        PAssert.that(actual).containsInAnyOrder(Iterables.concat(expected));
+        pipeline.run();
+    }
+
+    /** Maps each read {@link KinesisRecord} back to {@link AmazonKinesisMock.TestData}. */
+    private static class KinesisRecordToTestData extends
+            DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            KinesisRecord record = c.element();
+            c.output(new AmazonKinesisMock.TestData(record));
+        }
+    }
+
+    /** Builds per-shard lists whose data and sequence numbers count up from 1 across shards. */
+    private List<List<AmazonKinesisMock.TestData>> provideTestData(
+            int noOfShards,
+            int noOfEventsPerShard) {
+
+        List<List<AmazonKinesisMock.TestData>> allShards = newArrayList();
+        int sequence = 0;
+
+        for (int shard = 0; shard < noOfShards; ++shard) {
+            List<AmazonKinesisMock.TestData> events = newArrayList();
+            // Arrival times within a shard are one second apart, starting one second after now.
+            DateTime arrival = DateTime.now();
+            for (int event = 0; event < noOfEventsPerShard; ++event) {
+                arrival = arrival.plusSeconds(1);
+
+                String value = Integer.toString(++sequence);
+                events.add(new AmazonKinesisMock.TestData(value, arrival.toInstant(), value));
+            }
+            allShards.add(events);
+        }
+
+        return allShards;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
new file mode 100644
index 0000000..205f050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Iterables;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Tests for {@link KinesisReaderCheckpoint}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderCheckpointTest {
+    @Mock
+    private ShardCheckpoint a, b, c;
+
+    private KinesisReaderCheckpoint checkpoint;
+
+    @Before
+    public void setUp() {
+        checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+    }
+
+    @Test
+    public void splitsCheckpointAccordingly() {
+        for (int parts = 1; parts <= 4; ++parts) {
+            verifySplitInto(parts);
+        }
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void isImmutable() {
+        Iterator<ShardCheckpoint> shardCheckpoints = checkpoint.iterator();
+        shardCheckpoints.remove();
+    }
+
+    /** Splitting must keep all three shard checkpoints and yield min(size, 3) parts. */
+    private void verifySplitInto(int size) {
+        List<KinesisReaderCheckpoint> parts = checkpoint.splitInto(size);
+        assertThat(Iterables.concat(parts)).containsOnly(a, b, c);
+        assertThat(parts).hasSize(Math.min(size, 3));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
new file mode 100644
index 0000000..fbc7c66
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.regions.Regions;
+import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test that reads from a real Kinesis stream.
+ * You need to provide all {@link KinesisTestOptions} in order to run this.
+ */
+public class KinesisReaderIT {
+    private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+    private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+    @Ignore
+    @Test
+    public void readsDataFromRealKinesisStream()
+            throws IOException, InterruptedException, ExecutionException {
+        KinesisTestOptions options = readKinesisOptions();
+        List<String> testData = prepareTestData(1000);
+
+        Future<?> future = startTestPipeline(testData, options);
+        KinesisUploader.uploadAll(testData, options);
+        future.get();
+    }
+
+    private List<String> prepareTestData(int count) {
+        List<String> data = newArrayList();
+        for (int i = 0; i < count; ++i) {
+            data.add(RandomStringUtils.randomAlphabetic(32));
+        }
+        return data;
+    }
+    /** Runs the read pipeline on a background thread, then sleeps PIPELINE_STARTUP_TIME. */
+    private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+            throws InterruptedException {
+        final Pipeline p = TestPipeline.create();
+        PCollection<String> result = p.
+                apply(KinesisIO.Read.
+                        from(options.getAwsKinesisStream(), Instant.now()).
+                        using(options.getAwsAccessKey(), options.getAwsSecretKey(),
+                                Regions.fromName(options.getAwsKinesisRegion())).
+                        withMaxReadTime(Duration.standardMinutes(3))).
+                apply(ParDo.of(new RecordDataToString()));
+        PAssert.that(result).containsInAnyOrder(testData);
+
+        Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                PipelineResult result = p.run();
+                PipelineResult.State state = result.getState();
+                while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+                    Thread.sleep(1000);
+                    state = result.getState();
+                }
+                assertThat(state).isEqualTo(PipelineResult.State.DONE);
+                return null;
+            }
+        });
+        // No further tasks are submitted; let the worker thread exit once the run completes.
+        singleThreadExecutor.shutdown();
+        Thread.sleep(PIPELINE_STARTUP_TIME);
+        return future;
+    }
+
+    private KinesisTestOptions readKinesisOptions() {
+        PipelineOptionsFactory.register(KinesisTestOptions.class);
+        return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+    }
+
+    private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+        @ProcessElement
+        public void processElement(ProcessContext c) throws Exception {
+            checkNotNull(c.element(), "Null record given");
+            c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
new file mode 100644
index 0000000..793fb57
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Unit tests for {@link KinesisReader}, run against mocked collaborators.
+ *
+ * <p>The reader under test is built from a checkpoint holding two shard checkpoints, each of
+ * which hands out its own mocked {@link ShardRecordsIterator}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderTest {
+    @Mock
+    private SimplifiedKinesisClient kinesis;
+    @Mock
+    private CheckpointGenerator generator;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+    @Mock
+    private ShardRecordsIterator firstIterator, secondIterator;
+    // Opaque records; the tests only compare them by identity/equality.
+    @Mock
+    private KinesisRecord a, b, c, d;
+
+    private KinesisReader reader;
+
+    /**
+     * Wires the mocks together: the generator yields a checkpoint made of the two shard
+     * checkpoints, each shard checkpoint yields its iterator, and both iterators report
+     * "no record" unless a test re-stubs them.
+     */
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+                asList(firstCheckpoint, secondCheckpoint)
+        ));
+        when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+        when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+        when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+        when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        // NOTE(review): the third constructor argument is passed as null; its meaning is not
+        // visible from this test.
+        reader = new KinesisReader(kinesis, generator, null);
+    }
+
+    /** With both iterators returning absent, {@code start()} must report no data. */
+    @Test
+    public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+        assertThat(reader.start()).isFalse();
+    }
+
+    /** Asking for the current record when nothing was read must throw. */
+    @Test(expected = NoSuchElementException.class)
+    public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+        reader.start();
+        reader.getCurrent();
+    }
+
+    /** A single record available on the first iterator is enough for {@code start()}. */
+    @Test
+    public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+            TransientKinesisException {
+        // Overrides the default stubbing from setUp: one record, then absent.
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+    }
+
+    /**
+     * A {@link TransientKinesisException} thrown by an iterator must surface as
+     * {@code advance()} returning false rather than as an exception.
+     */
+    @Test
+    public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+            throws IOException, TransientKinesisException {
+        reader.start();
+
+        // Re-stub only after start() so that start() itself still sees the default behavior.
+        when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+        assertThat(reader.advance()).isFalse();
+    }
+
+    /**
+     * Records spread over both iterators, interleaved with absent results, must all be
+     * returned. The asserted order (c, a, d, b) is consistent with the reader polling the two
+     * iterators in turn and skipping absent results.
+     */
+    @Test
+    public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+        when(firstIterator.next()).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(a)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(b)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        when(secondIterator.next()).
+                thenReturn(CustomOptional.of(c)).
+                thenReturn(CustomOptional.<KinesisRecord>absent()).
+                thenReturn(CustomOptional.of(d)).
+                thenReturn(CustomOptional.<KinesisRecord>absent());
+
+        assertThat(reader.start()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(c);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(a);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(d);
+        assertThat(reader.advance()).isTrue();
+        assertThat(reader.getCurrent()).isEqualTo(b);
+        assertThat(reader.advance()).isFalse();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
new file mode 100644
index 0000000..b09b7eb
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+
+/**
+ * Tests for {@link KinesisRecordCoder}.
+ */
+public class KinesisRecordCoderTest {
+    /**
+     * Checks, via {@code CoderProperties.coderDecodeEncodeEqual}, that a fully populated
+     * record is equal to itself after being encoded and decoded by the coder.
+     */
+    @Test
+    public void encodingAndDecodingWorks() throws Exception {
+        KinesisRecord record = new KinesisRecord(
+                // Explicit charset: the no-arg getBytes() depends on the platform default.
+                ByteBuffer.wrap("data".getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+                "sequence",
+                128L,
+                "partition",
+                Instant.now(),
+                Instant.now(),
+                "stream",
+                "shard"
+        );
+        CoderProperties.coderDecodeEncodeEqual(
+                new KinesisRecordCoder(), record
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
new file mode 100644
index 0000000..65a7605
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * Options for Kinesis integration tests.
+ *
+ * <p>Each property carries a string default that merely echoes the option name.
+ * NOTE(review): these look like placeholders that must be overridden with real values before
+ * running against AWS - confirm with the integration-test setup.
+ */
+public interface KinesisTestOptions extends TestPipelineOptions {
+    // Name of the AWS region hosting the stream (used by the tests to pick the endpoint).
+    @Description("AWS region where Kinesis stream resided")
+    @Default.String("aws-kinesis-region")
+    String getAwsKinesisRegion();
+    void setAwsKinesisRegion(String value);
+
+    // Name of the Kinesis stream the tests read from and write to.
+    @Description("Kinesis stream name")
+    @Default.String("aws-kinesis-stream")
+    String getAwsKinesisStream();
+    void setAwsKinesisStream(String value);
+
+    // Secret half of the AWS credentials pair.
+    @Description("AWS secret key")
+    @Default.String("aws-secret-key")
+    String getAwsSecretKey();
+    void setAwsSecretKey(String value);
+
+    // Access-key half of the AWS credentials pair.
+    @Description("AWS access key")
+    @Default.String("aws-access-key")
+    String getAwsAccessKey();
+    void setAwsAccessKey(String value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
new file mode 100644
index 0000000..0dcede9
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Test helper that sends records to Kinesis in a reliable way: entries rejected by the
+ * service are sent again until every record has been accepted.
+ */
+public class KinesisUploader {
+
+    /** Upper bound on the number of records placed in a single {@code PutRecords} request. */
+    public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+
+    /**
+     * Uploads every element of {@code data} to the stream named by
+     * {@link KinesisTestOptions#getAwsKinesisStream()}.
+     *
+     * <p>Rows are UTF-8 encoded, keyed by the string form of their {@code hashCode()} and sent
+     * in batches of at most {@link #MAX_NUMBER_OF_RECORDS_IN_BATCH}. The client created for the
+     * upload is shut down before returning, including when an upload fails with an exception.
+     *
+     * @param data rows to upload; an empty list results in no request being sent
+     * @param options source of the credentials, region and stream name
+     */
+    public static void uploadAll(List<String> data, KinesisTestOptions options) {
+        AmazonKinesis client = new AmazonKinesisClient(
+                new StaticCredentialsProvider(
+                        new BasicAWSCredentials(
+                                options.getAwsAccessKey(), options.getAwsSecretKey()))
+        ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+
+        try {
+            List<List<String>> partitions =
+                    Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+            for (List<String> partition : partitions) {
+                putUntilAccepted(client, options.getAwsKinesisStream(), toEntries(partition));
+            }
+        } finally {
+            // Release the resources held by the client; previously it was never closed.
+            client.shutdown();
+        }
+    }
+
+    /** Converts rows into request entries (UTF-8 payload, hash code as partition key). */
+    private static List<PutRecordsRequestEntry> toEntries(List<String> rows) {
+        List<PutRecordsRequestEntry> entries = newArrayList();
+        for (String row : rows) {
+            entries.add(new PutRecordsRequestEntry().
+                    withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+                    withPartitionKey(Integer.toString(row.hashCode())));
+        }
+        return entries;
+    }
+
+    /** Sends {@code entries}, re-sending the rejected ones until none is left. */
+    private static void putUntilAccepted(AmazonKinesis client, String streamName,
+                                         List<PutRecordsRequestEntry> entries) {
+        List<PutRecordsRequestEntry> pending = entries;
+        // Driven by the entries actually rejected rather than by the boxed failed-record
+        // count of the result, so a missing count cannot cause a NullPointerException.
+        while (!pending.isEmpty()) {
+            PutRecordsResult result = client.putRecords(
+                    new PutRecordsRequest().
+                            withStreamName(streamName).
+                            withRecords(pending));
+            List<PutRecordsRequestEntry> rejected = newArrayList();
+            // Result entries are matched to request entries by position.
+            int i = 0;
+            for (PutRecordsResultEntry row : result.getRecords()) {
+                if (row.getErrorCode() != null) {
+                    rejected.add(pending.get(i));
+                }
+                ++i;
+            }
+            pending = rejected;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
new file mode 100644
index 0000000..360106d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.collect.Lists;
+
+import static org.mockito.BDDMockito.given;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Tests for {@link RecordFilter}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RecordFilterTest {
+    @Mock
+    private ShardCheckpoint checkpoint;
+    @Mock
+    private KinesisRecord record1;
+    @Mock
+    private KinesisRecord record2;
+    @Mock
+    private KinesisRecord record3;
+    @Mock
+    private KinesisRecord record4;
+    @Mock
+    private KinesisRecord record5;
+
+    /** Only the records for which the checkpoint answers "before or at" are retained. */
+    @Test
+    public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+        // records 2, 3 and 5 are reported as matching the checkpoint ...
+        given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+        given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+        // ... while records 1 and 4 are not
+        given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+        given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+        RecordFilter filter = new RecordFilter();
+
+        List<KinesisRecord> kept = filter.apply(
+                Lists.newArrayList(record1, record2, record3, record4, record5),
+                checkpoint);
+
+        Assertions.assertThat(kept).containsOnly(record2, record3, record5);
+    }
+
+    /** Filtering an empty list yields an empty list. */
+    @Test
+    public void shouldNotFailOnEmptyList() {
+        RecordFilter filter = new RecordFilter();
+        List<KinesisRecord> nothing = Collections.emptyList();
+
+        List<KinesisRecord> kept = filter.apply(nothing, checkpoint);
+
+        Assertions.assertThat(kept).isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
new file mode 100644
index 0000000..a508ddf
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+public class RoundRobinTest {
+    @Test(expected = IllegalArgumentException.class)
+    public void doesNotAllowCreationWithEmptyCollection() {
+        new RoundRobin<>(Collections.emptyList());
+    }
+
+    @Test
+    public void goesThroughElementsInCycle() {
+        List<String> input = newArrayList("a", "b", "c");
+
+        RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+        input.addAll(input);  // duplicate the input
+        for (String element : input) {
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            assertThat(roundRobin.getCurrent()).isEqualTo(element);
+            roundRobin.moveForward();
+        }
+    }
+
+    @Test
+    public void usualIteratorGoesThroughElementsOnce() {
+        List<String> input = newArrayList("a", "b", "c");
+
+        RoundRobin<String> roundRobin = new RoundRobin<>(input);
+        assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
new file mode 100644
index 0000000..2227cef
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.io.IOException;
+
+/**
+ * Tests for {@link ShardCheckpoint}: which shard iterator it requests from the client and how
+ * it compares itself against records.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardCheckpointTest {
+    private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+    private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+    private static final String STREAM_NAME = "STREAM";
+    private static final String SHARD_ID = "SHARD_ID";
+    @Mock
+    private SimplifiedKinesisClient client;
+
+    /** Makes the mocked client answer with a distinct iterator id per iterator type. */
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        stubShardIterator(AT_SEQUENCE_NUMBER, AT_SEQUENCE_SHARD_IT);
+        stubShardIterator(AFTER_SEQUENCE_NUMBER, AFTER_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+        // Without a sub-sequence number the iterator type of the checkpoint is used as is.
+        assertProvidesIterator(
+                checkpoint(AT_SEQUENCE_NUMBER, "100", null), AT_SEQUENCE_SHARD_IT);
+        assertProvidesIterator(
+                checkpoint(AFTER_SEQUENCE_NUMBER, "100", null), AFTER_SEQUENCE_SHARD_IT);
+        // With a sub-sequence number an AT_SEQUENCE_NUMBER iterator is expected in both cases.
+        assertProvidesIterator(
+                checkpoint(AT_SEQUENCE_NUMBER, "100", 10L), AT_SEQUENCE_SHARD_IT);
+        assertProvidesIterator(
+                checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L), AT_SEQUENCE_SHARD_IT);
+    }
+
+    @Test
+    public void testComparisonWithExtendedSequenceNumber() {
+        ExtendedSequenceNumber hundred = new ExtendedSequenceNumber("100", 0L);
+        ExtendedSequenceNumber ninetyNine = new ExtendedSequenceNumber("99", 1L);
+
+        ShardCheckpoint latest = new ShardCheckpoint("", "", new StartingPoint(LATEST));
+        assertThat(latest.isBeforeOrAt(recordWith(hundred))).isTrue();
+
+        ShardCheckpoint trimHorizon =
+                new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON));
+        assertThat(trimHorizon.isBeforeOrAt(recordWith(hundred))).isTrue();
+
+        ShardCheckpoint afterTen = checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L);
+        assertThat(afterTen.isBeforeOrAt(recordWith(hundred))).isTrue();
+
+        ShardCheckpoint atHundred = checkpoint(AT_SEQUENCE_NUMBER, "100", 0L);
+        assertThat(atHundred.isBeforeOrAt(recordWith(hundred))).isTrue();
+
+        ShardCheckpoint afterHundred = checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L);
+        assertThat(afterHundred.isBeforeOrAt(recordWith(hundred))).isFalse();
+
+        ShardCheckpoint atHundredSubOne = checkpoint(AT_SEQUENCE_NUMBER, "100", 1L);
+        assertThat(atHundredSubOne.isBeforeOrAt(recordWith(hundred))).isFalse();
+
+        ShardCheckpoint afterHundredAgain = checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L);
+        assertThat(afterHundredAgain.isBeforeOrAt(recordWith(ninetyNine))).isFalse();
+    }
+
+    @Test
+    public void testComparisonWithTimestamp() {
+        DateTime now = DateTime.now();
+        Instant earlier = now.minusMillis(10).toInstant();
+        Instant same = now.toInstant();
+        Instant later = now.plusMillis(10).toInstant();
+
+        assertThat(checkpoint(AT_TIMESTAMP, now.toInstant()).isBeforeOrAt(recordWith(earlier)))
+                .isFalse();
+        assertThat(checkpoint(AT_TIMESTAMP, now.toInstant()).isBeforeOrAt(recordWith(same)))
+                .isTrue();
+        assertThat(checkpoint(AT_TIMESTAMP, now.toInstant()).isBeforeOrAt(recordWith(later)))
+                .isTrue();
+    }
+
+    /** Stubs the client to return {@code iteratorId} for the given iterator type. */
+    private void stubShardIterator(ShardIteratorType type, String iteratorId)
+            throws IOException, TransientKinesisException {
+        when(client.getShardIterator(
+                eq(STREAM_NAME), eq(SHARD_ID), eq(type),
+                anyString(), isNull(Instant.class))).
+                thenReturn(iteratorId);
+    }
+
+    /** Asserts that the checkpoint obtains the expected iterator from the mocked client. */
+    private void assertProvidesIterator(ShardCheckpoint shardCheckpoint, String expected)
+            throws IOException, TransientKinesisException {
+        assertThat(shardCheckpoint.getShardIterator(client)).isEqualTo(expected);
+    }
+
+    /** Mocked record exposing only the given extended sequence number. */
+    private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+        KinesisRecord mocked = mock(KinesisRecord.class);
+        given(mocked.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+        return mocked;
+    }
+
+    /** Checkpoint on the test stream/shard positioned by sequence number. */
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+                                       Long subSequenceNumber) {
+        return new ShardCheckpoint(
+                STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber, subSequenceNumber);
+    }
+
+    /** Mocked record exposing only the given approximate arrival timestamp. */
+    private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+        KinesisRecord mocked = mock(KinesisRecord.class);
+        given(mocked.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+        return mocked;
+    }
+
+    /** Checkpoint on the test stream/shard positioned by timestamp. */
+    private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+        return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
new file mode 100644
index 0000000..e2a3ccc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Unit tests for {@link ShardRecordsIterator}, run against a mocked Kinesis client.
+ *
+ * <p>The fixture models three consecutive {@code getRecords} results reachable through the
+ * iterator ids INITIAL, SECOND and THIRD, and a chain of checkpoints
+ * (first -> a -> b -> c -> d) produced by moving after the records a, b, c and d.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardRecordsIteratorTest {
+    private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+    private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+    private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+    private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+    private static final String STREAM_NAME = "STREAM_NAME";
+    private static final String SHARD_ID = "SHARD_ID";
+
+    @Mock
+    private SimplifiedKinesisClient kinesisClient;
+    @Mock
+    private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+    @Mock
+    private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+    @Mock
+    private KinesisRecord a, b, c, d;
+    @Mock
+    private RecordFilter recordFilter;
+
+    private ShardRecordsIterator iterator;
+
+    /**
+     * Builds the default fixture: every result is empty, the filter passes its input through
+     * unchanged, and the iterator under test starts from {@code firstCheckpoint}.
+     */
+    @Before
+    public void setUp() throws IOException, TransientKinesisException {
+        when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+        when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        // Checkpoint chain: moving after a record yields the checkpoint named after it.
+        when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+        when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+        when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+        when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+        when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+        when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+        // Each iterator id maps to one result ...
+        when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(firstResult);
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+        when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(thirdResult);
+
+        // ... and each result points at the next iterator id; the third points at itself.
+        when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+        when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+        when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+        // No records by default; individual tests override this.
+        when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+        when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+        // The filter is neutralized so that the tests see exactly what the client returned.
+        when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+                .class))).thenAnswer(new IdentityAnswer());
+
+        iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+    }
+
+    /** With only empty results, every call to {@code next()} yields absent. */
+    @Test
+    public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
+
+    /**
+     * Records from consecutive results are returned in order, and the checkpoint advances
+     * after each returned record but stays put when nothing is returned.
+     */
+    @Test
+    public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+        when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+        assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+        assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+        assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+    }
+
+    /**
+     * When fetching with the second iterator id fails with
+     * {@link ExpiredIteratorException}, reading must continue with the iterator obtained
+     * from the current checkpoint ({@code aCheckpoint}).
+     */
+    @Test
+    public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+        when(firstResult.getRecords()).thenReturn(singletonList(a));
+        when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+        // Replaces the setUp stubbing for SECOND_ITERATOR: that iterator is now expired.
+        when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenThrow(ExpiredIteratorException.class);
+        when(aCheckpoint.getShardIterator(kinesisClient))
+                .thenReturn(SECOND_REFRESHED_ITERATOR);
+        when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+                .thenReturn(secondResult);
+
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+        assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+    }
+
+    /** Mockito answer that returns the first argument of the stubbed call unchanged. */
+    private static class IdentityAnswer implements Answer<Object> {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+            return invocation.getArguments()[0];
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
new file mode 100644
index 0000000..44d29d6
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.List;
+
+/**
+ * Unit tests for {@link SimplifiedKinesisClient}, run against a Mockito mock of
+ * {@link AmazonKinesis}.
+ *
+ * <p>Covers the shard-iterator requests sent to Kinesis, paging through shard listings and
+ * the mapping of exceptions thrown by the AWS client onto the exceptions seen by callers.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedKinesisClientTest {
+    private static final String STREAM = "stream";
+    private static final String SHARD_1 = "shard-01";
+    private static final String SHARD_2 = "shard-02";
+    private static final String SHARD_3 = "shard-03";
+    private static final String SHARD_ITERATOR = "iterator";
+    private static final String SEQUENCE_NUMBER = "abc123";
+
+    /** Mocked AWS client; every interaction with Kinesis is stubbed on it. */
+    @Mock
+    private AmazonKinesis kinesis;
+    /** Client under test, created by Mockito with {@link #kinesis} injected. */
+    @InjectMocks
+    private SimplifiedKinesisClient underTest;
+
+    /** A sequence-number based request must be forwarded and its iterator returned. */
+    @Test
+    public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+                .withStartingSequenceNumber(SEQUENCE_NUMBER)
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String shardIterator = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+        assertThat(shardIterator).isEqualTo(SHARD_ITERATOR);
+    }
+
+    /** A timestamp based request must be forwarded and its iterator returned. */
+    @Test
+    public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+        Instant timestamp = Instant.now();
+        // AT_TIMESTAMP is the iterator type that goes together with a timestamp; pairing
+        // the timestamp with AT_SEQUENCE_NUMBER would not test what the name promises.
+        given(kinesis.getShardIterator(new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
+                .withTimestamp(timestamp.toDate())
+        )).willReturn(new GetShardIteratorResult()
+                .withShardIterator(SHARD_ITERATOR));
+
+        String shardIterator = underTest.getShardIterator(STREAM, SHARD_1,
+                ShardIteratorType.AT_TIMESTAMP, null, timestamp);
+
+        assertThat(shardIterator).isEqualTo(SHARD_ITERATOR);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleServiceErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForGetShardIterator() {
+        shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+        shouldHandleGetShardIteratorError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    /**
+     * Makes the mocked Kinesis client fail a {@code LATEST} shard-iterator request and checks
+     * the exception that {@link SimplifiedKinesisClient#getShardIterator} lets escape.
+     *
+     * @param thrownException exception the mocked AWS client throws
+     * @param expectedExceptionClass exact class (subclasses do not match) of the exception
+     *     expected from the client under test
+     */
+    private void shouldHandleGetShardIteratorError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        GetShardIteratorRequest request = new GetShardIteratorRequest()
+                .withStreamName(STREAM)
+                .withShardId(SHARD_1)
+                .withShardIteratorType(ShardIteratorType.LATEST);
+
+        given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+        try {
+            underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+            // Signals failure with an AssertionError, which the catch below (Exception
+            // only) does not intercept.
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    /** Shards from every page of the stream description must be collected. */
+    @Test
+    public void shouldListAllShards() throws Exception {
+        Shard shard1 = new Shard().withShardId(SHARD_1);
+        Shard shard2 = new Shard().withShardId(SHARD_2);
+        Shard shard3 = new Shard().withShardId(SHARD_3);
+        // First page: two shards, with more shards announced.
+        given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard1, shard2)
+                        .withHasMoreShards(true)));
+        // Second page: requested with the id of the last shard of the first page.
+        given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+                .withStreamDescription(new StreamDescription()
+                        .withShards(shard3)
+                        .withHasMoreShards(false)));
+
+        List<Shard> shards = underTest.listShards(STREAM);
+
+        assertThat(shards).containsOnly(shard1, shard2, shard3);
+    }
+
+    @Test
+    public void shouldHandleExpiredIterationExceptionForShardListing() {
+        shouldHandleShardListingError(new ExpiredIteratorException(""),
+                ExpiredIteratorException.class);
+    }
+
+    @Test
+    public void shouldHandleLimitExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new LimitExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+        shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleServiceErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+                TransientKinesisException.class);
+    }
+
+    @Test
+    public void shouldHandleClientErrorForShardListing() {
+        shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+                RuntimeException.class);
+    }
+
+    @Test
+    public void shouldHandleUnexpectedExceptionForShardListing() {
+        shouldHandleShardListingError(new NullPointerException(),
+                RuntimeException.class);
+    }
+
+    /**
+     * Makes the mocked Kinesis client fail the first {@code describeStream} call and checks
+     * the exception that {@link SimplifiedKinesisClient#listShards} lets escape.
+     *
+     * @param thrownException exception the mocked AWS client throws
+     * @param expectedExceptionClass exact class (subclasses do not match) of the exception
+     *     expected from the client under test
+     */
+    private void shouldHandleShardListingError(
+            Exception thrownException,
+            Class<? extends Exception> expectedExceptionClass) {
+        given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+        try {
+            underTest.listShards(STREAM);
+            // Signals failure with an AssertionError, which the catch below (Exception
+            // only) does not intercept.
+            failBecauseExceptionWasNotThrown(expectedExceptionClass);
+        } catch (Exception e) {
+            assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+        } finally {
+            reset(kinesis);
+        }
+    }
+
+    /**
+     * Builds an {@link AmazonServiceException} with an empty message and the given error
+     * type.
+     *
+     * @param errorType error type to set on the exception
+     * @return the configured exception
+     */
+    private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+        AmazonServiceException exception = new AmazonServiceException("");
+        exception.setErrorType(errorType);
+        return exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..44dbf4a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading from and writing to Amazon Kinesis.
+ */
+package org.apache.beam.sdk.io.kinesis;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4198499..6cbd615 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -37,6 +37,7 @@
     <module>hdfs</module>
     <module>jms</module>
     <module>kafka</module>
+    <module>kinesis</module>
   </modules>
 
 </project>