You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/31 23:03:12 UTC
[1/2] beam git commit: [BEAM-2468] Reading Kinesis records in the background
Repository: beam
Updated Branches:
refs/heads/master c53a121f5 -> 612af0a29
[BEAM-2468] Reading Kinesis records in the background
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ec394465
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ec394465
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ec394465
Branch: refs/heads/master
Commit: ec3944659d16d696bcd73589aa035dbaa9aede2c
Parents: c53a121
Author: Pawel Kaczmarczyk <p....@ocado.com>
Authored: Mon Oct 2 17:24:11 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Oct 31 16:02:43 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/kinesis/KinesisReader.java | 43 ++---
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 18 --
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 54 ------
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 8 +-
.../beam/sdk/io/kinesis/ShardReadersPool.java | 162 ++++++++++++++++
.../sdk/io/kinesis/ShardRecordsIterator.java | 90 ++++-----
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 66 +++----
.../beam/sdk/io/kinesis/RoundRobinTest.java | 59 ------
.../sdk/io/kinesis/ShardReadersPoolTest.java | 185 +++++++++++++++++++
.../io/kinesis/ShardRecordsIteratorTest.java | 35 ++--
10 files changed, 454 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index 8095150..665b897 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -18,10 +18,8 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Lists.newArrayList;
import java.io.IOException;
-import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -64,7 +62,6 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
private final SimplifiedKinesisClient kinesis;
private final KinesisSource source;
private final CheckpointGenerator initialCheckpointGenerator;
- private RoundRobin<ShardRecordsIterator> shardIterators;
private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
private MovingFunction minReadTimestampMsSinceEpoch;
private Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -72,6 +69,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
private Instant backlogBytesLastCheckTime = new Instant(0L);
private Duration upToDateThreshold;
private Duration backlogBytesCheckThreshold;
+ private ShardReadersPool shardReadersPool;
KinesisReader(SimplifiedKinesisClient kinesis,
CheckpointGenerator initialCheckpointGenerator,
@@ -107,13 +105,8 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
LOG.info("Starting reader using {}", initialCheckpointGenerator);
try {
- KinesisReaderCheckpoint initialCheckpoint =
- initialCheckpointGenerator.generate(kinesis);
- List<ShardRecordsIterator> iterators = newArrayList();
- for (ShardCheckpoint checkpoint : initialCheckpoint) {
- iterators.add(checkpoint.getShardRecordsIterator(kinesis));
- }
- shardIterators = new RoundRobin<>(iterators);
+ shardReadersPool = createShardReadersPool();
+ shardReadersPool.start();
} catch (TransientKinesisException e) {
throw new IOException(e);
}
@@ -128,21 +121,12 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
*/
@Override
public boolean advance() throws IOException {
- try {
- for (int i = 0; i < shardIterators.size(); ++i) {
- currentRecord = shardIterators.getCurrent().next();
- if (currentRecord.isPresent()) {
- Instant approximateArrivalTimestamp = currentRecord.get()
- .getApproximateArrivalTimestamp();
- minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(),
- approximateArrivalTimestamp.getMillis());
- return true;
- } else {
- shardIterators.moveForward();
- }
- }
- } catch (TransientKinesisException e) {
- LOG.warn("Transient exception occurred", e);
+ currentRecord = shardReadersPool.nextRecord();
+ if (currentRecord.isPresent()) {
+ Instant approximateArrivalTimestamp = currentRecord.get().getApproximateArrivalTimestamp();
+ minReadTimestampMsSinceEpoch.add(Instant.now().getMillis(),
+ approximateArrivalTimestamp.getMillis());
+ return true;
}
return false;
}
@@ -170,13 +154,14 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
@Override
public void close() throws IOException {
+ shardReadersPool.stop();
}
@Override
public Instant getWatermark() {
Instant now = Instant.now();
long readMin = minReadTimestampMsSinceEpoch.get(now.getMillis());
- if (readMin == Long.MAX_VALUE) {
+ if (readMin == Long.MAX_VALUE && shardReadersPool.allShardsUpToDate()) {
lastWatermark = now;
} else if (minReadTimestampMsSinceEpoch.isSignificant()) {
Instant minReadTime = new Instant(readMin);
@@ -189,7 +174,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
- return KinesisReaderCheckpoint.asCurrentStateOf(shardIterators);
+ return shardReadersPool.getCheckpointMark();
}
@Override
@@ -221,4 +206,8 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
watermark, lastBacklogBytes);
return lastBacklogBytes;
}
+
+ ShardReadersPool createShardReadersPool() throws TransientKinesisException {
+ return new ShardReadersPool(kinesis, initialCheckpointGenerator.generate(kinesis));
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
index d995e75..eca8791 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReaderCheckpoint.java
@@ -17,18 +17,15 @@
*/
package org.apache.beam.sdk.io.kinesis;
-import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.newArrayList;
import static com.google.common.collect.Lists.partition;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -47,21 +44,6 @@ class KinesisReaderCheckpoint implements Iterable<ShardCheckpoint>, UnboundedSou
this.shardCheckpoints = ImmutableList.copyOf(shardCheckpoints);
}
- public static KinesisReaderCheckpoint asCurrentStateOf(Iterable<ShardRecordsIterator>
- iterators) {
- return new KinesisReaderCheckpoint(transform(iterators,
- new Function<ShardRecordsIterator, ShardCheckpoint>() {
-
- @Nullable
- @Override
- public ShardCheckpoint apply(@Nullable
- ShardRecordsIterator shardRecordsIterator) {
- assert shardRecordsIterator != null;
- return shardRecordsIterator.getCheckpoint();
- }
- }));
- }
-
/**
* Splits given multi-shard checkpoint into partitions of approximately equal size.
*
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
deleted file mode 100644
index 806d982..0000000
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/RoundRobin.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.Queues.newArrayDeque;
-
-import java.util.Deque;
-import java.util.Iterator;
-
-/**
- * Very simple implementation of round robin algorithm.
- */
-class RoundRobin<T> implements Iterable<T> {
-
- private final Deque<T> deque;
-
- public RoundRobin(Iterable<T> collection) {
- this.deque = newArrayDeque(collection);
- checkArgument(!deque.isEmpty(), "Tried to initialize RoundRobin with empty collection");
- }
-
- public T getCurrent() {
- return deque.getFirst();
- }
-
- public void moveForward() {
- deque.addLast(deque.removeFirst());
- }
-
- public int size() {
- return deque.size();
- }
-
- @Override
- public Iterator<T> iterator() {
- return deque.iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
index 95f97b8..94e3b96 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardCheckpoint.java
@@ -85,8 +85,7 @@ class ShardCheckpoint implements Serializable {
}
if (shardIteratorType == AT_TIMESTAMP) {
checkNotNull(timestamp,
- "You must provide timestamp for AT_SEQUENCE_NUMBER"
- + " or AFTER_SEQUENCE_NUMBER");
+ "You must provide timestamp for AT_TIMESTAMP");
} else {
checkArgument(timestamp == null,
"Timestamp must be null for an iterator type other than AT_TIMESTAMP");
@@ -131,11 +130,6 @@ class ShardCheckpoint implements Serializable {
sequenceNumber);
}
- public ShardRecordsIterator getShardRecordsIterator(SimplifiedKinesisClient kinesis)
- throws TransientKinesisException {
- return new ShardRecordsIterator(this, kinesis);
- }
-
public String getShardIterator(SimplifiedKinesisClient kinesisClient)
throws TransientKinesisException {
if (checkpointIsInTheMiddleOfAUserRecord()) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
new file mode 100644
index 0000000..83e3081
--- /dev/null
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.transform;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Internal shard iterators pool.
+ * It maintains the thread pool for reading Kinesis shards in separate threads.
+ * Read records are stored in a blocking queue of limited capacity.
+ */
+class ShardReadersPool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
+ private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
+ private ExecutorService executorService;
+ private BlockingQueue<KinesisRecord> recordsQueue;
+ private Map<String, ShardRecordsIterator> shardIteratorsMap;
+ private SimplifiedKinesisClient kinesis;
+ private KinesisReaderCheckpoint initialCheckpoint;
+ private final int queueCapacityPerShard;
+ private AtomicBoolean poolOpened = new AtomicBoolean(true);
+
+ ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint) {
+ this(kinesis, initialCheckpoint, DEFAULT_CAPACITY_PER_SHARD);
+ }
+
+ ShardReadersPool(SimplifiedKinesisClient kinesis, KinesisReaderCheckpoint initialCheckpoint,
+ int queueCapacityPerShard) {
+ this.kinesis = kinesis;
+ this.initialCheckpoint = initialCheckpoint;
+ this.queueCapacityPerShard = queueCapacityPerShard;
+ }
+
+ void start() throws TransientKinesisException {
+ ImmutableMap.Builder<String, ShardRecordsIterator> shardsMap = ImmutableMap.builder();
+ for (ShardCheckpoint checkpoint : initialCheckpoint) {
+ shardsMap.put(checkpoint.getShardId(), createShardIterator(kinesis, checkpoint));
+ }
+ shardIteratorsMap = shardsMap.build();
+ executorService = Executors.newFixedThreadPool(shardIteratorsMap.size());
+ recordsQueue = new LinkedBlockingQueue<>(queueCapacityPerShard * shardIteratorsMap.size());
+ for (final ShardRecordsIterator shardRecordsIterator : shardIteratorsMap.values()) {
+ executorService.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ readLoop(shardRecordsIterator);
+ }
+ });
+ }
+ }
+
+ private void readLoop(ShardRecordsIterator shardRecordsIterator) {
+ while (poolOpened.get()) {
+ try {
+ List<KinesisRecord> kinesisRecords = shardRecordsIterator.readNextBatch();
+ for (KinesisRecord kinesisRecord : kinesisRecords) {
+ recordsQueue.put(kinesisRecord);
+ }
+ } catch (TransientKinesisException e) {
+ LOG.warn("Transient exception occurred.", e);
+ } catch (InterruptedException e) {
+ LOG.warn("Thread was interrupted, finishing the read loop", e);
+ break;
+ } catch (Throwable e) {
+ LOG.error("Unexpected exception occurred", e);
+ }
+ }
+ LOG.info("Kinesis Shard read loop has finished");
+ }
+
+ CustomOptional<KinesisRecord> nextRecord() {
+ try {
+ KinesisRecord record = recordsQueue.poll(1, TimeUnit.SECONDS);
+ if (record == null) {
+ return CustomOptional.absent();
+ }
+ shardIteratorsMap.get(record.getShardId()).ackRecord(record);
+ return CustomOptional.of(record);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for KinesisRecord from the buffer");
+ return CustomOptional.absent();
+ }
+ }
+
+ void stop() {
+ LOG.info("Closing shard iterators pool");
+ poolOpened.set(false);
+ executorService.shutdownNow();
+ boolean isShutdown = false;
+ int attemptsLeft = 3;
+ while (!isShutdown && attemptsLeft-- > 0) {
+ try {
+ isShutdown = executorService.awaitTermination(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for the executor service to shutdown");
+ throw new RuntimeException(e);
+ }
+ if (!isShutdown && attemptsLeft > 0) {
+ LOG.warn("Executor service is taking long time to shutdown, will retry. {} attempts left",
+ attemptsLeft);
+ }
+ }
+ }
+
+ boolean allShardsUpToDate() {
+ boolean shardsUpToDate = true;
+ for (ShardRecordsIterator shardRecordsIterator : shardIteratorsMap.values()) {
+ shardsUpToDate &= shardRecordsIterator.isUpToDate();
+ }
+ return shardsUpToDate;
+ }
+
+ KinesisReaderCheckpoint getCheckpointMark() {
+ return new KinesisReaderCheckpoint(transform(shardIteratorsMap.values(),
+ new Function<ShardRecordsIterator, ShardCheckpoint>() {
+ @Override
+ public ShardCheckpoint apply(ShardRecordsIterator shardRecordsIterator) {
+ checkArgument(shardRecordsIterator != null, "shardRecordsIterator can not be null");
+ return shardRecordsIterator.getCheckpoint();
+ }
+ }));
+ }
+
+ ShardRecordsIterator createShardIterator(SimplifiedKinesisClient kinesis,
+ ShardCheckpoint checkpoint) throws TransientKinesisException {
+ return new ShardRecordsIterator(checkpoint, kinesis);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
index d4e8038..c1450be 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIterator.java
@@ -18,19 +18,21 @@
package org.apache.beam.sdk.io.kinesis;
import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.collect.Queues.newArrayDeque;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
-import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Iterates over records in a single shard.
- * Under the hood records are retrieved from Kinesis in batches and stored in the in-memory queue.
- * Then the caller of {@link ShardRecordsIterator#next()} can read from queue one by one.
+ * Records are retrieved in batches via calls to {@link ShardRecordsIterator#readNextBatch()}.
+ * Client has to confirm processed records by calling
+ * {@link ShardRecordsIterator#ackRecord(KinesisRecord)} method.
*/
class ShardRecordsIterator {
@@ -38,71 +40,59 @@ class ShardRecordsIterator {
private final SimplifiedKinesisClient kinesis;
private final RecordFilter filter;
- private ShardCheckpoint checkpoint;
+ private final String streamName;
+ private final String shardId;
+ private AtomicReference<ShardCheckpoint> checkpoint;
private String shardIterator;
- private Deque<KinesisRecord> data = newArrayDeque();
+ private AtomicLong millisBehindLatest = new AtomicLong(Long.MAX_VALUE);
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
- SimplifiedKinesisClient simplifiedKinesisClient) throws
- TransientKinesisException {
+ ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ SimplifiedKinesisClient simplifiedKinesisClient) throws TransientKinesisException {
this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
}
- public ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
+ ShardRecordsIterator(final ShardCheckpoint initialCheckpoint,
SimplifiedKinesisClient simplifiedKinesisClient,
- RecordFilter filter) throws
- TransientKinesisException {
-
- this.checkpoint = checkNotNull(initialCheckpoint, "initialCheckpoint");
+ RecordFilter filter) throws TransientKinesisException {
+ this.checkpoint = new AtomicReference<>(checkNotNull(initialCheckpoint, "initialCheckpoint"));
this.filter = checkNotNull(filter, "filter");
this.kinesis = checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
- shardIterator = checkpoint.getShardIterator(kinesis);
+ this.streamName = initialCheckpoint.getStreamName();
+ this.shardId = initialCheckpoint.getShardId();
+ this.shardIterator = initialCheckpoint.getShardIterator(kinesis);
}
- /**
- * Returns record if there's any present.
- * Returns absent() if there are no new records at this time in the shard.
- */
- public CustomOptional<KinesisRecord> next() throws TransientKinesisException {
- readMoreIfNecessary();
-
- if (data.isEmpty()) {
- return CustomOptional.absent();
- } else {
- KinesisRecord record = data.removeFirst();
- checkpoint = checkpoint.moveAfter(record);
- return CustomOptional.of(record);
- }
+ List<KinesisRecord> readNextBatch() throws TransientKinesisException {
+ GetKinesisRecordsResult response = fetchRecords();
+ LOG.debug("Fetched {} new records", response.getRecords().size());
+
+ List<KinesisRecord> filteredRecords = filter.apply(response.getRecords(), checkpoint.get());
+ millisBehindLatest.set(response.getMillisBehindLatest());
+ return filteredRecords;
}
- private void readMoreIfNecessary() throws TransientKinesisException {
- if (data.isEmpty()) {
- GetKinesisRecordsResult response = fetchRecords();
- data.addAll(filter.apply(response.getRecords(), checkpoint));
+ private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
+ try {
+ GetKinesisRecordsResult response = kinesis.getRecords(shardIterator, streamName, shardId);
+ shardIterator = response.getNextShardIterator();
+ return response;
+ } catch (ExpiredIteratorException e) {
+ LOG.info("Refreshing expired iterator", e);
+ shardIterator = checkpoint.get().getShardIterator(kinesis);
+ return fetchRecords();
}
}
- private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
- GetKinesisRecordsResult response = null;
- do {
- try {
- response = kinesis.getRecords(shardIterator, checkpoint.getStreamName(),
- checkpoint.getShardId());
- shardIterator = response.getNextShardIterator();
- } catch (ExpiredIteratorException e) {
- LOG.info("Refreshing expired iterator", e);
- shardIterator = checkpoint.getShardIterator(kinesis);
- }
- } while (response == null || gotEmptyResponseButIsBeforeEndOfTheStream(response));
- return response;
+ ShardCheckpoint getCheckpoint() {
+ return checkpoint.get();
}
- private boolean gotEmptyResponseButIsBeforeEndOfTheStream(GetKinesisRecordsResult response) {
- return response.getRecords().isEmpty() && response.getMillisBehindLatest() > 0;
+ boolean isUpToDate() {
+ return millisBehindLatest.get() == 0L;
}
- public ShardCheckpoint getCheckpoint() {
- return checkpoint;
+ void ackRecord(KinesisRecord record) {
+ checkpoint.set(checkpoint.get().moveAfter(record));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 22d8bce..11ae011 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -21,7 +21,9 @@ import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -50,11 +52,11 @@ public class KinesisReaderTest {
@Mock
private ShardCheckpoint firstCheckpoint, secondCheckpoint;
@Mock
- private ShardRecordsIterator firstIterator, secondIterator;
- @Mock
private KinesisRecord a, b, c, d;
@Mock
private KinesisSource kinesisSource;
+ @Mock
+ private ShardReadersPool shardReadersPool;
private KinesisReader reader;
@@ -63,16 +65,22 @@ public class KinesisReaderTest {
when(generator.generate(kinesis)).thenReturn(new KinesisReaderCheckpoint(
asList(firstCheckpoint, secondCheckpoint)
));
- when(firstCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(firstIterator);
- when(secondCheckpoint.getShardRecordsIterator(kinesis)).thenReturn(secondIterator);
- when(firstIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
+ when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.<KinesisRecord>absent());
when(a.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
when(b.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
when(c.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
when(d.getApproximateArrivalTimestamp()).thenReturn(Instant.now());
- reader = new KinesisReader(kinesis, generator, kinesisSource, Duration.ZERO, Duration.ZERO);
+ reader = createReader(Duration.ZERO);
+ }
+
+ private KinesisReader createReader(Duration backlogBytesCheckThreshold)
+ throws TransientKinesisException {
+ KinesisReader kinesisReader = spy(new KinesisReader(kinesis, generator, kinesisSource,
+ Duration.ZERO, backlogBytesCheckThreshold));
+ doReturn(shardReadersPool).when(kinesisReader)
+ .createShardReadersPool();
+ return kinesisReader;
}
@Test
@@ -89,7 +97,7 @@ public class KinesisReaderTest {
@Test
public void startReturnsTrueIfSomeDataAvailable() throws IOException,
TransientKinesisException {
- when(firstIterator.next()).
+ when(shardReadersPool.nextRecord()).
thenReturn(CustomOptional.of(a)).
thenReturn(CustomOptional.<KinesisRecord>absent());
@@ -97,34 +105,22 @@ public class KinesisReaderTest {
}
@Test
- public void advanceReturnsFalseIfThereIsTransientExceptionInKinesis()
- throws IOException, TransientKinesisException {
- reader.start();
-
- when(firstIterator.next()).thenThrow(TransientKinesisException.class);
-
- assertThat(reader.advance()).isFalse();
- }
-
- @Test
public void readsThroughAllDataAvailable() throws IOException, TransientKinesisException {
- when(firstIterator.next()).
+ when(shardReadersPool.nextRecord()).
+ thenReturn(CustomOptional.of(c)).
thenReturn(CustomOptional.<KinesisRecord>absent()).
thenReturn(CustomOptional.of(a)).
thenReturn(CustomOptional.<KinesisRecord>absent()).
- thenReturn(CustomOptional.of(b)).
- thenReturn(CustomOptional.<KinesisRecord>absent());
-
- when(secondIterator.next()).
- thenReturn(CustomOptional.of(c)).
- thenReturn(CustomOptional.<KinesisRecord>absent()).
thenReturn(CustomOptional.of(d)).
+ thenReturn(CustomOptional.of(b)).
thenReturn(CustomOptional.<KinesisRecord>absent());
assertThat(reader.start()).isTrue();
assertThat(reader.getCurrent()).isEqualTo(c);
+ assertThat(reader.advance()).isFalse();
assertThat(reader.advance()).isTrue();
assertThat(reader.getCurrent()).isEqualTo(a);
+ assertThat(reader.advance()).isFalse();
assertThat(reader.advance()).isTrue();
assertThat(reader.getCurrent()).isEqualTo(d);
assertThat(reader.advance()).isTrue();
@@ -138,7 +134,6 @@ public class KinesisReaderTest {
final long timestampMs = 1000L;
prepareRecordsWithArrivalTimestamps(timestampMs, 1, KinesisReader.MIN_WATERMARK_MESSAGES / 2);
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
for (boolean more = reader.start(); more; more = reader.advance()) {
assertThat(reader.getWatermark()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
@@ -151,7 +146,6 @@ public class KinesisReaderTest {
long timestampMs = 1000L;
prepareRecordsWithArrivalTimestamps(timestampMs, 1, KinesisReader.MIN_WATERMARK_MESSAGES);
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
int recordsNeededForWatermarkAdvancing = KinesisReader.MIN_WATERMARK_MESSAGES;
for (boolean more = reader.start(); more; more = reader.advance()) {
@@ -169,7 +163,6 @@ public class KinesisReaderTest {
long timestampMs = 1000L;
prepareRecordsWithArrivalTimestamps(timestampMs, -1, KinesisReader.MIN_WATERMARK_MESSAGES * 2);
- when(secondIterator.next()).thenReturn(CustomOptional.<KinesisRecord>absent());
Instant lastWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
for (boolean more = reader.start(); more; more = reader.advance()) {
@@ -184,14 +177,14 @@ public class KinesisReaderTest {
int count) throws TransientKinesisException {
long timestampMs = initialTimestampMs;
KinesisRecord firstRecord = prepareRecordMockWithArrivalTimestamp(timestampMs);
- OngoingStubbing<CustomOptional<KinesisRecord>> firstIteratorStubbing =
- when(firstIterator.next()).thenReturn(CustomOptional.of(firstRecord));
+ OngoingStubbing<CustomOptional<KinesisRecord>> shardReadersPoolStubbing =
+ when(shardReadersPool.nextRecord()).thenReturn(CustomOptional.of(firstRecord));
for (int i = 0; i < count; i++) {
timestampMs += increment;
KinesisRecord record = prepareRecordMockWithArrivalTimestamp(timestampMs);
- firstIteratorStubbing = firstIteratorStubbing.thenReturn(CustomOptional.of(record));
+ shardReadersPoolStubbing = shardReadersPoolStubbing.thenReturn(CustomOptional.of(record));
}
- firstIteratorStubbing.thenReturn(CustomOptional.<KinesisRecord>absent());
+ shardReadersPoolStubbing.thenReturn(CustomOptional.<KinesisRecord>absent());
}
private KinesisRecord prepareRecordMockWithArrivalTimestamp(long timestampMs) {
@@ -202,7 +195,8 @@ public class KinesisReaderTest {
@Test
public void getTotalBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOccur()
- throws TransientKinesisException {
+ throws TransientKinesisException, IOException {
+ reader.start();
when(kinesisSource.getStreamName()).thenReturn("stream1");
when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
.thenReturn(10L)
@@ -216,9 +210,9 @@ public class KinesisReaderTest {
@Test
public void getTotalBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
- throws TransientKinesisException {
- KinesisReader backlogCachingReader = new KinesisReader(kinesis, generator, kinesisSource,
- Duration.ZERO, Duration.standardSeconds(30));
+ throws TransientKinesisException, IOException {
+ KinesisReader backlogCachingReader = createReader(Duration.standardSeconds(30));
+ backlogCachingReader.start();
when(kinesisSource.getStreamName()).thenReturn("stream1");
when(kinesis.getBacklogBytes(eq("stream1"), any(Instant.class)))
.thenReturn(10L)
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
deleted file mode 100644
index e4abce4..0000000
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/RoundRobinTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.kinesis;
-
-import static com.google.common.collect.Lists.newArrayList;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.junit.Test;
-
-/**
- * Tests {@link RoundRobin}.
- */
-public class RoundRobinTest {
-
- @Test(expected = IllegalArgumentException.class)
- public void doesNotAllowCreationWithEmptyCollection() {
- new RoundRobin<>(Collections.emptyList());
- }
-
- @Test
- public void goesThroughElementsInCycle() {
- List<String> input = newArrayList("a", "b", "c");
-
- RoundRobin<String> roundRobin = new RoundRobin<>(newArrayList(input));
-
- input.addAll(input); // duplicate the input
- for (String element : input) {
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- assertThat(roundRobin.getCurrent()).isEqualTo(element);
- roundRobin.moveForward();
- }
- }
-
- @Test
- public void usualIteratorGoesThroughElementsOnce() {
- List<String> input = newArrayList("a", "b", "c");
-
- RoundRobin<String> roundRobin = new RoundRobin<>(input);
- assertThat(roundRobin).hasSize(3).containsOnly(input.toArray(new String[0]));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
new file mode 100644
index 0000000..03cc428
--- /dev/null
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kinesis;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests {@link ShardReadersPool}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ShardReadersPoolTest {
+
+ @Mock
+ private ShardRecordsIterator firstIterator, secondIterator;
+ @Mock
+ private ShardCheckpoint firstCheckpoint, secondCheckpoint;
+ @Mock
+ private SimplifiedKinesisClient kinesis;
+ @Mock
+ private KinesisRecord a, b, c, d;
+
+ private ShardReadersPool shardReadersPool;
+
+ @Before
+ public void setUp() throws TransientKinesisException {
+ when(a.getShardId()).thenReturn("shard1");
+ when(b.getShardId()).thenReturn("shard1");
+ when(c.getShardId()).thenReturn("shard2");
+ when(d.getShardId()).thenReturn("shard2");
+ when(firstCheckpoint.getShardId()).thenReturn("shard1");
+ when(secondCheckpoint.getShardId()).thenReturn("shard2");
+ KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(
+ Arrays.asList(firstCheckpoint, secondCheckpoint));
+ shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint));
+ doReturn(firstIterator).when(shardReadersPool).createShardIterator(kinesis, firstCheckpoint);
+ doReturn(secondIterator).when(shardReadersPool).createShardIterator(kinesis, secondCheckpoint);
+ }
+
+ @Test
+ public void shouldReturnAllRecords() throws TransientKinesisException {
+ when(firstIterator.readNextBatch())
+ .thenReturn(Collections.<KinesisRecord>emptyList())
+ .thenReturn(asList(a, b))
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondIterator.readNextBatch())
+ .thenReturn(singletonList(c))
+ .thenReturn(singletonList(d))
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+
+ shardReadersPool.start();
+ List<KinesisRecord> fetchedRecords = new ArrayList<>();
+ while (fetchedRecords.size() < 4) {
+ CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+ if (nextRecord.isPresent()) {
+ fetchedRecords.add(nextRecord.get());
+ }
+ }
+ assertThat(fetchedRecords).containsExactlyInAnyOrder(a, b, c, d);
+ }
+
+ @Test
+ public void shouldReturnAbsentOptionalWhenNoRecords() throws TransientKinesisException {
+ when(firstIterator.readNextBatch())
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondIterator.readNextBatch())
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+
+ shardReadersPool.start();
+ CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+ assertThat(nextRecord.isPresent()).isFalse();
+ }
+
+ @Test
+ public void shouldCheckpointReadRecords() throws TransientKinesisException {
+ when(firstIterator.readNextBatch())
+ .thenReturn(asList(a, b))
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+ when(secondIterator.readNextBatch())
+ .thenReturn(singletonList(c))
+ .thenReturn(singletonList(d))
+ .thenReturn(Collections.<KinesisRecord>emptyList());
+
+ shardReadersPool.start();
+ int recordsFound = 0;
+ while (recordsFound < 4) {
+ CustomOptional<KinesisRecord> nextRecord = shardReadersPool.nextRecord();
+ if (nextRecord.isPresent()) {
+ recordsFound++;
+ KinesisRecord kinesisRecord = nextRecord.get();
+ if (kinesisRecord.getShardId().equals("shard1")) {
+ verify(firstIterator).ackRecord(kinesisRecord);
+ } else {
+ verify(secondIterator).ackRecord(kinesisRecord);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void shouldInterruptKinesisReadingAndStopShortly() throws TransientKinesisException {
+ when(firstIterator.readNextBatch()).thenAnswer(new Answer<List<KinesisRecord>>() {
+
+ @Override
+ public List<KinesisRecord> answer(InvocationOnMock invocation) throws Throwable {
+ Thread.sleep(TimeUnit.MINUTES.toMillis(1));
+ return Collections.emptyList();
+ }
+ });
+ shardReadersPool.start();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ shardReadersPool.stop();
+ assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TimeUnit.SECONDS.toMillis(1));
+ }
+
+ @Test
+ public void shouldInterruptPuttingRecordsToQueueAndStopShortly()
+ throws TransientKinesisException {
+ when(firstIterator.readNextBatch()).thenReturn(asList(a, b, c));
+ KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(
+ Arrays.asList(firstCheckpoint, secondCheckpoint));
+ ShardReadersPool shardReadersPool = new ShardReadersPool(kinesis, checkpoint, 2);
+ shardReadersPool.start();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ shardReadersPool.stop();
+ assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isLessThan(TimeUnit.SECONDS.toMillis(1));
+
+ }
+
+ @Test
+ public void shouldDetectThatNotAllShardsAreUpToDate() throws TransientKinesisException {
+ when(firstIterator.isUpToDate()).thenReturn(true);
+ when(secondIterator.isUpToDate()).thenReturn(false);
+ shardReadersPool.start();
+
+ assertThat(shardReadersPool.allShardsUpToDate()).isFalse();
+ }
+
+ @Test
+ public void shouldDetectThatAllShardsAreUpToDate() throws TransientKinesisException {
+ when(firstIterator.isUpToDate()).thenReturn(true);
+ when(secondIterator.isUpToDate()).thenReturn(true);
+ shardReadersPool.start();
+
+ assertThat(shardReadersPool.allShardsUpToDate()).isTrue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/ec394465/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
index 4b2190f..a77eafa 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardRecordsIteratorTest.java
@@ -104,27 +104,31 @@ public class ShardRecordsIteratorTest {
}
@Test
- public void returnsAbsentIfNoRecordsPresent() throws IOException, TransientKinesisException {
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ when(firstResult.getRecords()).thenReturn(asList(a, b, c));
+ when(secondResult.getRecords()).thenReturn(singletonList(d));
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
+
+ assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
+ assertThat(iterator.readNextBatch()).isEqualTo(asList(a, b, c));
+ assertThat(iterator.readNextBatch()).isEqualTo(singletonList(d));
+ assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
+
}
@Test
- public void goesThroughAvailableRecords() throws IOException, TransientKinesisException {
+ public void conformingRecordsMovesCheckpoint() throws IOException, TransientKinesisException {
when(firstResult.getRecords()).thenReturn(asList(a, b, c));
when(secondResult.getRecords()).thenReturn(singletonList(d));
+ when(thirdResult.getRecords()).thenReturn(Collections.<KinesisRecord>emptyList());
- assertThat(iterator.getCheckpoint()).isEqualTo(firstCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
+ iterator.ackRecord(a);
assertThat(iterator.getCheckpoint()).isEqualTo(aCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
+ iterator.ackRecord(b);
assertThat(iterator.getCheckpoint()).isEqualTo(bCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(c));
+ iterator.ackRecord(c);
assertThat(iterator.getCheckpoint()).isEqualTo(cCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(d));
- assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ iterator.ackRecord(d);
assertThat(iterator.getCheckpoint()).isEqualTo(dCheckpoint);
}
@@ -140,9 +144,10 @@ public class ShardRecordsIteratorTest {
when(kinesisClient.getRecords(SECOND_REFRESHED_ITERATOR, STREAM_NAME, SHARD_ID))
.thenReturn(secondResult);
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(a));
- assertThat(iterator.next()).isEqualTo(CustomOptional.of(b));
- assertThat(iterator.next()).isEqualTo(CustomOptional.absent());
+ assertThat(iterator.readNextBatch()).isEqualTo(singletonList(a));
+ iterator.ackRecord(a);
+ assertThat(iterator.readNextBatch()).isEqualTo(singletonList(b));
+ assertThat(iterator.readNextBatch()).isEqualTo(Collections.emptyList());
}
private static class IdentityAnswer implements Answer<Object> {
[2/2] beam git commit: This closes #3930: [BEAM-2468] Reading Kinesis
records in the background
Posted by jk...@apache.org.
This closes #3930: [BEAM-2468] Reading Kinesis records in the background
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/612af0a2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/612af0a2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/612af0a2
Branch: refs/heads/master
Commit: 612af0a296550e6ae12033aa75ed5ffac7650382
Parents: c53a121 ec39446
Author: Eugene Kirpichov <ki...@google.com>
Authored: Tue Oct 31 16:02:57 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Oct 31 16:02:57 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/kinesis/KinesisReader.java | 43 ++---
.../sdk/io/kinesis/KinesisReaderCheckpoint.java | 18 --
.../apache/beam/sdk/io/kinesis/RoundRobin.java | 54 ------
.../beam/sdk/io/kinesis/ShardCheckpoint.java | 8 +-
.../beam/sdk/io/kinesis/ShardReadersPool.java | 162 ++++++++++++++++
.../sdk/io/kinesis/ShardRecordsIterator.java | 90 ++++-----
.../beam/sdk/io/kinesis/KinesisReaderTest.java | 66 +++----
.../beam/sdk/io/kinesis/RoundRobinTest.java | 59 ------
.../sdk/io/kinesis/ShardReadersPoolTest.java | 185 +++++++++++++++++++
.../io/kinesis/ShardRecordsIteratorTest.java | 35 ++--
10 files changed, 454 insertions(+), 266 deletions(-)
----------------------------------------------------------------------