You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/06/04 23:26:31 UTC

[incubator-druid] branch master updated: Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#7830)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1de1a02  Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#7830)
1de1a02 is described below

commit 1de1a02e49f59506b52bfa404a56ad6af3e5efc6
Author: Gian Merlino <gi...@imply.io>
AuthorDate: Tue Jun 4 16:26:22 2019 -0700

    Kinesis: Fix getPartitionIds, should be checking isHasMoreShards. (#7830)
---
 .../indexing/kinesis/KinesisRecordSupplier.java    | 35 ++++++++++++++++++----
 .../kinesis/KinesisRecordSupplierTest.java         | 35 +++++++++++++++-------
 2 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index a55a572..303d490 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -26,6 +26,8 @@ import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -35,11 +37,13 @@ import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
 import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
 import com.amazonaws.util.AwsHostNameUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Queues;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.aws.AWSCredentialsUtils;
@@ -62,6 +66,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -581,11 +586,31 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String>
   @Override
   public Set<String> getPartitionIds(String stream)
   {
-    return wrapExceptions(() -> kinesis.describeStream(stream)
-                                       .getStreamDescription()
-                                       .getShards()
-                                       .stream()
-                                       .map(Shard::getShardId).collect(Collectors.toSet()));
+    return wrapExceptions(
+        () -> {
+          final Set<String> retVal = new HashSet<>();
+          DescribeStreamRequest request = new DescribeStreamRequest();
+          request.setStreamName(stream);
+
+          while (request != null) {
+            final DescribeStreamResult result = kinesis.describeStream(request);
+            final StreamDescription streamDescription = result.getStreamDescription();
+            final List<Shard> shards = streamDescription.getShards();
+
+            for (Shard shard : shards) {
+              retVal.add(shard.getShardId());
+            }
+
+            if (streamDescription.isHasMoreShards()) {
+              request.setExclusiveStartShardId(Iterables.getLast(shards).getShardId());
+            } else {
+              request = null;
+            }
+          }
+
+          return retVal;
+        }
+    );
   }
 
   @Override
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index bd1fbe8..6dd7de9 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kinesis;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
@@ -65,12 +66,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
   private static String shard1Iterator = "1";
   private static String shard0Iterator = "0";
   private static AmazonKinesis kinesis;
-  private static DescribeStreamResult describeStreamResult;
+  private static DescribeStreamResult describeStreamResult0;
+  private static DescribeStreamResult describeStreamResult1;
   private static GetShardIteratorResult getShardIteratorResult0;
   private static GetShardIteratorResult getShardIteratorResult1;
   private static GetRecordsResult getRecordsResult0;
   private static GetRecordsResult getRecordsResult1;
-  private static StreamDescription streamDescription;
+  private static StreamDescription streamDescription0;
+  private static StreamDescription streamDescription1;
   private static Shard shard0;
   private static Shard shard1;
   private static KinesisRecordSupplier recordSupplier;
@@ -142,12 +145,14 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
   public void setupTest()
   {
     kinesis = createMock(AmazonKinesisClient.class);
-    describeStreamResult = createMock(DescribeStreamResult.class);
+    describeStreamResult0 = createMock(DescribeStreamResult.class);
+    describeStreamResult1 = createMock(DescribeStreamResult.class);
     getShardIteratorResult0 = createMock(GetShardIteratorResult.class);
     getShardIteratorResult1 = createMock(GetShardIteratorResult.class);
     getRecordsResult0 = createMock(GetRecordsResult.class);
     getRecordsResult1 = createMock(GetRecordsResult.class);
-    streamDescription = createMock(StreamDescription.class);
+    streamDescription0 = createMock(StreamDescription.class);
+    streamDescription1 = createMock(StreamDescription.class);
     shard0 = createMock(Shard.class);
     shard1 = createMock(Shard.class);
     recordsPerFetch = 1;
@@ -163,11 +168,17 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
   @Test
   public void testSupplierSetup()
   {
-    Capture<String> captured = Capture.newInstance();
-    expect(kinesis.describeStream(capture(captured))).andReturn(describeStreamResult).once();
-    expect(describeStreamResult.getStreamDescription()).andReturn(streamDescription).once();
-    expect(streamDescription.getShards()).andReturn(ImmutableList.of(shard0, shard1)).once();
-    expect(shard0.getShardId()).andReturn(shardId0).once();
+    final Capture<DescribeStreamRequest> capturedRequest = Capture.newInstance();
+
+    expect(kinesis.describeStream(capture(capturedRequest))).andReturn(describeStreamResult0).once();
+    expect(describeStreamResult0.getStreamDescription()).andReturn(streamDescription0).once();
+    expect(streamDescription0.getShards()).andReturn(ImmutableList.of(shard0)).once();
+    expect(streamDescription0.isHasMoreShards()).andReturn(true).once();
+    expect(shard0.getShardId()).andReturn(shardId0).times(2);
+    expect(kinesis.describeStream(anyObject(DescribeStreamRequest.class))).andReturn(describeStreamResult1).once();
+    expect(describeStreamResult1.getStreamDescription()).andReturn(streamDescription1).once();
+    expect(streamDescription1.getShards()).andReturn(ImmutableList.of(shard1)).once();
+    expect(streamDescription1.isHasMoreShards()).andReturn(false).once();
     expect(shard1.getShardId()).andReturn(shardId1).once();
 
     replayAll();
@@ -199,7 +210,11 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
     Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100));
 
     verifyAll();
-    Assert.assertEquals(stream, captured.getValue());
+
+    final DescribeStreamRequest expectedRequest = new DescribeStreamRequest();
+    expectedRequest.setStreamName(stream);
+    expectedRequest.setExclusiveStartShardId("0");
+    Assert.assertEquals(expectedRequest, capturedRequest.getValue());
   }
 
   private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org