You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ta...@apache.org on 2017/07/13 03:07:01 UTC
[49/50] [abbrv] beam git commit: Reformatting Kinesis IO to comply
with official code style
Reformatting Kinesis IO to comply with official code style
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7925a668
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7925a668
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7925a668
Branch: refs/heads/DSL_SQL
Commit: 7925a668b12e272c7b2631ff6b20376e92ad90be
Parents: 4abd714
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Jun 19 11:10:25 2017 +0200
Committer: Tyler Akidau <ta...@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700
----------------------------------------------------------------------
.../sdk/io/kinesis/CheckpointGenerator.java | 6 +-
.../beam/sdk/io/kinesis/CustomOptional.java | 111 ++--
.../io/kinesis/DynamicCheckpointGenerator.java | 52 +-
.../sdk/io/kinesis/GetKinesisRecordsResult.java | 49 +-
.../sdk/io/kinesis/KinesisClientProvider.java | 4 +-
.../apache/beam/sdk/io/kinesis/KinesisIO.java | 279 +++++-----
.../beam/sdk/io/kinesis/KinesisReader.java | 206 +++----
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 97 ++--
.../beam/sdk/io/kinesis/KinesisRecord.java | 177 +++---
.../beam/sdk/io/kinesis/KinesisRecordCoder.java | 68 +--
.../beam/sdk/io/kinesis/KinesisSource.java | 147 ++---
.../beam/sdk/io/kinesis/RecordFilter.java | 18 +-
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 37 +-
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 241 ++++-----
.../sdk/io/kinesis/ShardRecordsIterator.java | 106 ++--
.../sdk/io/kinesis/SimplifiedKinesisClient.java | 215 ++++----
.../beam/sdk/io/kinesis/StartingPoint.java | 84 +--
.../io/kinesis/StaticCheckpointGenerator.java | 27 +-
.../io/kinesis/TransientKinesisException.java | 7 +-
.../beam/sdk/io/kinesis/AmazonKinesisMock.java | 539 ++++++++++---------
.../beam/sdk/io/kinesis/CustomOptionalTest.java | 27 +-
.../kinesis/DynamicCheckpointGeneratorTest.java | 33 +-
.../sdk/io/kinesis/KinesisMockReadTest.java | 97 ++--
.../io/kinesis/KinesisReaderCheckpointTest.java | 52 +-
.../beam/sdk/io/kinesis/KinesisReaderIT.java | 127 ++---
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 166 +++---
.../sdk/io/kinesis/KinesisRecordCoderTest.java | 34 +-
.../beam/sdk/io/kinesis/KinesisTestOptions.java | 43 +-
.../beam/sdk/io/kinesis/KinesisUploader.java | 70 +--
.../beam/sdk/io/kinesis/RecordFilterTest.java | 52 +-
.../beam/sdk/io/kinesis/RoundRobinTest.java | 42 +-
.../sdk/io/kinesis/ShardCheckpointTest.java | 203 +++----
.../io/kinesis/ShardRecordsIteratorTest.java | 216 ++++----
.../io/kinesis/SimplifiedKinesisClientTest.java | 351 ++++++------
34 files changed, 2031 insertions(+), 1952 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
index 919d85a..2629c57 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CheckpointGenerator.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import java.io.Serializable;
/**
@@ -25,6 +24,7 @@ import java.io.Serializable;
* How exactly the checkpoint is generated is up to implementing class.
*/
interface CheckpointGenerator extends Serializable {
- KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
- throws TransientKinesisException;
+
+ KinesisReaderCheckpoint generate(SimplifiedKinesisClient client)
+ throws TransientKinesisException;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
index 4bed0e3..5a28214 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/CustomOptional.java
@@ -24,76 +24,79 @@ import java.util.Objects;
* Similar to Guava {@code Optional}, but throws {@link NoSuchElementException} for missing element.
*/
abstract class CustomOptional<T> {
- @SuppressWarnings("unchecked")
- public static <T> CustomOptional<T> absent() {
- return (Absent<T>) Absent.INSTANCE;
- }
- public static <T> CustomOptional<T> of(T v) {
- return new Present<>(v);
- }
+ @SuppressWarnings("unchecked")
+ public static <T> CustomOptional<T> absent() {
+ return (Absent<T>) Absent.INSTANCE;
+ }
- public abstract boolean isPresent();
+ public static <T> CustomOptional<T> of(T v) {
+ return new Present<>(v);
+ }
- public abstract T get();
+ public abstract boolean isPresent();
- private static class Present<T> extends CustomOptional<T> {
- private final T value;
+ public abstract T get();
- private Present(T value) {
- this.value = value;
- }
+ private static class Present<T> extends CustomOptional<T> {
- @Override
- public boolean isPresent() {
- return true;
- }
+ private final T value;
- @Override
- public T get() {
- return value;
- }
+ private Present(T value) {
+ this.value = value;
+ }
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof Present)) {
- return false;
- }
+ @Override
+ public boolean isPresent() {
+ return true;
+ }
- Present<?> present = (Present<?>) o;
- return Objects.equals(value, present.value);
- }
+ @Override
+ public T get() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof Present)) {
+ return false;
+ }
- @Override
- public int hashCode() {
- return Objects.hash(value);
- }
+ Present<?> present = (Present<?>) o;
+ return Objects.equals(value, present.value);
}
- private static class Absent<T> extends CustomOptional<T> {
- private static final Absent<Object> INSTANCE = new Absent<>();
+ @Override
+ public int hashCode() {
+ return Objects.hash(value);
+ }
+ }
- private Absent() {
- }
+ private static class Absent<T> extends CustomOptional<T> {
- @Override
- public boolean isPresent() {
- return false;
- }
+ private static final Absent<Object> INSTANCE = new Absent<>();
- @Override
- public T get() {
- throw new NoSuchElementException();
- }
+ private Absent() {
+ }
+
+ @Override
+ public boolean isPresent() {
+ return false;
+ }
- @Override
- public boolean equals(Object o) {
- return o instanceof Absent;
- }
+ @Override
+ public T get() {
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return o instanceof Absent;
+ }
- @Override
- public int hashCode() {
- return 0;
- }
+ @Override
+ public int hashCode() {
+ return 0;
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
index 2ec293c..9933019 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/DynamicCheckpointGenerator.java
@@ -28,29 +28,31 @@ import com.google.common.base.Function;
* List of shards is obtained dynamically on call to {@link #generate(SimplifiedKinesisClient)}.
*/
class DynamicCheckpointGenerator implements CheckpointGenerator {
- private final String streamName;
- private final StartingPoint startingPoint;
-
- public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
- this.streamName = checkNotNull(streamName, "streamName");
- this.startingPoint = checkNotNull(startingPoint, "startingPoint");
- }
-
- @Override
- public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
- throws TransientKinesisException {
- return new KinesisReaderCheckpoint(
- transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
- @Override
- public ShardCheckpoint apply(Shard shard) {
- return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
- }
- })
- );
- }
-
- @Override
- public String toString() {
- return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
- }
+
+ private final String streamName;
+ private final StartingPoint startingPoint;
+
+ public DynamicCheckpointGenerator(String streamName, StartingPoint startingPoint) {
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.startingPoint = checkNotNull(startingPoint, "startingPoint");
+ }
+
+ @Override
+ public KinesisReaderCheckpoint generate(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new KinesisReaderCheckpoint(
+ transform(kinesis.listShards(streamName), new Function<Shard, ShardCheckpoint>() {
+
+ @Override
+ public ShardCheckpoint apply(Shard shard) {
+ return new ShardCheckpoint(streamName, shard.getShardId(), startingPoint);
+ }
+ })
+ );
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint generator for %s: %s", streamName, startingPoint);
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
index 5a34d7d..f605f55 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/GetKinesisRecordsResult.java
@@ -21,6 +21,7 @@ import static com.google.common.collect.Lists.transform;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.google.common.base.Function;
+
import java.util.List;
import javax.annotation.Nullable;
@@ -28,27 +29,29 @@ import javax.annotation.Nullable;
* Represents the output of 'get' operation on Kinesis stream.
*/
class GetKinesisRecordsResult {
- private final List<KinesisRecord> records;
- private final String nextShardIterator;
-
- public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
- final String streamName, final String shardId) {
- this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
- @Nullable
- @Override
- public KinesisRecord apply(@Nullable UserRecord input) {
- assert input != null; // to make FindBugs happy
- return new KinesisRecord(input, streamName, shardId);
- }
- });
- this.nextShardIterator = nextShardIterator;
- }
-
- public List<KinesisRecord> getRecords() {
- return records;
- }
-
- public String getNextShardIterator() {
- return nextShardIterator;
- }
+
+ private final List<KinesisRecord> records;
+ private final String nextShardIterator;
+
+ public GetKinesisRecordsResult(List<UserRecord> records, String nextShardIterator,
+ final String streamName, final String shardId) {
+ this.records = transform(records, new Function<UserRecord, KinesisRecord>() {
+
+ @Nullable
+ @Override
+ public KinesisRecord apply(@Nullable UserRecord input) {
+ assert input != null; // to make FindBugs happy
+ return new KinesisRecord(input, streamName, shardId);
+ }
+ });
+ this.nextShardIterator = nextShardIterator;
+ }
+
+ public List<KinesisRecord> getRecords() {
+ return records;
+ }
+
+ public String getNextShardIterator() {
+ return nextShardIterator;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
index c7fd7f6..b5b721e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisClientProvider.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.kinesis;
import com.amazonaws.services.kinesis.AmazonKinesis;
+
import java.io.Serializable;
/**
@@ -27,5 +28,6 @@ import java.io.Serializable;
* {@link Serializable} to ensure it can be sent to worker machines.
*/
interface KinesisClientProvider extends Serializable {
- AmazonKinesis get();
+
+ AmazonKinesis get();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index b85eb63..bc8ada1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -29,7 +28,9 @@ import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.google.auto.value.AutoValue;
+
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.transforms.PTransform;
@@ -102,142 +103,148 @@ import org.joda.time.Instant;
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public final class KinesisIO {
- /** Returns a new {@link Read} transform for reading from Kinesis. */
- public static Read read() {
- return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+
+ /** Returns a new {@link Read} transform for reading from Kinesis. */
+ public static Read read() {
+ return new AutoValue_KinesisIO_Read.Builder().setMaxNumRecords(-1).build();
+ }
+
+ /** Implementation of {@link #read}. */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
+
+ @Nullable
+ abstract String getStreamName();
+
+ @Nullable
+ abstract StartingPoint getInitialPosition();
+
+ @Nullable
+ abstract KinesisClientProvider getClientProvider();
+
+ abstract int getMaxNumRecords();
+
+ @Nullable
+ abstract Duration getMaxReadTime();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setStreamName(String streamName);
+
+ abstract Builder setInitialPosition(StartingPoint startingPoint);
+
+ abstract Builder setClientProvider(KinesisClientProvider clientProvider);
+
+ abstract Builder setMaxNumRecords(int maxNumRecords);
+
+ abstract Builder setMaxReadTime(Duration maxReadTime);
+
+ abstract Read build();
}
- /** Implementation of {@link #read}. */
- @AutoValue
- public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
- @Nullable
- abstract String getStreamName();
-
- @Nullable
- abstract StartingPoint getInitialPosition();
-
- @Nullable
- abstract KinesisClientProvider getClientProvider();
-
- abstract int getMaxNumRecords();
-
- @Nullable
- abstract Duration getMaxReadTime();
-
- abstract Builder toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setStreamName(String streamName);
- abstract Builder setInitialPosition(StartingPoint startingPoint);
- abstract Builder setClientProvider(KinesisClientProvider clientProvider);
- abstract Builder setMaxNumRecords(int maxNumRecords);
- abstract Builder setMaxReadTime(Duration maxReadTime);
-
- abstract Read build();
- }
-
- /**
- * Specify reading from streamName at some initial position.
- */
- public Read from(String streamName, InitialPositionInStream initialPosition) {
- return toBuilder()
- .setStreamName(streamName)
- .setInitialPosition(
- new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
- .build();
- }
-
- /**
- * Specify reading from streamName beginning at given {@link Instant}.
- * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
- */
- public Read from(String streamName, Instant initialTimestamp) {
- return toBuilder()
- .setStreamName(streamName)
- .setInitialPosition(
- new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
- .build();
- }
-
- /**
- * Allows to specify custom {@link KinesisClientProvider}.
- * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
- * used for communication with Kinesis.
- * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
- * does not suit your needs.
- */
- public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
- return toBuilder().setClientProvider(kinesisClientProvider).build();
- }
-
- /**
- * Specify credential details and region to be used to read from Kinesis.
- * If you need more sophisticated credential protocol, then you should look at
- * {@link Read#withClientProvider(KinesisClientProvider)}.
- */
- public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
- return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
- }
-
- /** Specifies to read at most a given number of records. */
- public Read withMaxNumRecords(int maxNumRecords) {
- checkArgument(
- maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
- return toBuilder().setMaxNumRecords(maxNumRecords).build();
- }
-
- /** Specifies to read at most a given number of records. */
- public Read withMaxReadTime(Duration maxReadTime) {
- checkNotNull(maxReadTime, "maxReadTime");
- return toBuilder().setMaxReadTime(maxReadTime).build();
- }
-
- @Override
- public PCollection<KinesisRecord> expand(PBegin input) {
- org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
- org.apache.beam.sdk.io.Read.from(
- new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
- if (getMaxNumRecords() > 0) {
- BoundedReadFromUnboundedSource<KinesisRecord> bounded =
- read.withMaxNumRecords(getMaxNumRecords());
- return getMaxReadTime() == null
- ? input.apply(bounded)
- : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
- } else {
- return getMaxReadTime() == null
- ? input.apply(read)
- : input.apply(read.withMaxReadTime(getMaxReadTime()));
- }
- }
-
- private static final class BasicKinesisProvider implements KinesisClientProvider {
-
- private final String accessKey;
- private final String secretKey;
- private final Regions region;
-
- private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
- this.accessKey = checkNotNull(accessKey, "accessKey");
- this.secretKey = checkNotNull(secretKey, "secretKey");
- this.region = checkNotNull(region, "region");
- }
-
-
- private AWSCredentialsProvider getCredentialsProvider() {
- return new StaticCredentialsProvider(new BasicAWSCredentials(
- accessKey,
- secretKey
- ));
-
- }
-
- @Override
- public AmazonKinesis get() {
- AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
- client.withRegion(region);
- return client;
- }
- }
+ /**
+ * Specify reading from streamName at some initial position.
+ */
+ public Read from(String streamName, InitialPositionInStream initialPosition) {
+ return toBuilder()
+ .setStreamName(streamName)
+ .setInitialPosition(
+ new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+ .build();
+ }
+
+ /**
+ * Specify reading from streamName beginning at given {@link Instant}.
+ * This {@link Instant} must be in the past, i.e. before {@link Instant#now()}.
+ */
+ public Read from(String streamName, Instant initialTimestamp) {
+ return toBuilder()
+ .setStreamName(streamName)
+ .setInitialPosition(
+ new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+ .build();
+ }
+
+ /**
+ * Allows specifying a custom {@link KinesisClientProvider}.
+ * {@link KinesisClientProvider} provides {@link AmazonKinesis} instances which are later
+ * used for communication with Kinesis.
+ * You should use this method if {@link Read#withClientProvider(String, String, Regions)}
+ * does not suit your needs.
+ */
+ public Read withClientProvider(KinesisClientProvider kinesisClientProvider) {
+ return toBuilder().setClientProvider(kinesisClientProvider).build();
+ }
+
+ /**
+ * Specify credential details and region to be used to read from Kinesis.
+ * If you need a more sophisticated credential protocol, then you should look at
+ * {@link Read#withClientProvider(KinesisClientProvider)}.
+ */
+ public Read withClientProvider(String awsAccessKey, String awsSecretKey, Regions region) {
+ return withClientProvider(new BasicKinesisProvider(awsAccessKey, awsSecretKey, region));
+ }
+
+ /** Specifies to read at most a given number of records. */
+ public Read withMaxNumRecords(int maxNumRecords) {
+ checkArgument(
+ maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
+ return toBuilder().setMaxNumRecords(maxNumRecords).build();
+ }
+
+ /** Specifies to read records for at most the given amount of time. */
+ public Read withMaxReadTime(Duration maxReadTime) {
+ checkNotNull(maxReadTime, "maxReadTime");
+ return toBuilder().setMaxReadTime(maxReadTime).build();
+ }
+
+ @Override
+ public PCollection<KinesisRecord> expand(PBegin input) {
+ org.apache.beam.sdk.io.Read.Unbounded<KinesisRecord> read =
+ org.apache.beam.sdk.io.Read.from(
+ new KinesisSource(getClientProvider(), getStreamName(), getInitialPosition()));
+ if (getMaxNumRecords() > 0) {
+ BoundedReadFromUnboundedSource<KinesisRecord> bounded =
+ read.withMaxNumRecords(getMaxNumRecords());
+ return getMaxReadTime() == null
+ ? input.apply(bounded)
+ : input.apply(bounded.withMaxReadTime(getMaxReadTime()));
+ } else {
+ return getMaxReadTime() == null
+ ? input.apply(read)
+ : input.apply(read.withMaxReadTime(getMaxReadTime()));
+ }
+ }
+
+ private static final class BasicKinesisProvider implements KinesisClientProvider {
+
+ private final String accessKey;
+ private final String secretKey;
+ private final Regions region;
+
+ private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
+ this.accessKey = checkNotNull(accessKey, "accessKey");
+ this.secretKey = checkNotNull(secretKey, "secretKey");
+ this.region = checkNotNull(region, "region");
+ }
+
+ private AWSCredentialsProvider getCredentialsProvider() {
+ return new StaticCredentialsProvider(new BasicAWSCredentials(
+ accessKey,
+ secretKey
+ ));
+
+ }
+
+ @Override
+ public AmazonKinesis get() {
+ AmazonKinesisClient client = new AmazonKinesisClient(getCredentialsProvider());
+ client.withRegion(region);
+ return client;
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 2138094..e5c32d2 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -17,129 +17,129 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
+
import org.apache.beam.sdk.io.UnboundedSource;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Reads data from multiple kinesis shards in a single thread.
* It uses simple round robin algorithm when fetching data from shards.
*/
class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
- private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
-
- private final SimplifiedKinesisClient kinesis;
- private final UnboundedSource<KinesisRecord, ?> source;
- private final CheckpointGenerator initialCheckpointGenerator;
- private RoundRobin<ShardRecordsIterator> shardIterators;
- private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
-
- public KinesisReader(SimplifiedKinesisClient kinesis,
- CheckpointGenerator initialCheckpointGenerator,
- UnboundedSource<KinesisRecord, ?> source) {
- this.kinesis = checkNotNull(kinesis, "kinesis");
- this.initialCheckpointGenerator =
- checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
- this.source = source;
- }
-
- /**
- * Generates initial checkpoint and instantiates iterators for shards.
- */
- @Override
- public boolean start() throws IOException {
- LOG.info("Starting reader using {}", initialCheckpointGenerator);
-
- try {
- KinesisReaderCheckpoint initialCheckpoint =
- initialCheckpointGenerator.generate(kinesis);
- List<ShardRecordsIterator> iterators = newArrayList();
- for (ShardCheckpoint checkpoint : initialCheckpoint) {
- iterators.add(checkpoint.getShardRecordsIterator(kinesis));
- }
- shardIterators = new RoundRobin<>(iterators);
- } catch (TransientKinesisException e) {
- throw new IOException(e);
- }
- return advance();
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisReader.class);
+
+ private final SimplifiedKinesisClient kinesis;
+ private final UnboundedSource<KinesisRecord, ?> source;
+ private final CheckpointGenerator initialCheckpointGenerator;
+ private RoundRobin<ShardRecordsIterator> shardIterators;
+ private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
+
+ public KinesisReader(SimplifiedKinesisClient kinesis,
+ CheckpointGenerator initialCheckpointGenerator,
+ UnboundedSource<KinesisRecord, ?> source) {
+ this.kinesis = checkNotNull(kinesis, "kinesis");
+ this.initialCheckpointGenerator =
+ checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
+ this.source = source;
+ }
+
+ /**
+ * Generates initial checkpoint and instantiates iterators for shards.
+ */
+ @Override
+ public boolean start() throws IOException {
+ LOG.info("Starting reader using {}", initialCheckpointGenerator);
+
+ try {
+ KinesisReaderCheckpoint initialCheckpoint =
+ initialCheckpointGenerator.generate(kinesis);
+ List<ShardRecordsIterator> iterators = newArrayList();
+ for (ShardCheckpoint checkpoint : initialCheckpoint) {
+ iterators.add(checkpoint.getShardRecordsIterator(kinesis));
+ }
+ shardIterators = new RoundRobin<>(iterators);
+ } catch (TransientKinesisException e) {
+ throw new IOException(e);
}
- /**
- * Moves to the next record in one of the shards.
- * If current shard iterator can be move forward (i.e. there's a record present) then we do it.
- * If not, we iterate over shards in a round-robin manner.
- */
- @Override
- public boolean advance() throws IOException {
- try {
- for (int i = 0; i < shardIterators.size(); ++i) {
- currentRecord = shardIterators.getCurrent().next();
- if (currentRecord.isPresent()) {
- return true;
- } else {
- shardIterators.moveForward();
- }
- }
- } catch (TransientKinesisException e) {
- LOG.warn("Transient exception occurred", e);
+ return advance();
+ }
+
+ /**
+ * Moves to the next record in one of the shards.
+ * If current shard iterator can be moved forward (i.e. there's a record present) then we do it.
+ * If not, we iterate over shards in a round-robin manner.
+ */
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ for (int i = 0; i < shardIterators.size(); ++i) {
+ currentRecord = shardIterators.getCurrent().next();
+ if (currentRecord.isPresent()) {
+ return true;
+ } else {
+ shardIterators.moveForward();
}
- return false;
- }
-
- @Override
- public byte[] getCurrentRecordId() throws NoSuchElementException {
- return currentRecord.get().getUniqueId();
- }
-
- @Override
- public KinesisRecord getCurrent() throws NoSuchElementException {
- return currentRecord.get();
- }
-
- /**
- * When {@link KinesisReader} was advanced to the current record.
- * We cannot use approximate arrival timestamp given for each record by Kinesis as it
- * is not guaranteed to be accurate - this could lead to mark some records as "late"
- * even if they were not.
- */
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return currentRecord.get().getReadTime();
- }
-
- @Override
- public void close() throws IOException {
- }
-
- /**
- * Current time.
- * We cannot give better approximation of the watermark with current semantics of
- * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
- * {@link KinesisReader#advance()} will be called.
- */
- @Override
- public Instant getWatermark() {
- return Instant.now();
- }
-
- @Override
- public UnboundedSource.CheckpointMark getCheckpointMark() {
- return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
- }
-
- @Override
- public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
- return source;
+ }
+ } catch (TransientKinesisException e) {
+ LOG.warn("Transient exception occurred", e);
}
+ return false;
+ }
+
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ return currentRecord.get().getUniqueId();
+ }
+
+ @Override
+ public KinesisRecord getCurrent() throws NoSuchElementException {
+ return currentRecord.get();
+ }
+
+ /**
+ * When {@link KinesisReader} was advanced to the current record.
+ * We cannot use approximate arrival timestamp given for each record by Kinesis as it
+ * is not guaranteed to be accurate - this could lead to marking some records as "late"
+ * even if they were not.
+ */
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return currentRecord.get().getReadTime();
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Current time.
+ * We cannot give better approximation of the watermark with current semantics of
+ * {@link KinesisReader#getCurrentTimestamp()}, because we don't know when the next
+ * {@link KinesisReader#advance()} will be called.
+ */
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public UnboundedSource.CheckpointMark getCheckpointMark() {
+ return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+ }
+
+ @Override
+ public UnboundedSource<KinesisRecord, ?> getCurrentSource() {
+ return source;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index f0fa45d..d995e75 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -23,11 +23,13 @@ import static com.google.common.collect.Lists.partition;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
+
import org.apache.beam.sdk.io.UnboundedSource;
/**
@@ -37,60 +39,61 @@ import org.apache.beam.sdk.io.UnboundedSource;
* This class is immutable.
*/
class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSource
- .CheckpointMark, Serializable {
- private final List<ShardCheckpoint> shardCheckpoints;
+ .CheckpointMark, Serializable {
- public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
- this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
- }
+ private final List<ShardCheckpoint> shardCheckpoints;
- public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
- iterators) {
- return new KinesisReaderCheckpoint(transform(iterators,
- new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
- @Nullable
- @Override
- public ShardCheckpoint apply(@Nullable
- ShardRecordsIterator shardRecordsIterator) {
- assert shardRecordsIterator != null;
- return shardRecordsIterator.getCheckpoint();
- }
- }));
- }
+ public KinesisReaderCheckpoint(Iterable<ShardCheckpoint> shardCheckpoints) {
+ this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
+ }
- /**
- * Splits given multi-shard checkpoint into partitions of approximately equal size.
- *
- * @param desiredNumSplits - upper limit for number of partitions to generate.
- * @return list of checkpoints covering consecutive partitions of current checkpoint.
- */
- public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
- int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
-
- List<KinesisReaderCheckpoint> checkpoints = newArrayList();
- for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
- checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
- }
- return checkpoints;
- }
+ public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
+ iterators) {
+ return new KinesisReaderCheckpoint(transform(iterators,
+ new Function<ShardRecordsIterator, ShardCheckpoint>() {
- private int divideAndRoundUp(int nominator, int denominator) {
- return (nominator + denominator - 1) / denominator;
- }
+ @Nullable
+ @Override
+ public ShardCheckpoint apply(@Nullable
+ ShardRecordsIterator shardRecordsIterator) {
+ assert shardRecordsIterator != null;
+ return shardRecordsIterator.getCheckpoint();
+ }
+ }));
+ }
- @Override
- public void finalizeCheckpoint() throws IOException {
+ /**
+ * Splits given multi-shard checkpoint into partitions of approximately equal size.
+ *
+ * @param desiredNumSplits - upper limit for number of partitions to generate.
+ * @return list of checkpoints covering consecutive partitions of current checkpoint.
+ */
+ public List<KinesisReaderCheckpoint> splitInto(int desiredNumSplits) {
+ int partitionSize = divideAndRoundUp(shardCheckpoints.size(), desiredNumSplits);
+ List<KinesisReaderCheckpoint> checkpoints = newArrayList();
+ for (List<ShardCheckpoint> shardPartition : partition(shardCheckpoints, partitionSize)) {
+ checkpoints.add(new KinesisReaderCheckpoint(shardPartition));
}
+ return checkpoints;
+ }
- @Override
- public String toString() {
- return shardCheckpoints.toString();
- }
+ private int divideAndRoundUp(int nominator, int denominator) {
+ return (nominator + denominator - 1) / denominator;
+ }
- @Override
- public Iterator<ShardCheckpoint> iterator() {
- return shardCheckpoints.iterator();
- }
+ @Override
+ public void finalizeCheckpoint() throws IOException {
+
+ }
+
+ @Override
+ public String toString() {
+ return shardCheckpoints.toString();
+ }
+
+ @Override
+ public Iterator<ShardCheckpoint> iterator() {
+ return shardCheckpoints.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
index 02b5370..057b7bb 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecord.java
@@ -22,7 +22,9 @@ import static org.apache.commons.lang.builder.HashCodeBuilder.reflectionHashCode
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.google.common.base.Charsets;
+
import java.nio.ByteBuffer;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.joda.time.Instant;
@@ -30,91 +32,92 @@ import org.joda.time.Instant;
* {@link UserRecord} enhanced with utility methods.
*/
public class KinesisRecord {
- private Instant readTime;
- private String streamName;
- private String shardId;
- private long subSequenceNumber;
- private String sequenceNumber;
- private Instant approximateArrivalTimestamp;
- private ByteBuffer data;
- private String partitionKey;
-
- public KinesisRecord(UserRecord record, String streamName, String shardId) {
- this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
- record.getPartitionKey(),
- new Instant(record.getApproximateArrivalTimestamp()),
- Instant.now(),
- streamName, shardId);
- }
-
- public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
- String partitionKey, Instant approximateArrivalTimestamp,
- Instant readTime,
- String streamName, String shardId) {
- this.data = data;
- this.sequenceNumber = sequenceNumber;
- this.subSequenceNumber = subSequenceNumber;
- this.partitionKey = partitionKey;
- this.approximateArrivalTimestamp = approximateArrivalTimestamp;
- this.readTime = readTime;
- this.streamName = streamName;
- this.shardId = shardId;
- }
-
- public ExtendedSequenceNumber getExtendedSequenceNumber() {
- return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
- }
-
- /***
- * @return unique id of the record based on its position in the stream
- */
- public byte[] getUniqueId() {
- return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
- }
-
- public Instant getReadTime() {
- return readTime;
- }
-
- public String getStreamName() {
- return streamName;
- }
-
- public String getShardId() {
- return shardId;
- }
-
- public byte[] getDataAsBytes() {
- return getData().array();
- }
-
- @Override
- public boolean equals(Object obj) {
- return EqualsBuilder.reflectionEquals(this, obj);
- }
-
- @Override
- public int hashCode() {
- return reflectionHashCode(this);
- }
-
- public long getSubSequenceNumber() {
- return subSequenceNumber;
- }
-
- public String getSequenceNumber() {
- return sequenceNumber;
- }
-
- public Instant getApproximateArrivalTimestamp() {
- return approximateArrivalTimestamp;
- }
-
- public ByteBuffer getData() {
- return data;
- }
-
- public String getPartitionKey() {
- return partitionKey;
- }
+
+ private Instant readTime;
+ private String streamName;
+ private String shardId;
+ private long subSequenceNumber;
+ private String sequenceNumber;
+ private Instant approximateArrivalTimestamp;
+ private ByteBuffer data;
+ private String partitionKey;
+
+ public KinesisRecord(UserRecord record, String streamName, String shardId) {
+ this(record.getData(), record.getSequenceNumber(), record.getSubSequenceNumber(),
+ record.getPartitionKey(),
+ new Instant(record.getApproximateArrivalTimestamp()),
+ Instant.now(),
+ streamName, shardId);
+ }
+
+ public KinesisRecord(ByteBuffer data, String sequenceNumber, long subSequenceNumber,
+ String partitionKey, Instant approximateArrivalTimestamp,
+ Instant readTime,
+ String streamName, String shardId) {
+ this.data = data;
+ this.sequenceNumber = sequenceNumber;
+ this.subSequenceNumber = subSequenceNumber;
+ this.partitionKey = partitionKey;
+ this.approximateArrivalTimestamp = approximateArrivalTimestamp;
+ this.readTime = readTime;
+ this.streamName = streamName;
+ this.shardId = shardId;
+ }
+
+ public ExtendedSequenceNumber getExtendedSequenceNumber() {
+ return new ExtendedSequenceNumber(getSequenceNumber(), getSubSequenceNumber());
+ }
+
+ /***
+ * @return unique id of the record based on its position in the stream
+ */
+ public byte[] getUniqueId() {
+ return getExtendedSequenceNumber().toString().getBytes(Charsets.UTF_8);
+ }
+
+ public Instant getReadTime() {
+ return readTime;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
+
+ public byte[] getDataAsBytes() {
+ return getData().array();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return EqualsBuilder.reflectionEquals(this, obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return reflectionHashCode(this);
+ }
+
+ public long getSubSequenceNumber() {
+ return subSequenceNumber;
+ }
+
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public Instant getApproximateArrivalTimestamp() {
+ return approximateArrivalTimestamp;
+ }
+
+ public ByteBuffer getData() {
+ return data;
+ }
+
+ public String getPartitionKey() {
+ return partitionKey;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
index f233e27..dcf564d 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisRecordCoder.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -33,40 +34,41 @@ import org.joda.time.Instant;
* A {@link Coder} for {@link KinesisRecord}.
*/
class KinesisRecordCoder extends AtomicCoder<KinesisRecord> {
- private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
- private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
- private static final InstantCoder INSTANT_CODER = InstantCoder.of();
- private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
- public static KinesisRecordCoder of() {
- return new KinesisRecordCoder();
- }
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();
+ private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+ private static final VarLongCoder VAR_LONG_CODER = VarLongCoder.of();
+
+ public static KinesisRecordCoder of() {
+ return new KinesisRecordCoder();
+ }
- @Override
- public void encode(KinesisRecord value, OutputStream outStream) throws
- IOException {
- BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
- STRING_CODER.encode(value.getSequenceNumber(), outStream);
- STRING_CODER.encode(value.getPartitionKey(), outStream);
- INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
- VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
- INSTANT_CODER.encode(value.getReadTime(), outStream);
- STRING_CODER.encode(value.getStreamName(), outStream);
- STRING_CODER.encode(value.getShardId(), outStream);
- }
+ @Override
+ public void encode(KinesisRecord value, OutputStream outStream) throws
+ IOException {
+ BYTE_ARRAY_CODER.encode(value.getData().array(), outStream);
+ STRING_CODER.encode(value.getSequenceNumber(), outStream);
+ STRING_CODER.encode(value.getPartitionKey(), outStream);
+ INSTANT_CODER.encode(value.getApproximateArrivalTimestamp(), outStream);
+ VAR_LONG_CODER.encode(value.getSubSequenceNumber(), outStream);
+ INSTANT_CODER.encode(value.getReadTime(), outStream);
+ STRING_CODER.encode(value.getStreamName(), outStream);
+ STRING_CODER.encode(value.getShardId(), outStream);
+ }
- @Override
- public KinesisRecord decode(InputStream inStream) throws IOException {
- ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
- String sequenceNumber = STRING_CODER.decode(inStream);
- String partitionKey = STRING_CODER.decode(inStream);
- Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
- long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
- Instant readTimestamp = INSTANT_CODER.decode(inStream);
- String streamName = STRING_CODER.decode(inStream);
- String shardId = STRING_CODER.decode(inStream);
- return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
- approximateArrivalTimestamp, readTimestamp, streamName, shardId
- );
- }
+ @Override
+ public KinesisRecord decode(InputStream inStream) throws IOException {
+ ByteBuffer data = ByteBuffer.wrap(BYTE_ARRAY_CODER.decode(inStream));
+ String sequenceNumber = STRING_CODER.decode(inStream);
+ String partitionKey = STRING_CODER.decode(inStream);
+ Instant approximateArrivalTimestamp = INSTANT_CODER.decode(inStream);
+ long subSequenceNumber = VAR_LONG_CODER.decode(inStream);
+ Instant readTimestamp = INSTANT_CODER.decode(inStream);
+ String streamName = STRING_CODER.decode(inStream);
+ String shardId = STRING_CODER.decode(inStream);
+ return new KinesisRecord(data, sequenceNumber, subSequenceNumber, partitionKey,
+ approximateArrivalTimestamp, readTimestamp, streamName, shardId
+ );
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7e67d07..362792b 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import java.util.List;
+
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -28,85 +29,85 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Represents source for single stream in Kinesis.
*/
class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> {
- private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
-
- private final KinesisClientProvider kinesis;
- private CheckpointGenerator initialCheckpointGenerator;
- public KinesisSource(KinesisClientProvider kinesis, String streamName,
- StartingPoint startingPoint) {
- this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisSource.class);
+
+ private final KinesisClientProvider kinesis;
+ private CheckpointGenerator initialCheckpointGenerator;
+
+ public KinesisSource(KinesisClientProvider kinesis, String streamName,
+ StartingPoint startingPoint) {
+ this(kinesis, new DynamicCheckpointGenerator(streamName, startingPoint));
+ }
+
+ private KinesisSource(KinesisClientProvider kinesisClientProvider,
+ CheckpointGenerator initialCheckpoint) {
+ this.kinesis = kinesisClientProvider;
+ this.initialCheckpointGenerator = initialCheckpoint;
+ validate();
+ }
+
+ /**
+ * Generate splits for reading from the stream.
+ * Basically, it'll try to evenly split set of shards in the stream into
+ * {@code desiredNumSplits} partitions. Each partition is then a split.
+ */
+ @Override
+ public List<KinesisSource> split(int desiredNumSplits,
+ PipelineOptions options) throws Exception {
+ KinesisReaderCheckpoint checkpoint =
+ initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
+
+ List<KinesisSource> sources = newArrayList();
+
+ for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
+ sources.add(new KinesisSource(
+ kinesis,
+ new StaticCheckpointGenerator(partition)));
}
-
- private KinesisSource(KinesisClientProvider kinesisClientProvider,
- CheckpointGenerator initialCheckpoint) {
- this.kinesis = kinesisClientProvider;
- this.initialCheckpointGenerator = initialCheckpoint;
- validate();
+ return sources;
+ }
+
+ /**
+ * Creates reader based on given {@link KinesisReaderCheckpoint}.
+ * If {@link KinesisReaderCheckpoint} is not given, then we use
+ * {@code initialCheckpointGenerator} to generate new checkpoint.
+ */
+ @Override
+ public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
+ KinesisReaderCheckpoint checkpointMark) {
+
+ CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
+
+ if (checkpointMark != null) {
+ checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
}
- /**
- * Generate splits for reading from the stream.
- * Basically, it'll try to evenly split set of shards in the stream into
- * {@code desiredNumSplits} partitions. Each partition is then a split.
- */
- @Override
- public List<KinesisSource> split(int desiredNumSplits,
- PipelineOptions options) throws Exception {
- KinesisReaderCheckpoint checkpoint =
- initialCheckpointGenerator.generate(SimplifiedKinesisClient.from(kinesis));
-
- List<KinesisSource> sources = newArrayList();
-
- for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) {
- sources.add(new KinesisSource(
- kinesis,
- new StaticCheckpointGenerator(partition)));
- }
- return sources;
- }
-
- /**
- * Creates reader based on given {@link KinesisReaderCheckpoint}.
- * If {@link KinesisReaderCheckpoint} is not given, then we use
- * {@code initialCheckpointGenerator} to generate new checkpoint.
- */
- @Override
- public UnboundedReader<KinesisRecord> createReader(PipelineOptions options,
- KinesisReaderCheckpoint checkpointMark) {
-
- CheckpointGenerator checkpointGenerator = initialCheckpointGenerator;
-
- if (checkpointMark != null) {
- checkpointGenerator = new StaticCheckpointGenerator(checkpointMark);
- }
-
- LOG.info("Creating new reader using {}", checkpointGenerator);
-
- return new KinesisReader(
- SimplifiedKinesisClient.from(kinesis),
- checkpointGenerator,
- this);
- }
-
- @Override
- public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
- return SerializableCoder.of(KinesisReaderCheckpoint.class);
- }
-
- @Override
- public void validate() {
- checkNotNull(kinesis);
- checkNotNull(initialCheckpointGenerator);
- }
-
- @Override
- public Coder<KinesisRecord> getDefaultOutputCoder() {
- return KinesisRecordCoder.of();
- }
+ LOG.info("Creating new reader using {}", checkpointGenerator);
+
+ return new KinesisReader(
+ SimplifiedKinesisClient.from(kinesis),
+ checkpointGenerator,
+ this);
+ }
+
+ @Override
+ public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
+ return SerializableCoder.of(KinesisReaderCheckpoint.class);
+ }
+
+ @Override
+ public void validate() {
+ checkNotNull(kinesis);
+ checkNotNull(initialCheckpointGenerator);
+ }
+
+ @Override
+ public Coder<KinesisRecord> getDefaultOutputCoder() {
+ return KinesisRecordCoder.of();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
index 40e65fc..eca725c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RecordFilter.java
@@ -21,7 +21,6 @@ import static com.google.common.collect.Lists.newArrayList;
import java.util.List;
-
/**
* Filters out records, which were already processed and checkpointed.
*
@@ -29,13 +28,14 @@ import java.util.List;
* accuracy, not with "subSequenceNumber" accuracy.
*/
class RecordFilter {
- public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
- List<KinesisRecord> filteredRecords = newArrayList();
- for (KinesisRecord record : records) {
- if (checkpoint.isBeforeOrAt(record)) {
- filteredRecords.add(record);
- }
- }
- return filteredRecords;
+
+ public List<KinesisRecord> apply(List<KinesisRecord> records, ShardCheckpoint checkpoint) {
+ List<KinesisRecord> filteredRecords = newArrayList();
+ for (KinesisRecord record : records) {
+ if (checkpoint.isBeforeOrAt(record)) {
+ filteredRecords.add(record);
+ }
}
+ return filteredRecords;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
index e4ff541..806d982 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
@@ -27,27 +27,28 @@ import java.util.Iterator;
* Very simple implementation of round robin algorithm.
*/
class RoundRobin<T> implements Iterable<T> {
- private final Deque<T> deque;
- public RoundRobin(Iterable<T> collection) {
- this.deque = newArrayDeque(collection);
- checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
- }
+ private final Deque<T> deque;
- public T getCurrent() {
- return deque.getFirst();
- }
+ public RoundRobin(Iterable<T> collection) {
+ this.deque = newArrayDeque(collection);
+ checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
+ }
- public void moveForward() {
- deque.addLast(deque.removeFirst());
- }
+ public T getCurrent() {
+ return deque.getFirst();
+ }
- public int size() {
- return deque.size();
- }
+ public void moveForward() {
+ deque.addLast(deque.removeFirst());
+ }
- @Override
- public Iterator<T> iterator() {
- return deque.iterator();
- }
+ public int size() {
+ return deque.size();
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ return deque.iterator();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 6aa3504..95f97b8 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.sdk.io.kinesis;
-
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_SEQUENCE_NUMBER;
import static com.amazonaws.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
@@ -27,9 +26,10 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
import java.io.Serializable;
-import org.joda.time.Instant;
+import org.joda.time.Instant;
/**
* Checkpoint mark for single shard in the stream.
@@ -45,131 +45,132 @@ import org.joda.time.Instant;
* This class is immutable.
*/
class ShardCheckpoint implements Serializable {
- private final String streamName;
- private final String shardId;
- private final String sequenceNumber;
- private final ShardIteratorType shardIteratorType;
- private final Long subSequenceNumber;
- private final Instant timestamp;
-
- public ShardCheckpoint(String streamName, String shardId, StartingPoint
- startingPoint) {
- this(streamName, shardId,
- ShardIteratorType.fromValue(startingPoint.getPositionName()),
- startingPoint.getTimestamp());
- }
-
- public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
- shardIteratorType, Instant timestamp) {
- this(streamName, shardId, shardIteratorType, null, null, timestamp);
- }
-
- public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
- shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
- this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
- }
-
- private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
- String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
- this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
- this.streamName = checkNotNull(streamName, "streamName");
- this.shardId = checkNotNull(shardId, "shardId");
- if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
- checkNotNull(sequenceNumber,
- "You must provide sequence number for AT_SEQUENCE_NUMBER"
- + " or AFTER_SEQUENCE_NUMBER");
- } else {
- checkArgument(sequenceNumber == null,
- "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
- }
- if (shardIteratorType == AT_TIMESTAMP) {
- checkNotNull(timestamp,
- "You must provide timestamp for AT_SEQUENCE_NUMBER"
- + " or AFTER_SEQUENCE_NUMBER");
- } else {
- checkArgument(timestamp == null,
- "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
- }
-
- this.subSequenceNumber = subSequenceNumber;
- this.sequenceNumber = sequenceNumber;
- this.timestamp = timestamp;
- }
-
- /**
- * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
- * on the the underlying shardIteratorType, it will either compare the timestamp or the
- * {@link ExtendedSequenceNumber}.
- *
- * @param other
- * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
- */
- public boolean isBeforeOrAt(KinesisRecord other) {
- if (shardIteratorType == AT_TIMESTAMP) {
- return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
- }
- int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
- if (result == 0) {
- return shardIteratorType == AT_SEQUENCE_NUMBER;
- }
- return result < 0;
- }
-
- private ExtendedSequenceNumber extendedSequenceNumber() {
- String fullSequenceNumber = sequenceNumber;
- if (fullSequenceNumber == null) {
- fullSequenceNumber = shardIteratorType.toString();
- }
- return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
- }
- @Override
- public String toString() {
- return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
- streamName, shardId,
- sequenceNumber);
+ private final String streamName;
+ private final String shardId;
+ private final String sequenceNumber;
+ private final ShardIteratorType shardIteratorType;
+ private final Long subSequenceNumber;
+ private final Instant timestamp;
+
+ public ShardCheckpoint(String streamName, String shardId, StartingPoint
+ startingPoint) {
+ this(streamName, shardId,
+ ShardIteratorType.fromValue(startingPoint.getPositionName()),
+ startingPoint.getTimestamp());
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, Instant timestamp) {
+ this(streamName, shardId, shardIteratorType, null, null, timestamp);
+ }
+
+ public ShardCheckpoint(String streamName, String shardId, ShardIteratorType
+ shardIteratorType, String sequenceNumber, Long subSequenceNumber) {
+ this(streamName, shardId, shardIteratorType, sequenceNumber, subSequenceNumber, null);
+ }
+
+ private ShardCheckpoint(String streamName, String shardId, ShardIteratorType shardIteratorType,
+ String sequenceNumber, Long subSequenceNumber, Instant timestamp) {
+ this.shardIteratorType = checkNotNull(shardIteratorType, "shardIteratorType");
+ this.streamName = checkNotNull(streamName, "streamName");
+ this.shardId = checkNotNull(shardId, "shardId");
+ if (shardIteratorType == AT_SEQUENCE_NUMBER || shardIteratorType == AFTER_SEQUENCE_NUMBER) {
+ checkNotNull(sequenceNumber,
+ "You must provide sequence number for AT_SEQUENCE_NUMBER"
+ + " or AFTER_SEQUENCE_NUMBER");
+ } else {
+ checkArgument(sequenceNumber == null,
+ "Sequence number must be null for LATEST, TRIM_HORIZON or AT_TIMESTAMP");
}
-
- public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
- throws TransientKinesisException {
- return new ShardRecordsIterator(this, kinesis);
+ if (shardIteratorType == AT_TIMESTAMP) {
+ checkNotNull(timestamp,
+ "You must provide timestamp for AT_SEQUENCE_NUMBER"
+ + " or AFTER_SEQUENCE_NUMBER");
+ } else {
+ checkArgument(timestamp == null,
+ "Timestamp must be null for an iterator type other than AT_TIMESTAMP");
}
- public String getShardIterator(SimplifiedKinesisClient kinesisClient)
- throws TransientKinesisException {
- if (checkpointIsInTheMiddleOfAUserRecord()) {
- return kinesisClient.getShardIterator(streamName,
- shardId, AT_SEQUENCE_NUMBER,
- sequenceNumber, null);
- }
- return kinesisClient.getShardIterator(streamName,
- shardId, shardIteratorType,
- sequenceNumber, timestamp);
+ this.subSequenceNumber = subSequenceNumber;
+ this.sequenceNumber = sequenceNumber;
+ this.timestamp = timestamp;
+ }
+
+ /**
+ * Used to compare {@link ShardCheckpoint} object to {@link KinesisRecord}. Depending
+ * on the underlying shardIteratorType, it will either compare the timestamp or the
+ * {@link ExtendedSequenceNumber}.
+ *
+ * @param other
+ * @return if current checkpoint mark points before or at given {@link ExtendedSequenceNumber}
+ */
+ public boolean isBeforeOrAt(KinesisRecord other) {
+ if (shardIteratorType == AT_TIMESTAMP) {
+ return timestamp.compareTo(other.getApproximateArrivalTimestamp()) <= 0;
}
-
- private boolean checkpointIsInTheMiddleOfAUserRecord() {
- return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+ int result = extendedSequenceNumber().compareTo(other.getExtendedSequenceNumber());
+ if (result == 0) {
+ return shardIteratorType == AT_SEQUENCE_NUMBER;
}
+ return result < 0;
+ }
- /**
- * Used to advance checkpoint mark to position after given {@link Record}.
- *
- * @param record
- * @return new checkpoint object pointing directly after given {@link Record}
- */
- public ShardCheckpoint moveAfter(KinesisRecord record) {
- return new ShardCheckpoint(
- streamName, shardId,
- AFTER_SEQUENCE_NUMBER,
- record.getSequenceNumber(),
- record.getSubSequenceNumber());
+ private ExtendedSequenceNumber extendedSequenceNumber() {
+ String fullSequenceNumber = sequenceNumber;
+ if (fullSequenceNumber == null) {
+ fullSequenceNumber = shardIteratorType.toString();
}
-
- public String getStreamName() {
- return streamName;
- }
-
- public String getShardId() {
- return shardId;
+ return new ExtendedSequenceNumber(fullSequenceNumber, subSequenceNumber);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Checkpoint %s for stream %s, shard %s: %s", shardIteratorType,
+ streamName, shardId,
+ sequenceNumber);
+ }
+
+ public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
+ throws TransientKinesisException {
+ return new ShardRecordsIterator(this, kinesis);
+ }
+
+ public String getShardIterator(SimplifiedKinesisClient kinesisClient)
+ throws TransientKinesisException {
+ if (checkpointIsInTheMiddleOfAUserRecord()) {
+ return kinesisClient.getShardIterator(streamName,
+ shardId, AT_SEQUENCE_NUMBER,
+ sequenceNumber, null);
}
+ return kinesisClient.getShardIterator(streamName,
+ shardId, shardIteratorType,
+ sequenceNumber, timestamp);
+ }
+
+ private boolean checkpointIsInTheMiddleOfAUserRecord() {
+ return shardIteratorType == AFTER_SEQUENCE_NUMBER && subSequenceNumber != null;
+ }
+
+ /**
+ * Used to advance checkpoint mark to position after given {@link Record}.
+ *
+ * @param record
+ * @return new checkpoint object pointing directly after given {@link Record}
+ */
+ public ShardCheckpoint moveAfter(KinesisRecord record) {
+ return new ShardCheckpoint(
+ streamName, shardId,
+ AFTER_SEQUENCE_NUMBER,
+ record.getSequenceNumber(),
+ record.getSubSequenceNumber());
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ public String getShardId() {
+ return shardId;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/7925a668/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index 872f604..a69c6c1 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -21,7 +21,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Queues.newArrayDeque;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+
import java.util.Deque;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,68 +33,68 @@ import org.slf4j.LoggerFactory;
* Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
*/
class ShardRecordsIterator {
- private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
- private final SimplifiedKinesisClient kinesis;
- private final RecordFilter filter;
- private ShardCheckpoint checkpoint;
- private String shardIterator;
- private Deque<KinesisRecord> data = newArrayDeque();
+ private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
- SimplifiedKinesisClient simplifiedKinesisClient) throws
- TransientKinesisException {
- this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
- }
+ private final SimplifiedKinesisClient kinesis;
+ private final RecordFilter filter;
+ private ShardCheckpoint checkpoint; // Replaced in next() each time a record is handed out.
+ private String shardIterator; // Replaced after every fetch, and re-derived when the iterator has expired.
+ private Deque<KinesisRecord> data = newArrayDeque(); // Fetched records not yet returned by next().
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
- SimplifiedKinesisClient simplifiedKinesisClient,
- RecordFilter filter) throws
- TransientKinesisException {
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient) throws
+ TransientKinesisException {
+ this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
+ }
- this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
- this.filter = checkNotNull(filter, "filter");
- this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
- shardIterator = checkpoint.getShardIterator(kinesis);
- }
+ public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient,
+ RecordFilter filter) throws
+ TransientKinesisException {
- /**
- * Returns record if there's any present.
- * Returns absent() if there are no new records at this time in the shard.
- */
- public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
- readMoreIfNecessary();
+ this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+ this.filter = checkNotNull(filter, "filter");
+ this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
+ shardIterator = checkpoint.getShardIterator(kinesis); // Initial iterator is derived from the starting checkpoint.
+ }
- if (data.isEmpty()) {
- return CustomOptional.absent();
- } else {
- KinesisRecord record = data.removeFirst();
- checkpoint = checkpoint.moveAfter(record);
- return CustomOptional.of(record);
- }
- }
+ /**
+ * Returns record if there's any present.
+ * Returns absent() if there are no new records at this time in the shard.
+ *
+ * <p>When the local queue is empty, one fetch from Kinesis is attempted first. Each record
+ * that is returned also moves the checkpoint to the position directly after that record.
+ */
+ public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
+ readMoreIfNecessary();
- private void readMoreIfNecessary() throws TransientKinesisException {
- if (data.isEmpty()) {
- GetKinesisRecordsResult response;
- try {
- response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
- checkpoint.getShardId());
- } catch (ExpiredIteratorException e) {
- LOG.info("Refreshing expired iterator", e);
- shardIterator = checkpoint.getShardIterator(kinesis);
- response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
- checkpoint.getShardId());
- }
- LOG.debug("Fetched {} new records", response.getRecords().size());
- shardIterator = response.getNextShardIterator();
- data.addAll(filter.apply(response.getRecords(), checkpoint));
- }
+ if (data.isEmpty()) {
+ return CustomOptional.absent();
+ } else {
+ KinesisRecord record = data.removeFirst();
+ checkpoint = checkpoint.moveAfter(record);
+ return CustomOptional.of(record);
}
+ }
- public ShardCheckpoint getCheckpoint() {
- return checkpoint;
+ private void readMoreIfNecessary() throws TransientKinesisException {
+ if (data.isEmpty()) { // Only call Kinesis once the local queue has been drained.
+ GetKinesisRecordsResult response;
+ try {
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ } catch (ExpiredIteratorException e) {
+ LOG.info("Refreshing expired iterator", e);
+ shardIterator = checkpoint.getShardIterator(kinesis); // Re-derive the iterator from the current checkpoint, then retry the fetch once.
+ response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
+ checkpoint.getShardId());
+ }
+ LOG.debug("Fetched {} new records", response.getRecords().size());
+ shardIterator = response.getNextShardIterator(); // The next fetch continues from where this one ended.
+ data.addAll(filter.apply(response.getRecords(), checkpoint)); // NOTE(review): presumably the filter drops records already covered by the checkpoint - confirm in RecordFilter.
}
+ }
+ public ShardCheckpoint getCheckpoint() {
+ return checkpoint;
+ }
}