You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/05/26 08:42:05 UTC
[6/8] flink git commit: [FLINK-6653] Avoid directly serializing AWS's
Shard class in Kinesis consumer's checkpoints
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
index 800fde5..7c36945 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
@@ -17,14 +17,17 @@
package org.apache.flink.streaming.connectors.kinesis.internals;
+import com.amazonaws.services.kinesis.model.HashKeyRange;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardV2;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;
import org.apache.flink.streaming.connectors.kinesis.testutils.FakeKinesisBehavioursFactory;
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
@@ -46,6 +49,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -149,33 +153,33 @@ public class KinesisDataFetcherTest {
fakeStreams.add("fakeStream1");
fakeStreams.add("fakeStream2");
- Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+ Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
// fakeStream1 has 3 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
UUID.randomUUID().toString());
// fakeStream2 has 2 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
@@ -198,10 +202,11 @@ public class KinesisDataFetcherTest {
subscribedStreamsToLastSeenShardIdsUnderTest,
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
- for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+ for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -238,33 +243,33 @@ public class KinesisDataFetcherTest {
fakeStreams.add("fakeStream1");
fakeStreams.add("fakeStream2");
- Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+ Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
// fakeStream1 has 3 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
UUID.randomUUID().toString());
// fakeStream2 has 2 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
@@ -288,10 +293,11 @@ public class KinesisDataFetcherTest {
subscribedStreamsToLastSeenShardIdsUnderTest,
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
- for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+ for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -330,33 +336,33 @@ public class KinesisDataFetcherTest {
fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
- Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+ Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
// fakeStream1 has 3 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
UUID.randomUUID().toString());
// fakeStream2 has 2 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
@@ -382,10 +388,11 @@ public class KinesisDataFetcherTest {
subscribedStreamsToLastSeenShardIdsUnderTest,
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
- for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+ for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -425,33 +432,33 @@ public class KinesisDataFetcherTest {
fakeStreams.add("fakeStream3"); // fakeStream3 will not have any shards
fakeStreams.add("fakeStream4"); // fakeStream4 will not have any shards
- Map<KinesisStreamShard, String> restoredStateUnderTest = new HashMap<>();
+ Map<StreamShardHandle, String> restoredStateUnderTest = new HashMap<>();
// fakeStream1 has 3 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream1",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
UUID.randomUUID().toString());
// fakeStream2 has 2 shards before restore
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
UUID.randomUUID().toString());
restoredStateUnderTest.put(
- new KinesisStreamShard(
+ new StreamShardHandle(
"fakeStream2",
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
UUID.randomUUID().toString());
@@ -477,10 +484,11 @@ public class KinesisDataFetcherTest {
subscribedStreamsToLastSeenShardIdsUnderTest,
FakeKinesisBehavioursFactory.nonReshardedStreamsBehaviour(streamToShardCount));
- for (Map.Entry<KinesisStreamShard, String> restoredState : restoredStateUnderTest.entrySet()) {
+ for (Map.Entry<StreamShardHandle, String> restoredState : restoredStateUnderTest.entrySet()) {
fetcher.advanceLastDiscoveredShardOfStream(restoredState.getKey().getStreamName(), restoredState.getKey().getShard().getShardId());
fetcher.registerNewSubscribedShardState(
- new KinesisStreamShardState(restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(restoredState.getKey()),
+ restoredState.getKey(), new SequenceNumber(restoredState.getValue())));
}
PowerMockito.whenNew(ShardConsumer.class).withAnyArguments().thenReturn(Mockito.mock(ShardConsumer.class));
@@ -512,6 +520,43 @@ public class KinesisDataFetcherTest {
assertTrue(subscribedStreamsToLastSeenShardIdsUnderTest.get("fakeStream4") == null);
}
+ @Test
+ public void testCreateFunctionToConvertBetweenKinesisStreamShardV2AndStreamShardHandle() {
+ String streamName = "fakeStream1";
+ String shardId = "shard-000001";
+ String parentShardId = "shard-000002";
+ String adjacentParentShardId = "shard-000003";
+ String startingHashKey = "key-000001";
+ String endingHashKey = "key-000010";
+ String startingSequenceNumber = "seq-0000021";
+ String endingSequenceNumber = "seq-00000031";
+
+ KinesisStreamShardV2 kinesisStreamShard = new KinesisStreamShardV2();
+ kinesisStreamShard.setStreamName(streamName);
+ kinesisStreamShard.setShardId(shardId);
+ kinesisStreamShard.setParentShardId(parentShardId);
+ kinesisStreamShard.setAdjacentParentShardId(adjacentParentShardId);
+ kinesisStreamShard.setStartingHashKey(startingHashKey);
+ kinesisStreamShard.setEndingHashKey(endingHashKey);
+ kinesisStreamShard.setStartingSequenceNumber(startingSequenceNumber);
+ kinesisStreamShard.setEndingSequenceNumber(endingSequenceNumber);
+
+ Shard shard = new Shard()
+ .withShardId(shardId)
+ .withParentShardId(parentShardId)
+ .withAdjacentParentShardId(adjacentParentShardId)
+ .withHashKeyRange(new HashKeyRange()
+ .withStartingHashKey(startingHashKey)
+ .withEndingHashKey(endingHashKey))
+ .withSequenceNumberRange(new SequenceNumberRange()
+ .withStartingSequenceNumber(startingSequenceNumber)
+ .withEndingSequenceNumber(endingSequenceNumber));
+ StreamShardHandle streamShardHandle = new StreamShardHandle(streamName, shard);
+
+ assertEquals(kinesisStreamShard, KinesisDataFetcher.createKinesisStreamShardV2(streamShardHandle));
+ assertEquals(streamShardHandle, KinesisDataFetcher.createStreamShardHandle(kinesisStreamShard));
+ }
+
private static class DummyFlinkKafkaConsumer<T> extends FlinkKinesisConsumer<T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
index 96764a4..4e06329 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang.StringUtils;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
@@ -43,7 +43,7 @@ public class ShardConsumerTest {
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedState() {
- KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+ StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
"fakeStream",
new Shard()
.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -54,7 +54,8 @@ public class ShardConsumerTest {
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
- new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+ fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
TestableKinesisDataFetcher fetcher =
new TestableKinesisDataFetcher(
@@ -70,7 +71,7 @@ public class ShardConsumerTest {
new ShardConsumer<>(
fetcher,
0,
- subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+ subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
FakeKinesisBehavioursFactory.totalNumOfRecordsAfterNumOfGetRecordsCalls(1000, 9)).run();
@@ -81,7 +82,7 @@ public class ShardConsumerTest {
@Test
public void testCorrectNumOfCollectedRecordsAndUpdatedStateWithUnexpectedExpiredIterator() {
- KinesisStreamShard fakeToBeConsumedShard = new KinesisStreamShard(
+ StreamShardHandle fakeToBeConsumedShard = new StreamShardHandle(
"fakeStream",
new Shard()
.withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))
@@ -92,7 +93,8 @@ public class ShardConsumerTest {
LinkedList<KinesisStreamShardState> subscribedShardsStateUnderTest = new LinkedList<>();
subscribedShardsStateUnderTest.add(
- new KinesisStreamShardState(fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
+ new KinesisStreamShardState(KinesisDataFetcher.createKinesisStreamShardV2(fakeToBeConsumedShard),
+ fakeToBeConsumedShard, new SequenceNumber("fakeStartingState")));
TestableKinesisDataFetcher fetcher =
new TestableKinesisDataFetcher(
@@ -108,7 +110,7 @@ public class ShardConsumerTest {
new ShardConsumer<>(
fetcher,
0,
- subscribedShardsStateUnderTest.get(0).getKinesisStreamShard(),
+ subscribedShardsStateUnderTest.get(0).getStreamShardHandle(),
subscribedShardsStateUnderTest.get(0).getLastProcessedSequenceNum(),
// Get a total of 1000 records with 9 getRecords() calls,
// and the 7th getRecords() call will encounter an unexpected expired shard iterator
http://git-wip-us.apache.org/repos/asf/flink/blob/64ca1aa5/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
index b62e7de..ce5a0de 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisBehavioursFactory.java
@@ -22,7 +22,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
@@ -55,7 +55,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
return null;
}
@@ -122,7 +122,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
if (!expiredOnceAlready) {
// for the first call, just return the iterator of the first batch of records
return "0";
@@ -181,7 +181,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
// this will be called only one time per ShardConsumer;
// so, simply return the iterator of the first batch of records
return "0";
@@ -209,7 +209,7 @@ public class FakeKinesisBehavioursFactory {
private static class NonReshardedStreamsKinesis implements KinesisProxyInterface {
- private Map<String, List<KinesisStreamShard>> streamsWithListOfShards = new HashMap<>();
+ private Map<String, List<StreamShardHandle>> streamsWithListOfShards = new HashMap<>();
public NonReshardedStreamsKinesis(Map<String,Integer> streamsToShardCount) {
for (Map.Entry<String,Integer> streamToShardCount : streamsToShardCount.entrySet()) {
@@ -219,10 +219,10 @@ public class FakeKinesisBehavioursFactory {
if (shardCount == 0) {
// don't do anything
} else {
- List<KinesisStreamShard> shardsOfStream = new ArrayList<>(shardCount);
+ List<StreamShardHandle> shardsOfStream = new ArrayList<>(shardCount);
for (int i=0; i < shardCount; i++) {
shardsOfStream.add(
- new KinesisStreamShard(
+ new StreamShardHandle(
streamName,
new Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(i))));
}
@@ -234,13 +234,13 @@ public class FakeKinesisBehavioursFactory {
@Override
public GetShardListResult getShardList(Map<String, String> streamNamesWithLastSeenShardIds) {
GetShardListResult result = new GetShardListResult();
- for (Map.Entry<String, List<KinesisStreamShard>> streamsWithShards : streamsWithListOfShards.entrySet()) {
+ for (Map.Entry<String, List<StreamShardHandle>> streamsWithShards : streamsWithListOfShards.entrySet()) {
String streamName = streamsWithShards.getKey();
- for (KinesisStreamShard shard : streamsWithShards.getValue()) {
+ for (StreamShardHandle shard : streamsWithShards.getValue()) {
if (streamNamesWithLastSeenShardIds.get(streamName) == null) {
result.addRetrievedShardToStream(streamName, shard);
} else {
- if (KinesisStreamShard.compareShardIds(
+ if (StreamShardHandle.compareShardIds(
shard.getShard().getShardId(), streamNamesWithLastSeenShardIds.get(streamName)) > 0) {
result.addRetrievedShardToStream(streamName, shard);
}
@@ -251,7 +251,7 @@ public class FakeKinesisBehavioursFactory {
}
@Override
- public String getShardIterator(KinesisStreamShard shard, String shardIteratorType, Object startingMarker) {
+ public String getShardIterator(StreamShardHandle shard, String shardIteratorType, Object startingMarker) {
return null;
}