You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/26 23:12:25 UTC
[3/5] incubator-beam git commit: kinesis: a connector for Amazon
Kinesis
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
new file mode 100644
index 0000000..7ca8e0b
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.collect.Lists.transform;
+import com.google.common.base.Function;
+
+import com.amazonaws.AmazonWebServiceRequest;
+import com.amazonaws.ResponseMetadata;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamRequest;
+import com.amazonaws.services.kinesis.model.AddTagsToStreamResult;
+import com.amazonaws.services.kinesis.model.CreateStreamRequest;
+import com.amazonaws.services.kinesis.model.CreateStreamResult;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.DecreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
+import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.DisableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringRequest;
+import com.amazonaws.services.kinesis.model.EnableEnhancedMonitoringResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
+import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
+import com.amazonaws.services.kinesis.model.ListStreamsRequest;
+import com.amazonaws.services.kinesis.model.ListStreamsResult;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamRequest;
+import com.amazonaws.services.kinesis.model.ListTagsForStreamResult;
+import com.amazonaws.services.kinesis.model.MergeShardsRequest;
+import com.amazonaws.services.kinesis.model.MergeShardsResult;
+import com.amazonaws.services.kinesis.model.PutRecordRequest;
+import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamRequest;
+import com.amazonaws.services.kinesis.model.RemoveTagsFromStreamResult;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.SplitShardRequest;
+import com.amazonaws.services.kinesis.model.SplitShardResult;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode;
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.joda.time.Instant;
+import static java.lang.Integer.parseInt;
+import static java.lang.Math.min;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/**
+ * Created by p.pastuszka on 21.07.2016.
+ */
+class AmazonKinesisMock implements AmazonKinesis {
+
+ static class TestData implements Serializable {
+ private final String data;
+ private final Instant arrivalTimestamp;
+ private final String sequenceNumber;
+
+ public TestData(KinesisRecord record) {
+ this(new String(record.getData().array()),
+ record.getApproximateArrivalTimestamp(),
+ record.getSequenceNumber());
+ }
+
+ public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+ this.data = data;
+ this.arrivalTimestamp = arrivalTimestamp;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public Record convertToRecord() {
+ return new Record().
+ withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+ withData(ByteBuffer.wrap(data.getBytes())).
+ withSequenceNumber(sequenceNumber).
+ withPartitionKey("");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return reflectionHashCode(this);
+ }
+ }
+
+ static class Provider implements KinesisClientProvider {
+
+ private final List<List<TestData>> shardedData;
+ private final int numberOfRecordsPerGet;
+
+ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ return new AmazonKinesisMock(transform(shardedData,
+ new Function<List<TestData>, List<Record>>() {
+ @Override
+ public List<Record> apply(@Nullable List<TestData> testDatas) {
+ return transform(testDatas, new Function<TestData, Record>() {
+ @Override
+ public Record apply(@Nullable TestData testData) {
+ return testData.convertToRecord();
+ }
+ });
+ }
+ }), numberOfRecordsPerGet);
+ }
+ }
+
+ private final List<List<Record>> shardedData;
+ private final int numberOfRecordsPerGet;
+
+ public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
+
+ @Override
+ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+ String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+ int shardId = parseInt(shardIteratorParts[0]);
+ int startingRecord = parseInt(shardIteratorParts[1]);
+ List<Record> shardData = shardedData.get(shardId);
+
+ int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+ int fromIndex = min(startingRecord, toIndex);
+ return new GetRecordsResult().
+ withRecords(shardData.subList(fromIndex, toIndex)).
+ withNextShardIterator(String.format("%s:%s", shardId, toIndex));
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(
+ GetShardIteratorRequest getShardIteratorRequest) {
+ ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+ getShardIteratorRequest.getShardIteratorType());
+
+ String shardIterator;
+ if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+ shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+ } else {
+ throw new RuntimeException("Not implemented");
+ }
+
+ return new GetShardIteratorResult().withShardIterator(shardIterator);
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+ int nextShardId = 0;
+ if (exclusiveStartShardId != null) {
+ nextShardId = parseInt(exclusiveStartShardId) + 1;
+ }
+ boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+ List<Shard> shards = newArrayList();
+ if (nextShardId < shardedData.size()) {
+ shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+ }
+
+ return new DescribeStreamResult().withStreamDescription(
+ new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+ );
+ }
+
+ @Override
+ public void setEndpoint(String endpoint) {
+
+ }
+
+ @Override
+ public void setRegion(Region region) {
+
+ }
+
+ @Override
+ public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(String streamName, Integer shardCount) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+ DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(String streamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName) {
+
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName,
+ Integer limit, String exclusiveStartShardId) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+ DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+ EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType,
+ String startingSequenceNumber) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+ IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListTagsForStreamResult listTagsForStream(
+ ListTagsForStreamRequest listTagsForStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(String streamName,
+ String shardToMerge, String adjacentShardToMerge) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data,
+ String partitionKey, String sequenceNumberForOrdering) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public RemoveTagsFromStreamResult removeTagsFromStream(
+ RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(String streamName,
+ String shardToSplit, String newStartingHashKey) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new RuntimeException("Not implemented");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
new file mode 100644
index 0000000..152fd6d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.junit.Test;
+import java.util.NoSuchElementException;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+public class CustomOptionalTest {
+ @Test(expected = NoSuchElementException.class)
+ public void absentThrowsNoSuchElementExceptionOnGet() {
+ CustomOptional.absent().get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
new file mode 100644
index 0000000..a9e5a69
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import com.amazonaws.services.kinesis.model.Shard;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DynamicCheckpointGeneratorTest {
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private Shard shard1, shard2, shard3;
+
+ @Test
+ public void shouldMapAllShardsToCheckpoints() throws Exception {
+ given(shard1.getShardId()).willReturn("shard-01");
+ given(shard2.getShardId()).willReturn("shard-02");
+ given(shard3.getShardId()).willReturn("shard-03");
+ given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+
+ StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+ DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+ startingPoint);
+
+ KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+
+ assertThat(checkpoint).hasSize(3);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
new file mode 100644
index 0000000..61a858f
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.google.common.collect.Iterables;
+
+import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.joda.time.DateTime;
+import org.junit.Test;
+import java.util.List;
+
+/**
+ * Created by p.pastuszka on 22.07.2016.
+ */
+public class KinesisMockReadTest {
+ @Test
+ public void readsDataFromMockKinesis() {
+ int noOfShards = 3;
+ int noOfEventsPerShard = 100;
+ List<List<AmazonKinesisMock.TestData>> testData =
+ provideTestData(noOfShards, noOfEventsPerShard);
+
+ final Pipeline p = TestPipeline.create();
+ PCollection<AmazonKinesisMock.TestData> result = p.
+ apply(
+ KinesisIO.Read.
+ from("stream", InitialPositionInStream.TRIM_HORIZON).
+ using(new AmazonKinesisMock.Provider(testData, 10)).
+ withMaxNumRecords(noOfShards * noOfEventsPerShard)).
+ apply(ParDo.of(new KinesisRecordToTestData()));
+ PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+ p.run();
+ }
+
+ private static class KinesisRecordToTestData extends
+ DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(new AmazonKinesisMock.TestData(c.element()));
+ }
+ }
+
+ private List<List<AmazonKinesisMock.TestData>> provideTestData(
+ int noOfShards,
+ int noOfEventsPerShard) {
+
+ int seqNumber = 0;
+
+ List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+ for (int i = 0; i < noOfShards; ++i) {
+ List<AmazonKinesisMock.TestData> shardData = newArrayList();
+ shardedData.add(shardData);
+
+ DateTime arrival = DateTime.now();
+ for (int j = 0; j < noOfEventsPerShard; ++j) {
+ arrival = arrival.plusSeconds(1);
+
+ seqNumber++;
+ shardData.add(new AmazonKinesisMock.TestData(
+ Integer.toString(seqNumber),
+ arrival.toInstant(),
+ Integer.toString(seqNumber))
+ );
+ }
+ }
+
+ return shardedData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
new file mode 100644
index 0000000..205f050
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+
+import com.google.common.collect.Iterables;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.util.Iterator;
+import java.util.List;
+
+/***
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderCheckpointTest {
+ @Mock
+ private ShardCheckpoint a, b, c;
+
+ private KinesisReaderCheckpoint checkpoint;
+
+ @Before
+ public void setUp() {
+ checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+ }
+
+ @Test
+ public void splitsCheckpointAccordingly() {
+ verifySplitInto(1);
+ verifySplitInto(2);
+ verifySplitInto(3);
+ verifySplitInto(4);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void isImmutable() {
+ Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+ iterator.remove();
+ }
+
+ private void verifySplitInto(int size) {
+ List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+ assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+ assertThat(split).hasSize(Math.min(size, 3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
new file mode 100644
index 0000000..fbc7c66
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Lists.newArrayList;
+
+import com.amazonaws.regions.Regions;
+import static org.assertj.core.api.Assertions.assertThat;
+import org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Ignore;
+import org.junit.Test;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test, that reads from the real Kinesis.
+ * You need to provide all {@link KinesisTestOptions} in order to run this.
+ */
+public class KinesisReaderIT {
+ private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+ private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+
+ @Ignore
+ @Test
+ public void readsDataFromRealKinesisStream()
+ throws IOException, InterruptedException, ExecutionException {
+ KinesisTestOptions options = readKinesisOptions();
+ List<String> testData = prepareTestData(1000);
+
+ Future<?> future = startTestPipeline(testData, options);
+ KinesisUploader.uploadAll(testData, options);
+ future.get();
+ }
+
+ private List<String> prepareTestData(int count) {
+ List<String> data = newArrayList();
+ for (int i = 0; i < count; ++i) {
+ data.add(RandomStringUtils.randomAlphabetic(32));
+ }
+ return data;
+ }
+
+ private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+ throws InterruptedException {
+ final Pipeline p = TestPipeline.create();
+ PCollection<String> result = p.
+ apply(KinesisIO.Read.
+ from(options.getAwsKinesisStream(), Instant.now()).
+ using(options.getAwsAccessKey(), options.getAwsSecretKey(),
+ Regions.fromName(options.getAwsKinesisRegion())).
+ withMaxReadTime(Duration.standardMinutes(3))
+ ).
+ apply(ParDo.of(new RecordDataToString()));
+ PAssert.that(result).containsInAnyOrder(testData);
+
+ Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ PipelineResult result = p.run();
+ PipelineResult.State state = result.getState();
+ while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+ Thread.sleep(1000);
+ state = result.getState();
+ }
+ assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ return null;
+ }
+ });
+ Thread.sleep(PIPELINE_STARTUP_TIME);
+ return future;
+ }
+
+ private KinesisTestOptions readKinesisOptions() {
+ PipelineOptionsFactory.register(KinesisTestOptions.class);
+ return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ }
+
+ private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ checkNotNull(c.element(), "Null record given");
+ c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
new file mode 100644
index 0000000..793fb57
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import static java.util.Arrays.asList;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisReaderTest {
+ @Mock
+ private SimplifiedKinesisClient kinesis;
+ @Mock
+ private CheckpointGenerator generator;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+ @Mock
+ private ShardRecordsIterator firstIterator, secondIterator;
+ @Mock
+ private KinesisRecord a, b, c, d;
+
+ private KinesisReader reader;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+ asList(firstCheckpoint, secondCheckpoint)
+ ));
+ when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+ when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+ when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+ when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ reader = new KinesisReader(kinesis, generator, null);
+ }
+
+ @Test
+ public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+ assertThat(reader.start()).isFalse();
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+ reader.start();
+ reader.getCurrent();
+ }
+
+ @Test
+ public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+ TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ }
+
+ @Test
+ public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+ throws IOException, TransientKinesisException {
+ reader.start();
+
+ when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+ assertThat(reader.advance()).isFalse();
+ }
+
+ @Test
+ public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(b)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ when(secondIterator.next()).
+ thenReturn(CustomOptional.of(c)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(d)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(c);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(a);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(d);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(b);
+ assertThat(reader.advance()).isFalse();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
new file mode 100644
index 0000000..b09b7eb
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.testing.CoderProperties;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import java.nio.ByteBuffer;
+
+/**
+ * Created by p.pastuszka on 20.07.2016.
+ */
+public class KinesisRecordCoderTest {
+ @Test
+ public void encodingAndDecodingWorks() throws Exception {
+ KinesisRecord record = new KinesisRecord(
+ ByteBuffer.wrap("data".getBytes()),
+ "sequence",
+ 128L,
+ "partition",
+ Instant.now(),
+ Instant.now(),
+ "stream",
+ "shard"
+ );
+ CoderProperties.coderDecodeEncodeEqual(
+ new KinesisRecordCoder(), record
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
new file mode 100644
index 0000000..65a7605
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/***
+ * Options for Kinesis integration tests.
+ */
+public interface KinesisTestOptions extends TestPipelineOptions {
+ @Description("AWS region where Kinesis stream resided")
+ @Default.String("aws-kinesis-region")
+ String getAwsKinesisRegion();
+ void setAwsKinesisRegion(String value);
+
+ @Description("Kinesis stream name")
+ @Default.String("aws-kinesis-stream")
+ String getAwsKinesisStream();
+ void setAwsKinesisStream(String value);
+
+ @Description("AWS secret key")
+ @Default.String("aws-secret-key")
+ String getAwsSecretKey();
+ void setAwsSecretKey(String value);
+
+ @Description("AWS access key")
+ @Default.String("aws-access-key")
+ String getAwsAccessKey();
+ void setAwsAccessKey(String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
new file mode 100644
index 0000000..0dcede9
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.internal.StaticCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/***
+ * Sends records to Kinesis in reliable way.
+ */
+public class KinesisUploader {
+
+ public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+
+ public static void uploadAll(List<String> data, KinesisTestOptions options) {
+ AmazonKinesis client = new AmazonKinesisClient(
+ new StaticCredentialsProvider(
+ new BasicAWSCredentials(
+ options.getAwsAccessKey(), options.getAwsSecretKey()))
+ ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+
+ List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+
+
+ for (List<String> partition : partitions) {
+ List<PutRecordsRequestEntry> allRecords = newArrayList();
+ for (String row : partition) {
+ allRecords.add(new PutRecordsRequestEntry().
+ withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+ withPartitionKey(Integer.toString(row.hashCode()))
+
+ );
+ }
+
+ PutRecordsResult result;
+ do {
+ result = client.putRecords(
+ new PutRecordsRequest().
+ withStreamName(options.getAwsKinesisStream()).
+ withRecords(allRecords));
+ List<PutRecordsRequestEntry> failedRecords = newArrayList();
+ int i = 0;
+ for (PutRecordsResultEntry row : result.getRecords()) {
+ if (row.getErrorCode() != null) {
+ failedRecords.add(allRecords.get(i));
+ }
+ ++i;
+ }
+ allRecords = failedRecords;
+ }
+
+ while (result.getFailedRecordCount() > 0);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
new file mode 100644
index 0000000..360106d
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.google.common.collect.Lists;
+
+import static org.mockito.BDDMockito.given;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.Collections;
+import java.util.List;
+
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class RecordFilterTest {
+ @Mock
+ private ShardCheckpoint checkpoint;
+ @Mock
+ private KinesisRecord record1, record2, record3, record4, record5;
+
+ @Test
+ public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+ given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+ List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+ record3, record4, record5);
+ RecordFilter underTest = new RecordFilter();
+
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+ Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+ }
+
+ @Test
+ public void shouldNotFailOnEmptyList() {
+ List<KinesisRecord> records = Collections.emptyList();
+ RecordFilter underTest = new RecordFilter();
+
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+
+ Assertions.assertThat(retainedRecords).isEmpty();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
new file mode 100644
index 0000000..a508ddf
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.collect.Lists.newArrayList;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import org.junit.Test;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+public class RoundRobinTest {
+ @Test(expected = IllegalArgumentException.class)
+ public void doesNotAllowCreationWithEmptyCollection() {
+ new RoundRobin<>(Collections.emptyList());
+ }
+
+ @Test
+ public void goesThroughElementsInCycle() {
+ List<String> input = newArrayList("a", "b", "c");
+
+ RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+
+ input.addAll(input); // duplicate the input
+ for (String element : input) {
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ roundRobin.moveForward();
+ }
+ }
+
+ @Test
+ public void usualIteratorGoesThroughElementsOnce() {
+ List<String> input = newArrayList("a", "b", "c");
+
+ RoundRobin<String> roundRobin = new RoundRobin<>(input);
+ assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
new file mode 100644
index 0000000..2227cef
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.LATEST;
+import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream.TRIM_HORIZON;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
+import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.io.IOException;
+
+/**
+ *
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardCheckpointTest {
+ private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+ private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+ private static final String STREAM_NAME = "STREAM";
+ private static final String SHARD_ID = "SHARD_ID";
+ @Mock
+ private SimplifiedKinesisClient client;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AT_SEQUENCE_SHARD_IT);
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AFTER_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+ (AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testComparisonWithExtendedSequenceNumber() {
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("99", 1L))
+ )).isFalse();
+ }
+
+ @Test
+ public void testComparisonWithTimestamp() {
+ DateTime referenceTimestamp = DateTime.now();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+ ).isFalse();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+ ).isTrue();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+ ).isTrue();
+ }
+
+ private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+ Long subSequenceNumber) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+ subSequenceNumber);
+ }
+
+ private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
new file mode 100644
index 0000000..e2a3ccc
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * Created by ppastuszka on 12.12.15.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardRecordsIteratorTest {
+ private static final String INITIAL_ITERATOR = "INITIAL_ITERATOR";
+ private static final String SECOND_ITERATOR = "SECOND_ITERATOR";
+ private static final String SECOND_REFRESHED_ITERATOR = "SECOND_REFRESHED_ITERATOR";
+ private static final String THIRD_ITERATOR = "THIRD_ITERATOR";
+ private static final String STREAM_NAME = "STREAM_NAME";
+ private static final String SHARD_ID = "SHARD_ID";
+
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, aCheckpoint, bCheckpoint, cCheckpoint, dCheckpoint;
+ @Mock
+ private GetKinesisRecordsResult firstResult, secondResult, thirdResult;
+ @Mock
+ private KinesisRecord a, b, c, d;
+ @Mock
+ private RecordFilter recordFilter;
+
+ private ShardRecordsIterator iterator;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(firstCheckpoint.getShardIterator(kinesisClient)).thenReturn(INITIAL_ITERATOR);
+ when(firstCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(firstCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(firstCheckpoint.moveAfter(a)).thenReturn(aCheckpoint);
+ when(aCheckpoint.moveAfter(b)).thenReturn(bCheckpoint);
+ when(aCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(aCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(bCheckpoint.moveAfter(c)).thenReturn(cCheckpoint);
+ when(bCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(bCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(cCheckpoint.moveAfter(d)).thenReturn(dCheckpoint);
+ when(cCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(cCheckpoint.getShardId()).thenReturn(SHARD_ID);
+ when(dCheckpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(dCheckpoint.getShardId()).thenReturn(SHARD_ID);
+
+ when(kinesisClient.getRecords(INITIAL_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(firstResult);
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+ when(kinesisClient.getRecords(THIRD_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(thirdResult);
+
+ when(firstResult.getNextShardIterator()).thenReturn(SECOND_ITERATOR);
+ when(secondResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+ when(thirdResult.getNextShardIterator()).thenReturn(THIRD_ITERATOR);
+
+ when(firstResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+ when(recordFilter.apply(anyListOf(KinesisRecord.class), any(ShardCheckpoint
+ .class))).thenAnswer(new IdentityAnswer());
+
+ iterator = new ShardRecordsIterator(firstCheckpoint, kinesisClient, recordFilter);
+ }
+
+ @Test
+ public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ @Test
+ public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+ when(secondResult.getRecords()).thenReturn(singletonList(d));
+
+ assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+ assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
+ }
+
+ @Test
+ public void refreshesExpiredIterator() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(singletonList(a));
+ when(secondResult.getRecords()).thenReturn(singletonList(b));
+
+ when(kinesisClient.getRecords(SECOND_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenThrow(ExpiredIteratorException.class);
+ when(aCheckpoint.getShardIterator(kinesisClient))
+ .thenReturn(SECOND_REFRESHED_ITERATOR);
+ when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
+ .thenReturn(secondResult);
+
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ }
+
+ private static class IdentityAnswer implements Answer<Object> {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return invocation.getArguments()[0];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
new file mode 100644
index 0000000..44d29d6
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClientTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.AmazonServiceException.ErrorType;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.reset;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import java.util.List;
+
+/***
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class SimplifiedKinesisClientTest {
+ private static final String STREAM = "stream";
+ private static final String SHARD_1 = "shard-01";
+ private static final String SHARD_2 = "shard-02";
+ private static final String SHARD_3 = "shard-03";
+ private static final String SHARD_ITERATOR = "iterator";
+ private static final String SEQUENCE_NUMBER = "abc123";
+
+ @Mock
+ private AmazonKinesis kinesis;
+ @InjectMocks
+ private SimplifiedKinesisClient underTest;
+
+ @Test
+ public void shouldReturnIteratorStartingWithSequenceNumber() throws Exception {
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withStartingSequenceNumber(SEQUENCE_NUMBER)
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, SEQUENCE_NUMBER, null);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldReturnIteratorStartingWithTimestamp() throws Exception {
+ Instant timestamp = Instant.now();
+ given(kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
+ .withTimestamp(timestamp.toDate())
+ )).willReturn(new GetShardIteratorResult()
+ .withShardIterator(SHARD_ITERATOR));
+
+ String stream = underTest.getShardIterator(STREAM, SHARD_1,
+ ShardIteratorType.AT_SEQUENCE_NUMBER, null, timestamp);
+
+ assertThat(stream).isEqualTo(SHARD_ITERATOR);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleServiceErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForGetShardIterator() {
+ shouldHandleGetShardIteratorError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForGetShardIterator() {
+ shouldHandleGetShardIteratorError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleGetShardIteratorError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ GetShardIteratorRequest request = new GetShardIteratorRequest()
+ .withStreamName(STREAM)
+ .withShardId(SHARD_1)
+ .withShardIteratorType(ShardIteratorType.LATEST);
+
+ given(kinesis.getShardIterator(request)).willThrow(thrownException);
+
+ try {
+ underTest.getShardIterator(STREAM, SHARD_1, ShardIteratorType.LATEST, null, null);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ @Test
+ public void shouldListAllShards() throws Exception {
+ Shard shard1 = new Shard().withShardId(SHARD_1);
+ Shard shard2 = new Shard().withShardId(SHARD_2);
+ Shard shard3 = new Shard().withShardId(SHARD_3);
+ given(kinesis.describeStream(STREAM, null)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard1, shard2)
+ .withHasMoreShards(true)));
+ given(kinesis.describeStream(STREAM, SHARD_2)).willReturn(new DescribeStreamResult()
+ .withStreamDescription(new StreamDescription()
+ .withShards(shard3)
+ .withHasMoreShards(false)));
+
+ List<Shard> shards = underTest.listShards(STREAM);
+
+ assertThat(shards).containsOnly(shard1, shard2, shard3);
+ }
+
+ @Test
+ public void shouldHandleExpiredIterationExceptionForShardListing() {
+ shouldHandleShardListingError(new ExpiredIteratorException(""),
+ ExpiredIteratorException.class);
+ }
+
+ @Test
+ public void shouldHandleLimitExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new LimitExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleProvisionedThroughputExceededExceptionForShardListing() {
+ shouldHandleShardListingError(new ProvisionedThroughputExceededException(""),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleServiceErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Service),
+ TransientKinesisException.class);
+ }
+
+ @Test
+ public void shouldHandleClientErrorForShardListing() {
+ shouldHandleShardListingError(newAmazonServiceException(ErrorType.Client),
+ RuntimeException.class);
+ }
+
+ @Test
+ public void shouldHandleUnexpectedExceptionForShardListing() {
+ shouldHandleShardListingError(new NullPointerException(),
+ RuntimeException.class);
+ }
+
+ private void shouldHandleShardListingError(
+ Exception thrownException,
+ Class<? extends Exception> expectedExceptionClass) {
+ given(kinesis.describeStream(STREAM, null)).willThrow(thrownException);
+ try {
+ underTest.listShards(STREAM);
+ failBecauseExceptionWasNotThrown(expectedExceptionClass);
+ } catch (Exception e) {
+ assertThat(e).isExactlyInstanceOf(expectedExceptionClass);
+ } finally {
+ reset(kinesis);
+ }
+ }
+
+ private AmazonServiceException newAmazonServiceException(ErrorType errorType) {
+ AmazonServiceException exception = new AmazonServiceException("");
+ exception.setErrorType(errorType);
+ return exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
new file mode 100644
index 0000000..44dbf4a
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Transforms for reading and writing from Amazon Kinesis.
+ */
+package org.apache.beam.sdk.io.kinesis;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bed22de6/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 4198499..6cbd615 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -37,6 +37,7 @@
<module>hdfs</module>
<module>jms</module>
<module>kafka</module>
+ <module>kinesis</module>
</modules>
</project>