You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:33 UTC
[06/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
index 80c950f..3e3984a 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/SimplifiedKinesisClient.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.kinesis;
+
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
@@ -30,11 +31,9 @@ import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.google.common.collect.Lists;
-
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
-
import org.joda.time.Instant;
/**
@@ -42,121 +41,117 @@ import org.joda.time.Instant;
* proper error handling.
*/
class SimplifiedKinesisClient {
+ private final AmazonKinesis kinesis;
- private final AmazonKinesis kinesis;
-
- public SimplifiedKinesisClient(AmazonKinesis kinesis) {
- this.kinesis = kinesis;
- }
-
- public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
- return new SimplifiedKinesisClient(provider.get());
- }
-
- public String getShardIterator(final String streamName, final String shardId,
- final ShardIteratorType shardIteratorType,
- final String startingSequenceNumber, final Instant timestamp)
- throws TransientKinesisException {
- final Date date = timestamp != null ? timestamp.toDate() : null;
- return wrapExceptions(new Callable<String>() {
-
- @Override
- public String call() throws Exception {
- return kinesis.getShardIterator(new GetShardIteratorRequest()
- .withStreamName(streamName)
- .withShardId(shardId)
- .withShardIteratorType(shardIteratorType)
- .withStartingSequenceNumber(startingSequenceNumber)
- .withTimestamp(date)
- ).getShardIterator();
- }
- });
- }
-
- public List<Shard> listShards(final String streamName) throws TransientKinesisException {
- return wrapExceptions(new Callable<List<Shard>>() {
-
- @Override
- public List<Shard> call() throws Exception {
- List<Shard> shards = Lists.newArrayList();
- String lastShardId = null;
-
- StreamDescription description;
- do {
- description = kinesis.describeStream(streamName, lastShardId)
- .getStreamDescription();
+ public SimplifiedKinesisClient(AmazonKinesis kinesis) {
+ this.kinesis = kinesis;
+ }
- shards.addAll(description.getShards());
- lastShardId = shards.get(shards.size() - 1).getShardId();
- } while (description.getHasMoreShards());
+ public static SimplifiedKinesisClient from(KinesisClientProvider provider) {
+ return new SimplifiedKinesisClient(provider.get());
+ }
- return shards;
- }
- });
- }
+ public String getShardIterator(final String streamName, final String shardId,
+ final ShardIteratorType shardIteratorType,
+ final String startingSequenceNumber, final Instant timestamp)
+ throws TransientKinesisException {
+ final Date date = timestamp != null ? timestamp.toDate() : null;
+ return wrapExceptions(new Callable<String>() {
+ @Override
+ public String call() throws Exception {
+ return kinesis.getShardIterator(new GetShardIteratorRequest()
+ .withStreamName(streamName)
+ .withShardId(shardId)
+ .withShardIteratorType(shardIteratorType)
+ .withStartingSequenceNumber(startingSequenceNumber)
+ .withTimestamp(date)
+ ).getShardIterator();
+ }
+ });
+ }
- /**
- * Gets records from Kinesis and deaggregates them if needed.
- *
- * @return list of deaggregated records
- * @throws TransientKinesisException - in case of recoverable situation
- */
- public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
- String shardId) throws TransientKinesisException {
- return getRecords(shardIterator, streamName, shardId, null);
- }
+ public List<Shard> listShards(final String streamName) throws TransientKinesisException {
+ return wrapExceptions(new Callable<List<Shard>>() {
+ @Override
+ public List<Shard> call() throws Exception {
+ List<Shard> shards = Lists.newArrayList();
+ String lastShardId = null;
+
+ StreamDescription description;
+ do {
+ description = kinesis.describeStream(streamName, lastShardId)
+ .getStreamDescription();
+
+ shards.addAll(description.getShards());
+ lastShardId = shards.get(shards.size() - 1).getShardId();
+ } while (description.getHasMoreShards());
+
+ return shards;
+ }
+ });
+ }
- /**
- * Gets records from Kinesis and deaggregates them if needed.
- *
- * @return list of deaggregated records
- * @throws TransientKinesisException - in case of recoverable situation
- */
- public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
- final String shardId, final Integer limit)
- throws
- TransientKinesisException {
- return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+ /**
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(String shardIterator, String streamName,
+ String shardId) throws TransientKinesisException {
+ return getRecords(shardIterator, streamName, shardId, null);
+ }
- @Override
- public GetKinesisRecordsResult call() throws Exception {
- GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
- .withShardIterator(shardIterator)
- .withLimit(limit));
- return new GetKinesisRecordsResult(
- UserRecord.deaggregate(response.getRecords()),
- response.getNextShardIterator(),
- streamName, shardId);
- }
- });
- }
+ /**
+ * Gets records from Kinesis and deaggregates them if needed.
+ *
+ * @return list of deaggregated records
+ * @throws TransientKinesisException - in case of recoverable situation
+ */
+ public GetKinesisRecordsResult getRecords(final String shardIterator, final String streamName,
+ final String shardId, final Integer limit)
+ throws
+ TransientKinesisException {
+ return wrapExceptions(new Callable<GetKinesisRecordsResult>() {
+ @Override
+ public GetKinesisRecordsResult call() throws Exception {
+ GetRecordsResult response = kinesis.getRecords(new GetRecordsRequest()
+ .withShardIterator(shardIterator)
+ .withLimit(limit));
+ return new GetKinesisRecordsResult(
+ UserRecord.deaggregate(response.getRecords()),
+ response.getNextShardIterator(),
+ streamName, shardId);
+ }
+ });
+ }
- /**
- * Wraps Amazon specific exceptions into more friendly format.
- *
- * @throws TransientKinesisException - in case of recoverable situation, i.e.
- * the request rate is too high, Kinesis remote service
- * failed, network issue, etc.
- * @throws ExpiredIteratorException - if iterator needs to be refreshed
- * @throws RuntimeException - in all other cases
- */
- private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
- try {
- return callable.call();
- } catch (ExpiredIteratorException e) {
- throw e;
- } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
- throw new TransientKinesisException(
- "Too many requests to Kinesis. Wait some time and retry.", e);
- } catch (AmazonServiceException e) {
- if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
- throw new TransientKinesisException(
- "Kinesis backend failed. Wait some time and retry.", e);
- }
- throw new RuntimeException("Kinesis client side failure", e);
- } catch (Exception e) {
- throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+ /**
+ * Wraps Amazon specific exceptions into more friendly format.
+ *
+ * @throws TransientKinesisException - in case of recoverable situation, i.e.
+ * the request rate is too high, Kinesis remote service
+ * failed, network issue, etc.
+ * @throws ExpiredIteratorException - if iterator needs to be refreshed
+ * @throws RuntimeException - in all other cases
+ */
+ private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
+ try {
+ return callable.call();
+ } catch (ExpiredIteratorException e) {
+ throw e;
+ } catch (LimitExceededException | ProvisionedThroughputExceededException e) {
+ throw new TransientKinesisException(
+ "Too many requests to Kinesis. Wait some time and retry.", e);
+ } catch (AmazonServiceException e) {
+ if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
+ throw new TransientKinesisException(
+ "Kinesis backend failed. Wait some time and retry.", e);
+ }
+ throw new RuntimeException("Kinesis client side failure", e);
+ } catch (Exception e) {
+ throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
index f9298fa..d8842c4 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StartingPoint.java
@@ -17,14 +17,13 @@
*/
package org.apache.beam.sdk.io.kinesis;
+
import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
import java.io.Serializable;
import java.util.Objects;
-
import org.joda.time.Instant;
/**
@@ -33,55 +32,54 @@ import org.joda.time.Instant;
* in which case the reader will start reading at the specified point in time.
*/
class StartingPoint implements Serializable {
+ private final InitialPositionInStream position;
+ private final Instant timestamp;
- private final InitialPositionInStream position;
- private final Instant timestamp;
-
- public StartingPoint(InitialPositionInStream position) {
- this.position = checkNotNull(position, "position");
- this.timestamp = null;
- }
-
- public StartingPoint(Instant timestamp) {
- this.timestamp = checkNotNull(timestamp, "timestamp");
- this.position = null;
- }
+ public StartingPoint(InitialPositionInStream position) {
+ this.position = checkNotNull(position, "position");
+ this.timestamp = null;
+ }
- public InitialPositionInStream getPosition() {
- return position;
- }
+ public StartingPoint(Instant timestamp) {
+ this.timestamp = checkNotNull(timestamp, "timestamp");
+ this.position = null;
+ }
- public String getPositionName() {
- return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
- }
+ public InitialPositionInStream getPosition() {
+ return position;
+ }
- public Instant getTimestamp() {
- return timestamp != null ? timestamp : null;
- }
+ public String getPositionName() {
+ return position != null ? position.name() : ShardIteratorType.AT_TIMESTAMP.name();
+ }
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
+ public Instant getTimestamp() {
+ return timestamp != null ? timestamp : null;
}
- if (o == null || getClass() != o.getClass()) {
- return false;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StartingPoint that = (StartingPoint) o;
+ return position == that.position && Objects.equals(timestamp, that.timestamp);
}
- StartingPoint that = (StartingPoint) o;
- return position == that.position && Objects.equals(timestamp, that.timestamp);
- }
- @Override
- public int hashCode() {
- return Objects.hash(position, timestamp);
- }
+ @Override
+ public int hashCode() {
+ return Objects.hash(position, timestamp);
+ }
- @Override
- public String toString() {
- if (timestamp == null) {
- return position.toString();
- } else {
- return "Starting at timestamp " + timestamp;
+ @Override
+ public String toString() {
+ if (timestamp == null) {
+ return position.toString();
+ } else {
+ return "Starting at timestamp " + timestamp;
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
index 1ec865d..22dc973 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/StaticCheckpointGenerator.java
@@ -23,21 +23,20 @@ import static com.google.common.base.Preconditions.checkNotNull;
* Always returns the same instance of checkpoint.
*/
class StaticCheckpointGenerator implements CheckpointGenerator {
+ private final KinesisReaderCheckpoint checkpoint;
- private final KinesisReaderCheckpoint checkpoint;
+ public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
+ checkNotNull(checkpoint, "checkpoint");
+ this.checkpoint = checkpoint;
+ }
- public StaticCheckpointGenerator(KinesisReaderCheckpoint checkpoint) {
- checkNotNull(checkpoint, "checkpoint");
- this.checkpoint = checkpoint;
- }
+ @Override
+ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
+ return checkpoint;
+ }
- @Override
- public KinesisReaderCheckpoint generate(SimplifiedKinesisClient client) {
- return checkpoint;
- }
-
- @Override
- public String toString() {
- return checkpoint.toString();
- }
+ @Override
+ public String toString() {
+ return checkpoint.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
index 68ca0d7..57ad8a8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/TransientKinesisException.java
@@ -23,8 +23,7 @@ import com.amazonaws.AmazonServiceException;
* A transient exception thrown by Kinesis.
*/
class TransientKinesisException extends Exception {
-
- public TransientKinesisException(String s, AmazonServiceException e) {
- super(s, e);
- }
+ public TransientKinesisException(String s, AmazonServiceException e) {
+ super(s, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
index 994d6e3..046c9d9 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/AmazonKinesisMock.java
@@ -66,12 +66,10 @@ import com.amazonaws.services.kinesis.model.SplitShardRequest;
import com.amazonaws.services.kinesis.model.SplitShardResult;
import com.amazonaws.services.kinesis.model.StreamDescription;
import com.google.common.base.Function;
-
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import javax.annotation.Nullable;
-
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
@@ -80,301 +78,298 @@ import org.joda.time.Instant;
*/
class AmazonKinesisMock implements AmazonKinesis {
- static class TestData implements Serializable {
+ static class TestData implements Serializable {
+ private final String data;
+ private final Instant arrivalTimestamp;
+ private final String sequenceNumber;
+
+ public TestData(KinesisRecord record) {
+ this(new String(record.getData().array()),
+ record.getApproximateArrivalTimestamp(),
+ record.getSequenceNumber());
+ }
+
+ public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
+ this.data = data;
+ this.arrivalTimestamp = arrivalTimestamp;
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public Record convertToRecord() {
+ return new Record().
+ withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
+ withData(ByteBuffer.wrap(data.getBytes())).
+ withSequenceNumber(sequenceNumber).
+ withPartitionKey("");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return reflectionHashCode(this);
+ }
+ }
+
+ static class Provider implements KinesisClientProvider {
+
+ private final List<List<TestData>> shardedData;
+ private final int numberOfRecordsPerGet;
+
+ public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ return new AmazonKinesisMock(transform(shardedData,
+ new Function<List<TestData>, List<Record>>() {
+ @Override
+ public List<Record> apply(@Nullable List<TestData> testDatas) {
+ return transform(testDatas, new Function<TestData, Record>() {
+ @Override
+ public Record apply(@Nullable TestData testData) {
+ return testData.convertToRecord();
+ }
+ });
+ }
+ }), numberOfRecordsPerGet);
+ }
+ }
- private final String data;
- private final Instant arrivalTimestamp;
- private final String sequenceNumber;
+ private final List<List<Record>> shardedData;
+ private final int numberOfRecordsPerGet;
- public TestData(KinesisRecord record) {
- this(new String(record.getData().array()),
- record.getApproximateArrivalTimestamp(),
- record.getSequenceNumber());
+ public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
+ this.shardedData = shardedData;
+ this.numberOfRecordsPerGet = numberOfRecordsPerGet;
}
- public TestData(String data, Instant arrivalTimestamp, String sequenceNumber) {
- this.data = data;
- this.arrivalTimestamp = arrivalTimestamp;
- this.sequenceNumber = sequenceNumber;
+ @Override
+ public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
+ String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
+ int shardId = parseInt(shardIteratorParts[0]);
+ int startingRecord = parseInt(shardIteratorParts[1]);
+ List<Record> shardData = shardedData.get(shardId);
+
+ int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
+ int fromIndex = min(startingRecord, toIndex);
+ return new GetRecordsResult().
+ withRecords(shardData.subList(fromIndex, toIndex)).
+ withNextShardIterator(String.format("%s:%s", shardId, toIndex));
}
- public Record convertToRecord() {
- return new Record().
- withApproximateArrivalTimestamp(arrivalTimestamp.toDate()).
- withData(ByteBuffer.wrap(data.getBytes())).
- withSequenceNumber(sequenceNumber).
- withPartitionKey("");
+ @Override
+ public GetShardIteratorResult getShardIterator(
+ GetShardIteratorRequest getShardIteratorRequest) {
+ ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
+ getShardIteratorRequest.getShardIteratorType());
+
+ String shardIterator;
+ if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
+ shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
+ } else {
+ throw new RuntimeException("Not implemented");
+ }
+
+ return new GetShardIteratorResult().withShardIterator(shardIterator);
}
@Override
- public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
+ public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
+ int nextShardId = 0;
+ if (exclusiveStartShardId != null) {
+ nextShardId = parseInt(exclusiveStartShardId) + 1;
+ }
+ boolean hasMoreShards = nextShardId + 1 < shardedData.size();
+
+ List<Shard> shards = newArrayList();
+ if (nextShardId < shardedData.size()) {
+ shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
+ }
+
+ return new DescribeStreamResult().withStreamDescription(
+ new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
+ );
}
@Override
- public int hashCode() {
- return reflectionHashCode(this);
+ public void setEndpoint(String endpoint) {
+
}
- }
- static class Provider implements KinesisClientProvider {
+ @Override
+ public void setRegion(Region region) {
- private final List<List<TestData>> shardedData;
- private final int numberOfRecordsPerGet;
+ }
+
+ @Override
+ public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public CreateStreamResult createStream(String streamName, Integer shardCount) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
+ DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DeleteStreamResult deleteStream(String streamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName) {
+
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DescribeStreamResult describeStream(String streamName,
+ Integer limit, String exclusiveStartShardId) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
+ DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
+ EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
+ throw new RuntimeException("Not implemented");
+ }
- public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
- this.shardedData = shardedData;
- this.numberOfRecordsPerGet = numberOfRecordsPerGet;
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public GetShardIteratorResult getShardIterator(String streamName,
+ String shardId,
+ String shardIteratorType,
+ String startingSequenceNumber) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
+ IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public ListTagsForStreamResult listTagsForStream(
+ ListTagsForStreamRequest listTagsForStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
+ throw new RuntimeException("Not implemented");
}
@Override
- public AmazonKinesis get() {
- return new AmazonKinesisMock(transform(shardedData,
- new Function<List<TestData>, List<Record>>() {
+ public MergeShardsResult mergeShards(String streamName,
+ String shardToMerge, String adjacentShardToMerge) {
+ throw new RuntimeException("Not implemented");
+ }
- @Override
- public List<Record> apply(@Nullable List<TestData> testDatas) {
- return transform(testDatas, new Function<TestData, Record>() {
+ @Override
+ public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
+ throw new RuntimeException("Not implemented");
+ }
- @Override
- public Record apply(@Nullable TestData testData) {
- return testData.convertToRecord();
- }
- });
- }
- }), numberOfRecordsPerGet);
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
+ throw new RuntimeException("Not implemented");
}
- }
- private final List<List<Record>> shardedData;
- private final int numberOfRecordsPerGet;
+ @Override
+ public PutRecordResult putRecord(String streamName, ByteBuffer data,
+ String partitionKey, String sequenceNumberForOrdering) {
+ throw new RuntimeException("Not implemented");
+ }
- public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
- this.shardedData = shardedData;
- this.numberOfRecordsPerGet = numberOfRecordsPerGet;
- }
+ @Override
+ public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
+ throw new RuntimeException("Not implemented");
+ }
- @Override
- public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
- String[] shardIteratorParts = getRecordsRequest.getShardIterator().split(":");
- int shardId = parseInt(shardIteratorParts[0]);
- int startingRecord = parseInt(shardIteratorParts[1]);
- List<Record> shardData = shardedData.get(shardId);
+ @Override
+ public RemoveTagsFromStreamResult removeTagsFromStream(
+ RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
+ throw new RuntimeException("Not implemented");
+ }
- int toIndex = min(startingRecord + numberOfRecordsPerGet, shardData.size());
- int fromIndex = min(startingRecord, toIndex);
- return new GetRecordsResult().
- withRecords(shardData.subList(fromIndex, toIndex)).
- withNextShardIterator(String.format("%s:%s", shardId, toIndex));
- }
+ @Override
+ public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public SplitShardResult splitShard(String streamName,
+ String shardToSplit, String newStartingHashKey) {
+ throw new RuntimeException("Not implemented");
+ }
- @Override
- public GetShardIteratorResult getShardIterator(
- GetShardIteratorRequest getShardIteratorRequest) {
- ShardIteratorType shardIteratorType = ShardIteratorType.fromValue(
- getShardIteratorRequest.getShardIteratorType());
-
- String shardIterator;
- if (shardIteratorType == ShardIteratorType.TRIM_HORIZON) {
- shardIterator = String.format("%s:%s", getShardIteratorRequest.getShardId(), 0);
- } else {
- throw new RuntimeException("Not implemented");
- }
-
- return new GetShardIteratorResult().withShardIterator(shardIterator);
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
- int nextShardId = 0;
- if (exclusiveStartShardId != null) {
- nextShardId = parseInt(exclusiveStartShardId) + 1;
- }
- boolean hasMoreShards = nextShardId + 1 < shardedData.size();
-
- List<Shard> shards = newArrayList();
- if (nextShardId < shardedData.size()) {
- shards.add(new Shard().withShardId(Integer.toString(nextShardId)));
- }
-
- return new DescribeStreamResult().withStreamDescription(
- new StreamDescription().withHasMoreShards(hasMoreShards).withShards(shards)
- );
- }
-
- @Override
- public void setEndpoint(String endpoint) {
-
- }
-
- @Override
- public void setRegion(Region region) {
-
- }
-
- @Override
- public AddTagsToStreamResult addTagsToStream(AddTagsToStreamRequest addTagsToStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public CreateStreamResult createStream(CreateStreamRequest createStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public CreateStreamResult createStream(String streamName, Integer shardCount) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DecreaseStreamRetentionPeriodResult decreaseStreamRetentionPeriod(
- DecreaseStreamRetentionPeriodRequest decreaseStreamRetentionPeriodRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeleteStreamResult deleteStream(DeleteStreamRequest deleteStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DeleteStreamResult deleteStream(String streamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(DescribeStreamRequest describeStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName) {
-
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DescribeStreamResult describeStream(String streamName,
- Integer limit, String exclusiveStartShardId) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public DisableEnhancedMonitoringResult disableEnhancedMonitoring(
- DisableEnhancedMonitoringRequest disableEnhancedMonitoringRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public EnableEnhancedMonitoringResult enableEnhancedMonitoring(
- EnableEnhancedMonitoringRequest enableEnhancedMonitoringRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(String streamName,
- String shardId,
- String shardIteratorType) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public GetShardIteratorResult getShardIterator(String streamName,
- String shardId,
- String shardIteratorType,
- String startingSequenceNumber) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public IncreaseStreamRetentionPeriodResult increaseStreamRetentionPeriod(
- IncreaseStreamRetentionPeriodRequest increaseStreamRetentionPeriodRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListStreamsResult listStreams(ListStreamsRequest listStreamsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListStreamsResult listStreams() {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListStreamsResult listStreams(String exclusiveStartStreamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListStreamsResult listStreams(Integer limit, String exclusiveStartStreamName) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public ListTagsForStreamResult listTagsForStream(
- ListTagsForStreamRequest listTagsForStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public MergeShardsResult mergeShards(MergeShardsRequest mergeShardsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public MergeShardsResult mergeShards(String streamName,
- String shardToMerge, String adjacentShardToMerge) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public PutRecordResult putRecord(PutRecordRequest putRecordRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public PutRecordResult putRecord(String streamName, ByteBuffer data, String partitionKey) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public PutRecordResult putRecord(String streamName, ByteBuffer data,
- String partitionKey, String sequenceNumberForOrdering) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public PutRecordsResult putRecords(PutRecordsRequest putRecordsRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public RemoveTagsFromStreamResult removeTagsFromStream(
- RemoveTagsFromStreamRequest removeTagsFromStreamRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public SplitShardResult splitShard(SplitShardRequest splitShardRequest) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public SplitShardResult splitShard(String streamName,
- String shardToSplit, String newStartingHashKey) {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public void shutdown() {
-
- }
-
- @Override
- public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
- throw new RuntimeException("Not implemented");
- }
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
+ throw new RuntimeException("Not implemented");
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
index 0b16bb7..00acffe 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/CustomOptionalTest.java
@@ -18,27 +18,24 @@
package org.apache.beam.sdk.io.kinesis;
import com.google.common.testing.EqualsTester;
-
import java.util.NoSuchElementException;
-
import org.junit.Test;
/**
* Tests {@link CustomOptional}.
*/
public class CustomOptionalTest {
+ @Test(expected = NoSuchElementException.class)
+ public void absentThrowsNoSuchElementExceptionOnGet() {
+ CustomOptional.absent().get();
+ }
- @Test(expected = NoSuchElementException.class)
- public void absentThrowsNoSuchElementExceptionOnGet() {
- CustomOptional.absent().get();
- }
-
- @Test
- public void testEqualsAndHashCode() {
- new EqualsTester()
- .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
- .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
- .addEqualityGroup(CustomOptional.of(11))
- .addEqualityGroup(CustomOptional.of("3")).testEquals();
- }
+ @Test
+ public void testEqualsAndHashCode() {
+ new EqualsTester()
+ .addEqualityGroup(CustomOptional.absent(), CustomOptional.absent())
+ .addEqualityGroup(CustomOptional.of(3), CustomOptional.of(3))
+ .addEqualityGroup(CustomOptional.of(11))
+ .addEqualityGroup(CustomOptional.of("3")).testEquals();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
index 1bb9717..c92ac9a 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGeneratorTest.java
@@ -28,29 +28,30 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+
/***
*/
@RunWith(MockitoJUnitRunner.class)
public class DynamicCheckpointGeneratorTest {
- @Mock
- private SimplifiedKinesisClient kinesisClient;
- @Mock
- private Shard shard1, shard2, shard3;
+ @Mock
+ private SimplifiedKinesisClient kinesisClient;
+ @Mock
+ private Shard shard1, shard2, shard3;
- @Test
- public void shouldMapAllShardsToCheckpoints() throws Exception {
- given(shard1.getShardId()).willReturn("shard-01");
- given(shard2.getShardId()).willReturn("shard-02");
- given(shard3.getShardId()).willReturn("shard-03");
- given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
+ @Test
+ public void shouldMapAllShardsToCheckpoints() throws Exception {
+ given(shard1.getShardId()).willReturn("shard-01");
+ given(shard2.getShardId()).willReturn("shard-02");
+ given(shard3.getShardId()).willReturn("shard-03");
+ given(kinesisClient.listShards("stream")).willReturn(asList(shard1, shard2, shard3));
- StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
- DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
- startingPoint);
+ StartingPoint startingPoint = new StartingPoint(InitialPositionInStream.LATEST);
+ DynamicCheckpointGenerator underTest = new DynamicCheckpointGenerator("stream",
+ startingPoint);
- KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
+ KinesisReaderCheckpoint checkpoint = underTest.generate(kinesisClient);
- assertThat(checkpoint).hasSize(3);
- }
+ assertThat(checkpoint).hasSize(3);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
index 44ad67d..567e25f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisMockReadTest.java
@@ -21,9 +21,7 @@ import static com.google.common.collect.Lists.newArrayList;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.common.collect.Iterables;
-
import java.util.List;
-
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
@@ -38,60 +36,59 @@ import org.junit.Test;
*/
public class KinesisMockReadTest {
- @Rule
- public final transient TestPipeline p = TestPipeline.create();
-
- @Test
- public void readsDataFromMockKinesis() {
- int noOfShards = 3;
- int noOfEventsPerShard = 100;
- List<List<AmazonKinesisMock.TestData>> testData =
- provideTestData(noOfShards, noOfEventsPerShard);
-
- PCollection<AmazonKinesisMock.TestData> result = p
- .apply(
- KinesisIO.read()
- .from("stream", InitialPositionInStream.TRIM_HORIZON)
- .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
- .withMaxNumRecords(noOfShards * noOfEventsPerShard))
- .apply(ParDo.of(new KinesisRecordToTestData()));
- PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
- p.run();
- }
-
- private static class KinesisRecordToTestData extends
- DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Test
+ public void readsDataFromMockKinesis() {
+ int noOfShards = 3;
+ int noOfEventsPerShard = 100;
+ List<List<AmazonKinesisMock.TestData>> testData =
+ provideTestData(noOfShards, noOfEventsPerShard);
+
+ PCollection<AmazonKinesisMock.TestData> result = p
+ .apply(
+ KinesisIO.read()
+ .from("stream", InitialPositionInStream.TRIM_HORIZON)
+ .withClientProvider(new AmazonKinesisMock.Provider(testData, 10))
+ .withMaxNumRecords(noOfShards * noOfEventsPerShard))
+ .apply(ParDo.of(new KinesisRecordToTestData()));
+ PAssert.that(result).containsInAnyOrder(Iterables.concat(testData));
+ p.run();
+ }
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(new AmazonKinesisMock.TestData(c.element()));
+ private static class KinesisRecordToTestData extends
+ DoFn<KinesisRecord, AmazonKinesisMock.TestData> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(new AmazonKinesisMock.TestData(c.element()));
+ }
}
- }
- private List<List<AmazonKinesisMock.TestData>> provideTestData(
- int noOfShards,
- int noOfEventsPerShard) {
+ private List<List<AmazonKinesisMock.TestData>> provideTestData(
+ int noOfShards,
+ int noOfEventsPerShard) {
- int seqNumber = 0;
+ int seqNumber = 0;
- List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
- for (int i = 0; i < noOfShards; ++i) {
- List<AmazonKinesisMock.TestData> shardData = newArrayList();
- shardedData.add(shardData);
+ List<List<AmazonKinesisMock.TestData>> shardedData = newArrayList();
+ for (int i = 0; i < noOfShards; ++i) {
+ List<AmazonKinesisMock.TestData> shardData = newArrayList();
+ shardedData.add(shardData);
- DateTime arrival = DateTime.now();
- for (int j = 0; j < noOfEventsPerShard; ++j) {
- arrival = arrival.plusSeconds(1);
+ DateTime arrival = DateTime.now();
+ for (int j = 0; j < noOfEventsPerShard; ++j) {
+ arrival = arrival.plusSeconds(1);
- seqNumber++;
- shardData.add(new AmazonKinesisMock.TestData(
- Integer.toString(seqNumber),
- arrival.toInstant(),
- Integer.toString(seqNumber))
- );
- }
- }
+ seqNumber++;
+ shardData.add(new AmazonKinesisMock.TestData(
+ Integer.toString(seqNumber),
+ arrival.toInstant(),
+ Integer.toString(seqNumber))
+ );
+ }
+ }
- return shardedData;
- }
+ return shardedData;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
index 1038a47..8c8da64 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpointTest.java
@@ -17,14 +17,13 @@
*/
package org.apache.beam.sdk.io.kinesis;
+
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.Iterables;
-
import java.util.Iterator;
import java.util.List;
-
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,34 +35,33 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderCheckpointTest {
+ @Mock
+ private ShardCheckpoint a, b, c;
- @Mock
- private ShardCheckpoint a, b, c;
-
- private KinesisReaderCheckpoint checkpoint;
+ private KinesisReaderCheckpoint checkpoint;
- @Before
- public void setUp() {
- checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
- }
+ @Before
+ public void setUp() {
+ checkpoint = new KinesisReaderCheckpoint(asList(a, b, c));
+ }
- @Test
- public void splitsCheckpointAccordingly() {
- verifySplitInto(1);
- verifySplitInto(2);
- verifySplitInto(3);
- verifySplitInto(4);
- }
+ @Test
+ public void splitsCheckpointAccordingly() {
+ verifySplitInto(1);
+ verifySplitInto(2);
+ verifySplitInto(3);
+ verifySplitInto(4);
+ }
- @Test(expected = UnsupportedOperationException.class)
- public void isImmutable() {
- Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
- iterator.remove();
- }
+ @Test(expected = UnsupportedOperationException.class)
+ public void isImmutable() {
+ Iterator<ShardCheckpoint> iterator = checkpoint.iterator();
+ iterator.remove();
+ }
- private void verifySplitInto(int size) {
- List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
- assertThat(Iterables.concat(split)).containsOnly(a, b, c);
- assertThat(split).hasSize(Math.min(size, 3));
- }
+ private void verifySplitInto(int size) {
+ List<KinesisReaderCheckpoint> split = checkpoint.splitInto(size);
+ assertThat(Iterables.concat(split)).containsOnly(a, b, c);
+ assertThat(split).hasSize(Math.min(size, 3));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
index 5781033..8eb6546 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderIT.java
@@ -23,7 +23,6 @@ import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.regions.Regions;
-
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -32,7 +31,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
@@ -52,75 +50,72 @@ import org.junit.Test;
* You need to provide all {@link KinesisTestOptions} in order to run this.
*/
public class KinesisReaderIT {
-
- private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
- private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
-
- @Rule
- public final transient TestPipeline p = TestPipeline.create();
-
- @Ignore
- @Test
- public void readsDataFromRealKinesisStream()
- throws IOException, InterruptedException, ExecutionException {
- KinesisTestOptions options = readKinesisOptions();
- List<String> testData = prepareTestData(1000);
-
- Future<?> future = startTestPipeline(testData, options);
- KinesisUploader.uploadAll(testData, options);
- future.get();
- }
-
- private List<String> prepareTestData(int count) {
- List<String> data = newArrayList();
- for (int i = 0; i < count; ++i) {
- data.add(RandomStringUtils.randomAlphabetic(32));
+ private static final long PIPELINE_STARTUP_TIME = TimeUnit.SECONDS.toMillis(10);
+ private ExecutorService singleThreadExecutor = newSingleThreadExecutor();
+
+ @Rule
+ public final transient TestPipeline p = TestPipeline.create();
+
+ @Ignore
+ @Test
+ public void readsDataFromRealKinesisStream()
+ throws IOException, InterruptedException, ExecutionException {
+ KinesisTestOptions options = readKinesisOptions();
+ List<String> testData = prepareTestData(1000);
+
+ Future<?> future = startTestPipeline(testData, options);
+ KinesisUploader.uploadAll(testData, options);
+ future.get();
}
- return data;
- }
- private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
- throws InterruptedException {
-
- PCollection<String> result = p.
- apply(KinesisIO.read()
- .from(options.getAwsKinesisStream(), Instant.now())
- .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
- Regions.fromName(options.getAwsKinesisRegion()))
- .withMaxReadTime(Duration.standardMinutes(3))
- ).
- apply(ParDo.of(new RecordDataToString()));
- PAssert.that(result).containsInAnyOrder(testData);
-
- Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
-
- @Override
- public Void call() throws Exception {
- PipelineResult result = p.run();
- PipelineResult.State state = result.getState();
- while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
- Thread.sleep(1000);
- state = result.getState();
+ private List<String> prepareTestData(int count) {
+ List<String> data = newArrayList();
+ for (int i = 0; i < count; ++i) {
+ data.add(RandomStringUtils.randomAlphabetic(32));
}
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
- return null;
- }
- });
- Thread.sleep(PIPELINE_STARTUP_TIME);
- return future;
- }
+ return data;
+ }
- private KinesisTestOptions readKinesisOptions() {
- PipelineOptionsFactory.register(KinesisTestOptions.class);
- return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
- }
+ private Future<?> startTestPipeline(List<String> testData, KinesisTestOptions options)
+ throws InterruptedException {
+
+ PCollection<String> result = p.
+ apply(KinesisIO.read()
+ .from(options.getAwsKinesisStream(), Instant.now())
+ .withClientProvider(options.getAwsAccessKey(), options.getAwsSecretKey(),
+ Regions.fromName(options.getAwsKinesisRegion()))
+ .withMaxReadTime(Duration.standardMinutes(3))
+ ).
+ apply(ParDo.of(new RecordDataToString()));
+ PAssert.that(result).containsInAnyOrder(testData);
+
+ Future<?> future = singleThreadExecutor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ PipelineResult result = p.run();
+ PipelineResult.State state = result.getState();
+ while (state != PipelineResult.State.DONE && state != PipelineResult.State.FAILED) {
+ Thread.sleep(1000);
+ state = result.getState();
+ }
+ assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ return null;
+ }
+ });
+ Thread.sleep(PIPELINE_STARTUP_TIME);
+ return future;
+ }
- private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+ private KinesisTestOptions readKinesisOptions() {
+ PipelineOptionsFactory.register(KinesisTestOptions.class);
+ return TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
+ }
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- checkNotNull(c.element(), "Null record given");
- c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+ private static class RecordDataToString extends DoFn<KinesisRecord, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ checkNotNull(c.element(), "Null record given");
+ c.output(new String(c.element().getData().array(), StandardCharsets.UTF_8));
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index a26501a..3111029 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.NoSuchElementException;
-
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -35,88 +34,87 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class KinesisReaderTest {
-
- @Mock
- private SimplifiedKinesisClient kinesis;
- @Mock
- private CheckpointGenerator generator;
- @Mock
- private ShardCheckpoint firstCheckpoint, secondCheckpoint;
- @Mock
- private ShardRecordsIterator firstIterator, secondIterator;
- @Mock
- private KinesisRecord a, b, c, d;
-
- private KinesisReader reader;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
- asList(firstCheckpoint, secondCheckpoint)
- ));
- when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
- when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
- when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
-
- reader = new KinesisReader(kinesis, generator, null);
- }
-
- @Test
- public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
- assertThat(reader.start()).isFalse();
- }
-
- @Test(expected = NoSuchElementException.class)
- public void throwsNoSuchElementExceptionIfNoData() throws IOException {
- reader.start();
- reader.getCurrent();
- }
-
- @Test
- public void startReturnsTrueIfSomeDataAvailable() throws IOException,
- TransientKinesisException {
- when(firstIterator.next()).
- thenReturn(CustomOptional.of(a)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- assertThat(reader.start()).isTrue();
- }
-
- @Test
- public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
- throws IOException, TransientKinesisException {
- reader.start();
-
- when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
- assertThat(reader.advance()).isFalse();
- }
-
- @Test
- public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
- when(firstIterator.next()).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(a)).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(b)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- when(secondIterator.next()).
- thenReturn(CustomOptional.of(c)).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(d)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- assertThat(reader.start()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(c);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(a);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(d);
- assertThat(reader.advance()).isTrue();
- assertThat(reader.getCurrent()).isEqualTo(b);
- assertThat(reader.advance()).isFalse();
- }
+ @Mock
+ private SimplifiedKinesisClient kinesis;
+ @Mock
+ private CheckpointGenerator generator;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+ @Mock
+ private ShardRecordsIterator firstIterator, secondIterator;
+ @Mock
+ private KinesisRecord a, b, c, d;
+
+ private KinesisReader reader;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
+ asList(firstCheckpoint, secondCheckpoint)
+ ));
+ when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
+ when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
+ when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+ when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ reader = new KinesisReader(kinesis, generator, null);
+ }
+
+ @Test
+ public void startReturnsFalseIfNoDataAtTheBeginning() throws IOException {
+ assertThat(reader.start()).isFalse();
+ }
+
+ @Test(expected = NoSuchElementException.class)
+ public void throwsNoSuchElementExceptionIfNoData() throws IOException {
+ reader.start();
+ reader.getCurrent();
+ }
+
+ @Test
+ public void startReturnsTrueIfSomeDataAvailable() throws IOException,
+ TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ }
+
+ @Test
+ public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
+ throws IOException, TransientKinesisException {
+ reader.start();
+
+ when(firstIterator.next()).thenThrow(TransientKinesisException.class);
+
+ assertThat(reader.advance()).isFalse();
+ }
+
+ @Test
+ public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
+ when(firstIterator.next()).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(a)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(b)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ when(secondIterator.next()).
+ thenReturn(CustomOptional.of(c)).
+ thenReturn(CustomOptional.<KinesisRecord>absent()).
+ thenReturn(CustomOptional.of(d)).
+ thenReturn(CustomOptional.<KinesisRecord>absent());
+
+ assertThat(reader.start()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(c);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(a);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(d);
+ assertThat(reader.advance()).isTrue();
+ assertThat(reader.getCurrent()).isEqualTo(b);
+ assertThat(reader.advance()).isFalse();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
index c9f01bb..8771c86 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoderTest.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.io.kinesis;
import java.nio.ByteBuffer;
-
import org.apache.beam.sdk.testing.CoderProperties;
import org.joda.time.Instant;
import org.junit.Test;
@@ -27,21 +26,20 @@ import org.junit.Test;
* Tests {@link KinesisRecordCoder}.
*/
public class KinesisRecordCoderTest {
-
- @Test
- public void encodingAndDecodingWorks() throws Exception {
- KinesisRecord record = new KinesisRecord(
- ByteBuffer.wrap("data".getBytes()),
- "sequence",
- 128L,
- "partition",
- Instant.now(),
- Instant.now(),
- "stream",
- "shard"
- );
- CoderProperties.coderDecodeEncodeEqual(
- new KinesisRecordCoder(), record
- );
- }
+ @Test
+ public void encodingAndDecodingWorks() throws Exception {
+ KinesisRecord record = new KinesisRecord(
+ ByteBuffer.wrap("data".getBytes()),
+ "sequence",
+ 128L,
+ "partition",
+ Instant.now(),
+ Instant.now(),
+ "stream",
+ "shard"
+ );
+ CoderProperties.coderDecodeEncodeEqual(
+ new KinesisRecordCoder(), record
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
index 76bcb27..324de46 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisTestOptions.java
@@ -25,28 +25,23 @@ import org.apache.beam.sdk.testing.TestPipelineOptions;
* Options for Kinesis integration tests.
*/
public interface KinesisTestOptions extends TestPipelineOptions {
-
- @Description("AWS region where Kinesis stream resided")
- @Default.String("aws-kinesis-region")
- String getAwsKinesisRegion();
-
- void setAwsKinesisRegion(String value);
-
- @Description("Kinesis stream name")
- @Default.String("aws-kinesis-stream")
- String getAwsKinesisStream();
-
- void setAwsKinesisStream(String value);
-
- @Description("AWS secret key")
- @Default.String("aws-secret-key")
- String getAwsSecretKey();
-
- void setAwsSecretKey(String value);
-
- @Description("AWS access key")
- @Default.String("aws-access-key")
- String getAwsAccessKey();
-
- void setAwsAccessKey(String value);
+ @Description("AWS region where Kinesis stream resided")
+ @Default.String("aws-kinesis-region")
+ String getAwsKinesisRegion();
+ void setAwsKinesisRegion(String value);
+
+ @Description("Kinesis stream name")
+ @Default.String("aws-kinesis-stream")
+ String getAwsKinesisStream();
+ void setAwsKinesisStream(String value);
+
+ @Description("AWS secret key")
+ @Default.String("aws-secret-key")
+ String getAwsSecretKey();
+ void setAwsSecretKey(String value);
+
+ @Description("AWS access key")
+ @Default.String("aws-access-key")
+ String getAwsAccessKey();
+ void setAwsAccessKey(String value);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
index 7a7cb02..7518ff7 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisUploader.java
@@ -29,7 +29,6 @@ import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
-
import java.nio.ByteBuffer;
import java.util.List;
@@ -38,46 +37,47 @@ import java.util.List;
*/
public class KinesisUploader {
- public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
+ public static final int MAX_NUMBER_OF_RECORDS_IN_BATCH = 499;
- public static void uploadAll(List<String> data, KinesisTestOptions options) {
- AmazonKinesisClient client = new AmazonKinesisClient(
- new StaticCredentialsProvider(
- new BasicAWSCredentials(
- options.getAwsAccessKey(), options.getAwsSecretKey()))
- ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
+ public static void uploadAll(List<String> data, KinesisTestOptions options) {
+ AmazonKinesisClient client = new AmazonKinesisClient(
+ new StaticCredentialsProvider(
+ new BasicAWSCredentials(
+ options.getAwsAccessKey(), options.getAwsSecretKey()))
+ ).withRegion(Regions.fromName(options.getAwsKinesisRegion()));
- List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
+ List<List<String>> partitions = Lists.partition(data, MAX_NUMBER_OF_RECORDS_IN_BATCH);
- for (List<String> partition : partitions) {
- List<PutRecordsRequestEntry> allRecords = newArrayList();
- for (String row : partition) {
- allRecords.add(new PutRecordsRequestEntry().
- withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
- withPartitionKey(Integer.toString(row.hashCode()))
- );
- }
+ for (List<String> partition : partitions) {
+ List<PutRecordsRequestEntry> allRecords = newArrayList();
+ for (String row : partition) {
+ allRecords.add(new PutRecordsRequestEntry().
+ withData(ByteBuffer.wrap(row.getBytes(Charsets.UTF_8))).
+ withPartitionKey(Integer.toString(row.hashCode()))
- PutRecordsResult result;
- do {
- result = client.putRecords(
- new PutRecordsRequest().
- withStreamName(options.getAwsKinesisStream()).
- withRecords(allRecords));
- List<PutRecordsRequestEntry> failedRecords = newArrayList();
- int i = 0;
- for (PutRecordsResultEntry row : result.getRecords()) {
- if (row.getErrorCode() != null) {
- failedRecords.add(allRecords.get(i));
- }
- ++i;
- }
- allRecords = failedRecords;
- }
+ );
+ }
- while (result.getFailedRecordCount() > 0);
+ PutRecordsResult result;
+ do {
+ result = client.putRecords(
+ new PutRecordsRequest().
+ withStreamName(options.getAwsKinesisStream()).
+ withRecords(allRecords));
+ List<PutRecordsRequestEntry> failedRecords = newArrayList();
+ int i = 0;
+ for (PutRecordsResultEntry row : result.getRecords()) {
+ if (row.getErrorCode() != null) {
+ failedRecords.add(allRecords.get(i));
+ }
+ ++i;
+ }
+ allRecords = failedRecords;
+ }
+
+ while (result.getFailedRecordCount() > 0);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
index cb32562..f979c01 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RecordFilterTest.java
@@ -20,49 +20,47 @@ package org.apache.beam.sdk.io.kinesis;
import static org.mockito.BDDMockito.given;
import com.google.common.collect.Lists;
-
import java.util.Collections;
import java.util.List;
-
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
+
/***
*/
@RunWith(MockitoJUnitRunner.class)
public class RecordFilterTest {
+ @Mock
+ private ShardCheckpoint checkpoint;
+ @Mock
+ private KinesisRecord record1, record2, record3, record4, record5;
- @Mock
- private ShardCheckpoint checkpoint;
- @Mock
- private KinesisRecord record1, record2, record3, record4, record5;
-
- @Test
- public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
- given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
- given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
- given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
- given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
- given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
- List<KinesisRecord> records = Lists.newArrayList(record1, record2,
- record3, record4, record5);
- RecordFilter underTest = new RecordFilter();
+ @Test
+ public void shouldFilterOutRecordsBeforeOrAtCheckpoint() {
+ given(checkpoint.isBeforeOrAt(record1)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record2)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record3)).willReturn(true);
+ given(checkpoint.isBeforeOrAt(record4)).willReturn(false);
+ given(checkpoint.isBeforeOrAt(record5)).willReturn(true);
+ List<KinesisRecord> records = Lists.newArrayList(record1, record2,
+ record3, record4, record5);
+ RecordFilter underTest = new RecordFilter();
- List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
- Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
- }
+ Assertions.assertThat(retainedRecords).containsOnly(record2, record3, record5);
+ }
- @Test
- public void shouldNotFailOnEmptyList() {
- List<KinesisRecord> records = Collections.emptyList();
- RecordFilter underTest = new RecordFilter();
+ @Test
+ public void shouldNotFailOnEmptyList() {
+ List<KinesisRecord> records = Collections.emptyList();
+ RecordFilter underTest = new RecordFilter();
- List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
+ List<KinesisRecord> retainedRecords = underTest.apply(records, checkpoint);
- Assertions.assertThat(retainedRecords).isEmpty();
- }
+ Assertions.assertThat(retainedRecords).isEmpty();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
index e4abce4..f032eea 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
@@ -22,38 +22,36 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Collections;
import java.util.List;
-
import org.junit.Test;
/**
* Tests {@link RoundRobin}.
*/
public class RoundRobinTest {
+ @Test(expected = IllegalArgumentException.class)
+ public void doesNotAllowCreationWithEmptyCollection() {
+ new RoundRobin<>(Collections.emptyList());
+ }
- @Test(expected = IllegalArgumentException.class)
- public void doesNotAllowCreationWithEmptyCollection() {
- new RoundRobin<>(Collections.emptyList());
- }
-
- @Test
- public void goesThroughElementsInCycle() {
- List<String> input = newArrayList("a", "b", "c");
+ @Test
+ public void goesThroughElementsInCycle() {
+ List<String> input = newArrayList("a", "b", "c");
- RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
+ RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
- input.addAll(input); // duplicate the input
- for (String element : input) {
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- roundRobin.moveForward();
+ input.addAll(input); // duplicate the input
+ for (String element : input) {
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ assertThat(roundRobin.getCurrent()).isEqualTo(element);
+ roundRobin.moveForward();
+ }
}
- }
- @Test
- public void usualIteratorGoesThroughElementsOnce() {
- List<String> input = newArrayList("a", "b", "c");
+ @Test
+ public void usualIteratorGoesThroughElementsOnce() {
+ List<String> input = newArrayList("a", "b", "c");
- RoundRobin<String> roundRobin = new RoundRobin<>(input);
- assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
- }
+ RoundRobin<String> roundRobin = new RoundRobin<>(input);
+ assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
index d4784c4..39ab36f 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardCheckpointTest.java
@@ -32,9 +32,7 @@ import static org.mockito.Mockito.when;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
import java.io.IOException;
-
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.junit.Before;
@@ -48,105 +46,104 @@ import org.mockito.runners.MockitoJUnitRunner;
*/
@RunWith(MockitoJUnitRunner.class)
public class ShardCheckpointTest {
-
- private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
- private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
- private static final String STREAM_NAME = "STREAM";
- private static final String SHARD_ID = "SHARD_ID";
- @Mock
- private SimplifiedKinesisClient client;
-
- @Before
- public void setUp() throws IOException, TransientKinesisException {
- when(client.getShardIterator(
- eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
- anyString(), isNull(Instant.class))).
- thenReturn(AT_SEQUENCE_SHARD_IT);
- when(client.getShardIterator(
- eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
- anyString(), isNull(Instant.class))).
- thenReturn(AFTER_SEQUENCE_SHARD_IT);
- }
-
- @Test
- public void testProvidingShardIterator() throws IOException, TransientKinesisException {
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
- .isEqualTo(AT_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
- .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
- (AT_SEQUENCE_SHARD_IT);
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
- .isEqualTo(AT_SEQUENCE_SHARD_IT);
- }
-
- @Test
- public void testComparisonWithExtendedSequenceNumber() {
- assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isTrue();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isFalse();
-
- assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("100", 0L))
- )).isFalse();
-
- assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
- recordWith(new ExtendedSequenceNumber("99", 1L))
- )).isFalse();
- }
-
- @Test
- public void testComparisonWithTimestamp() {
- DateTime referenceTimestamp = DateTime.now();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
- ).isFalse();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
- ).isTrue();
-
- assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
- .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
- ).isTrue();
- }
-
- private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
- KinesisRecord record = mock(KinesisRecord.class);
- given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
- return record;
- }
-
- private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
- Long subSequenceNumber) {
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
- subSequenceNumber);
- }
-
- private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
- KinesisRecord record = mock(KinesisRecord.class);
- given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
- return record;
- }
-
- private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
- return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
- }
+ private static final String AT_SEQUENCE_SHARD_IT = "AT_SEQUENCE_SHARD_IT";
+ private static final String AFTER_SEQUENCE_SHARD_IT = "AFTER_SEQUENCE_SHARD_IT";
+ private static final String STREAM_NAME = "STREAM";
+ private static final String SHARD_ID = "SHARD_ID";
+ @Mock
+ private SimplifiedKinesisClient client;
+
+ @Before
+ public void setUp() throws IOException, TransientKinesisException {
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AT_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AT_SEQUENCE_SHARD_IT);
+ when(client.getShardIterator(
+ eq(STREAM_NAME), eq(SHARD_ID), eq(AFTER_SEQUENCE_NUMBER),
+ anyString(), isNull(Instant.class))).
+ thenReturn(AFTER_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testProvidingShardIterator() throws IOException, TransientKinesisException {
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", null).getShardIterator(client))
+ .isEqualTo(AFTER_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client)).isEqualTo
+ (AT_SEQUENCE_SHARD_IT);
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 10L).getShardIterator(client))
+ .isEqualTo(AT_SEQUENCE_SHARD_IT);
+ }
+
+ @Test
+ public void testComparisonWithExtendedSequenceNumber() {
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(LATEST)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(new ShardCheckpoint("", "", new StartingPoint(TRIM_HORIZON)).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "10", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isTrue();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AT_SEQUENCE_NUMBER, "100", 1L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("100", 0L))
+ )).isFalse();
+
+ assertThat(checkpoint(AFTER_SEQUENCE_NUMBER, "100", 0L).isBeforeOrAt(
+ recordWith(new ExtendedSequenceNumber("99", 1L))
+ )).isFalse();
+ }
+
+ @Test
+ public void testComparisonWithTimestamp() {
+ DateTime referenceTimestamp = DateTime.now();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.minusMillis(10).toInstant()))
+ ).isFalse();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.toInstant()))
+ ).isTrue();
+
+ assertThat(checkpoint(AT_TIMESTAMP, referenceTimestamp.toInstant())
+ .isBeforeOrAt(recordWith(referenceTimestamp.plusMillis(10).toInstant()))
+ ).isTrue();
+ }
+
+ private KinesisRecord recordWith(ExtendedSequenceNumber extendedSequenceNumber) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getExtendedSequenceNumber()).willReturn(extendedSequenceNumber);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, String sequenceNumber,
+ Long subSequenceNumber) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, sequenceNumber,
+ subSequenceNumber);
+ }
+
+ private KinesisRecord recordWith(Instant approximateArrivalTimestamp) {
+ KinesisRecord record = mock(KinesisRecord.class);
+ given(record.getApproximateArrivalTimestamp()).willReturn(approximateArrivalTimestamp);
+ return record;
+ }
+
+ private ShardCheckpoint checkpoint(ShardIteratorType iteratorType, Instant timestamp) {
+ return new ShardCheckpoint(STREAM_NAME, SHARD_ID, iteratorType, timestamp);
+ }
}