You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/08/30 15:54:03 UTC
flink git commit: [FLINK-4514][kinesis-connector] Handle unexpected
ExpiredIteratorExceptions
Repository: flink
Updated Branches:
refs/heads/master 78d9ae9ba -> b7d83899a
[FLINK-4514][kinesis-connector] Handle unexpected ExpiredIteratorExceptions
This closes #2432
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7d83899
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7d83899
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7d83899
Branch: refs/heads/master
Commit: b7d83899abfe8175b1fc9e526b6afb2ca7a056ed
Parents: 78d9ae9
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Aug 29 17:30:39 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Aug 30 23:50:40 2016 +0800
----------------------------------------------------------------------
.../kinesis/config/ConsumerConfigConstants.java | 7 ++
.../kinesis/internals/ShardConsumer.java | 74 +++++++++++++++-----
.../kinesis/util/KinesisConfigUtil.java | 30 +++++---
.../kinesis/internals/ShardConsumerTest.java | 40 +++++++++++
.../testutils/FakeKinesisBehavioursFactory.java | 66 +++++++++++++++--
5 files changed, 187 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
index 28ff3e4..76c20ed 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kinesis.config;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
/**
@@ -128,4 +129,10 @@ public class ConsumerConfigConstants extends AWSConfigConstants {
public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
+ /**
+ * To keep shard iterators from expiring in {@link ShardConsumer}s, the configured getRecords
+ * interval must be lower than 5 minutes, which is the expiry time of retrieved iterators.
+ */
+ public static final long MAX_SHARD_GETRECORDS_INTERVAL_MILLIS = 300000L;
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 6e24e65..612a4a7 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
@@ -29,6 +30,8 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigInteger;
@@ -44,6 +47,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class ShardConsumer<T> implements Runnable {
+ private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);
+
private final KinesisDeserializationSchema<T> deserializer;
private final KinesisProxyInterface kinesis;
@@ -133,7 +138,7 @@ public class ShardConsumer<T> implements Runnable {
kinesis.getShardIterator(subscribedShard, ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
// get only the last aggregated record
- GetRecordsResult getRecordsResult = kinesis.getRecords(itrForLastAggregatedRecord, 1);
+ GetRecordsResult getRecordsResult = getRecords(itrForLastAggregatedRecord, 1);
List<UserRecord> fetchedRecords = deaggregateRecords(
getRecordsResult.getRecords(),
@@ -168,7 +173,7 @@ public class ShardConsumer<T> implements Runnable {
Thread.sleep(fetchIntervalMillis);
}
- GetRecordsResult getRecordsResult = kinesis.getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+ GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
List<UserRecord> fetchedRecords = deaggregateRecords(
@@ -199,11 +204,15 @@ public class ShardConsumer<T> implements Runnable {
}
/**
- * Deserializes a record for collection, and accordingly updates the shard state in the fetcher.
+ * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last
+ * successfully collected sequence number in this shard consumer is also updated so that
+ * {@link ShardConsumer#getRecords(String, int)} may be able to use the correct sequence number to refresh shard
+ * iterators if necessary.
+ *
* Note that the server-side Kinesis timestamp is attached to the record when collected. When the
* user programs uses {@link TimeCharacteristic#EventTime}, this timestamp will be used by default.
*
- * @param record
+ * @param record record to deserialize and collect
* @throws IOException
*/
private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
@@ -223,19 +232,52 @@ public class ShardConsumer<T> implements Runnable {
subscribedShard.getStreamName(),
subscribedShard.getShard().getShardId());
- if (record.isAggregated()) {
- fetcherRef.emitRecordAndUpdateState(
- value,
- approxArrivalTimestamp,
- subscribedShardStateIndex,
- new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
- } else {
- fetcherRef.emitRecordAndUpdateState(
- value,
- approxArrivalTimestamp,
- subscribedShardStateIndex,
- new SequenceNumber(record.getSequenceNumber()));
+ SequenceNumber collectedSequenceNumber = (record.isAggregated())
+ ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
+ : new SequenceNumber(record.getSequenceNumber());
+
+ fetcherRef.emitRecordAndUpdateState(
+ value,
+ approxArrivalTimestamp,
+ subscribedShardStateIndex,
+ collectedSequenceNumber);
+
+ lastSequenceNum = collectedSequenceNumber;
+ }
+
+ /**
+ * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
+ * AWS {@link ExpiredIteratorException}s to ensure that we get results and don't just fail on
+ * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
+ * be used for the next call to this method.
+ *
+ * Note: it is important that this method is not called again before all the records from the last result have been
+ * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
+ * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
+ * incorrect shard iteration if the iterator had to be refreshed.
+ *
+ * @param shardItr shard iterator to use
+ * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
+ * @return get records result
+ * @throws InterruptedException
+ */
+ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
+ GetRecordsResult getRecordsResult = null;
+ while (getRecordsResult == null) {
+ try {
+ getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
+ } catch (ExpiredIteratorException eiEx) {
+ LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
+ " refreshing the iterator ...", shardItr, subscribedShard);
+ shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+
+ // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
+ if (fetchIntervalMillis != 0) {
+ Thread.sleep(fetchIntervalMillis);
+ }
+ }
}
+ return getRecordsResult;
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index d9d553b..9aa14ad 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.connectors.kinesis.config.ProducerConfigConsta
import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -65,13 +66,13 @@ public class KinesisConfigUtil {
"Invalid value given for maximum retry attempts for getRecords shard operation. Must be a valid non-negative integer value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE,
- "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for get records operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX,
- "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for get records operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
- "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value");
+ "Invalid value given for get records operation backoff exponential constant. Must be a valid non-negative double value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
"Invalid value given for getRecords sleep interval in milliseconds. Must be a valid non-negative long value.");
@@ -80,25 +81,34 @@ public class KinesisConfigUtil {
"Invalid value given for maximum retry attempts for getShardIterator shard operation. Must be a valid non-negative integer value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_BASE,
- "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for get shard iterator operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_MAX,
- "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for get shard iterator operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
- "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value");
+ "Invalid value given for get shard iterator operation backoff exponential constant. Must be a valid non-negative double value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
- "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
- "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
- "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value");
+ "Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
- "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value");
+ "Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
+
+ if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
+ checkArgument(
+ Long.parseLong(config.getProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS))
+ < ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS,
+ "Invalid value given for getRecords sleep interval in milliseconds. Must be lower than " +
+ ConsumerConfigConstants.MAX_SHARD_GETRECORDS_INTERVAL_MILLIS + " milliseconds."
+ );
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 5b3e1a5..96764a4 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -79,4 +79,44 @@ public class ShardConsumerTest {
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
}
+ @Test
+ public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
+ KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+ "fakeStream",
+ new Shard()
+ .withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
+ .withHashKeyRange(
+ new HashKeyRange()
+ .withStartingHashKey("0")
+ .withEndingHashKey(new BigInteger(StringUtils.repeat("FF", 16), 16).toString())));
+
+ LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
+ subscribedShardsStateUnderTest.add(
+ new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+
+ TestableKinesisDataFetcher fetcher =
+ new TestableKinesisDataFetcher(
+ Collections.singletonList("fakeStream"),
+ new Properties(),
+ 10,
+ 2,
+ new AtomicReference<Throwable>(),
+ subscribedShardsStateUnderTest,
+ KinesisDataFetcher.createInitialSubscribedStreamsToLastDiscoveredShardsState(Collections.singletonList("fakeStream")),
+ Mockito.mock(KinesisProxyInterface.class));
+
+ new ShardConsumer<>(
+ fetcher,
+ 0,
+ subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+ subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
+ // Get a total of 1000 records with 9 getRecords() calls,
+ // and the 7th getRecords() call will encounter an unexpected expired shard iterator
+ FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(1000, 9, 7)).run();
+
+ assertTrue(fetcher.getNumOfElementsCollected() == 1000);
+ assertTrue(subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum().equals(
+ SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7d83899/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index fc98fca..65e6d4e 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.connectors.kinesis.testutils;
+import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
@@ -33,13 +34,15 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Factory for different kinds of fake Kinesis behaviours using the {@link KinesisProxyInterface} interface.
*/
public class FakeKinesisBehavioursFactory {
// ------------------------------------------------------------------------
- // Behaviours related to shard listing and resharding, used in ShardDiscovererTest
+ // Behaviours related to shard listing and resharding, used in KinesisDataFetcherTest
// ------------------------------------------------------------------------
public static KinesisProxyInterface noShardsFoundForRequestedStreamsBehaviour() {
@@ -75,14 +78,69 @@ public class FakeKinesisBehavioursFactory {
public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCalls(final int numOfRecords, final int numOfGetRecordsCalls) {
return new SingleShardEmittingFixNumOfRecordsKinesis(numOfRecords, numOfGetRecordsCalls);
}
+
+ public static KinesisProxyInterface totalNumOfRecordsAfterNumOfGetRecordsCallsWithUnexpectedExpiredIterator(
+ final int numOfRecords, final int numOfGetRecordsCall, final int orderOfCallToExpire) {
+ return new SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(
+ numOfRecords, numOfGetRecordsCall, orderOfCallToExpire);
+ }
+
+ public static class SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis extends SingleShardEmittingFixNumOfRecordsKinesis {
+
+ private boolean expiredOnceAlready = false;
+ private boolean expiredIteratorRefreshed = false;
+ private final int orderOfCallToExpire;
+
+ public SingleShardEmittingFixNumOfRecordsWithExpiredIteratorKinesis(final int numOfRecords,
+ final int numOfGetRecordsCalls,
+ final int orderOfCallToExpire) {
+ super(numOfRecords, numOfGetRecordsCalls);
+ checkArgument(orderOfCallToExpire <= numOfGetRecordsCalls,
+ "can not test unexpected expired iterator if orderOfCallToExpire is larger than numOfGetRecordsCalls");
+ this.orderOfCallToExpire = orderOfCallToExpire;
+ }
+
+ @Override
+ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) {
+ if ((Integer.valueOf(shardIterator) == orderOfCallToExpire-1) && !expiredOnceAlready) {
+ // fake the expired iterator exception only once, at the specified getRecords attempt order
+ expiredOnceAlready = true;
+ throw new ExpiredIteratorException("Artificial expired shard iterator");
+ } else if (expiredOnceAlready && !expiredIteratorRefreshed) {
+ // if we've thrown the expired iterator exception already, but the iterator was not refreshed,
+ // throw a hard exception to the test that is testing this Kinesis behaviour
+ throw new RuntimeException("expired shard iterator was not refreshed on the next getRecords() call");
+ } else {
+ // assuming that the maxRecordsToGet is always large enough
+ return new GetRecordsResult()
+ .withRecords(shardItrToRecordBatch.get(shardIterator))
+ .withNextShardIterator(
+ (Integer.valueOf(shardIterator) == totalNumOfGetRecordsCalls - 1)
+ ? null : String.valueOf(Integer.valueOf(shardIterator) + 1)); // last next shard iterator is null
+ }
+ }
+
+ @Override
+ public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, String startingSeqNum) {
+ if (!expiredOnceAlready) {
+ // for the first call, just return the iterator of the first batch of records
+ return "0";
+ } else {
+ // fake the iterator refresh when this is called again after getRecords throws expired iterator
+ // exception on the orderOfCallToExpire attempt
+ expiredIteratorRefreshed = true;
+ return String.valueOf(orderOfCallToExpire-1);
+ }
+ }
+ }
private static class SingleShardEmittingFixNumOfRecordsKinesis implements KinesisProxyInterface {
- private final int totalNumOfGetRecordsCalls;
+ protected final int totalNumOfGetRecordsCalls;
- private final int totalNumOfRecords;
+ protected final int totalNumOfRecords;
- private final Map<String,List<Record>> shardItrToRecordBatch;
+ protected final Map<String,List<Record>> shardItrToRecordBatch;
public SingleShardEmittingFixNumOfRecordsKinesis(final int numOfRecords, final int numOfGetRecordsCalls) {
this.totalNumOfRecords = numOfRecords;