Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/02/07 03:09:13 UTC

[GitHub] [druid] AmatyaAvadhanula opened a new pull request #12235: Optimize kinesis ingestion task assignment after resharding

AmatyaAvadhanula opened a new pull request #12235:
URL: https://github.com/apache/druid/pull/12235


   
   ### Description
   
   When a Kinesis stream is resharded, the original shards are closed. The process may also create a large number of intermediate shards, which are eventually closed as well.
   
   If a shard is closed before any records are written to it, ingestion should ideally ignore that shard to improve efficiency.
   
   Although we frequently ask Kinesis for the list of shards, the response includes both open and closed shards. Determining whether a closed shard was ever written to is expensive because it requires fetching records from each shard.
   
   The goal of this PR is to efficiently determine which shards are relevant for ingestion and to avoid unnecessary expensive calls.
   
   KinesisRecordSupplier is used to get a list of all shards during Kinesis ingestion. This patch changes the returned value from the list of all existing shards to the list of shards relevant for ingestion.
   
   Repeated calls to Kinesis for shards' records are avoided by maintaining an in-memory cache.
   
   An in-memory cache is implemented in KinesisRecordSupplier to track closed shards (empty and non-empty, separately) and avoid redundant expensive calls.
   
   KinesisRecordSupplier#getPartitionIds(stream) may be called for any stream, and there would be no good way to clear that data from the cache. The cache is therefore maintained only for streams that have at least one shard assigned to a particular instance of this class, and entries may be added or removed when partition reassignment happens.
   
   When a shard expires, it is removed from the cache on the next call to getPartitionIds.
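
The caching idea described above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: the class `ClosedShardCacheSketch`, its `refresh` method, and the `Predicate` that stands in for the expensive Kinesis lookup are all hypothetical names. It keeps empty and non-empty closed shards in separate sets, drops shards that are no longer listed, and only runs the expensive check for closed shards it has not seen before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical sketch: names are illustrative and do not come from the Druid code base.
class ClosedShardCacheSketch
{
  private final Set<String> emptyClosedShardIds = new TreeSet<>();
  private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();
  // Stands in for the expensive "fetch records from the shard" call to Kinesis.
  private final Predicate<String> expensiveIsEmptyCheck;

  ClosedShardCacheSketch(Predicate<String> expensiveIsEmptyCheck)
  {
    this.expensiveIsEmptyCheck = expensiveIsEmptyCheck;
  }

  /** Returns a copy of the ids of closed shards that are known to be empty. */
  Set<String> refresh(Set<String> currentlyClosedShardIds)
  {
    // Shards that are no longer listed have expired, so forget them.
    emptyClosedShardIds.retainAll(currentlyClosedShardIds);
    nonEmptyClosedShardIds.retainAll(currentlyClosedShardIds);

    for (String shardId : currentlyClosedShardIds) {
      if (emptyClosedShardIds.contains(shardId) || nonEmptyClosedShardIds.contains(shardId)) {
        continue; // a closed shard cannot receive new records, so the cached answer stays valid
      }
      if (expensiveIsEmptyCheck.test(shardId)) {
        emptyClosedShardIds.add(shardId);
      } else {
        nonEmptyClosedShardIds.add(shardId);
      }
    }
    return new TreeSet<>(emptyClosedShardIds);
  }

  public static void main(String[] args)
  {
    List<String> expensiveCalls = new ArrayList<>();
    ClosedShardCacheSketch cache = new ClosedShardCacheSketch(shardId -> {
      expensiveCalls.add(shardId);
      return shardId.startsWith("empty"); // assumption made only for this demo
    });

    System.out.println(cache.refresh(new HashSet<>(Arrays.asList("empty-1", "full-1"))));
    System.out.println(cache.refresh(new HashSet<>(Arrays.asList("empty-1", "full-1", "empty-2"))));
    System.out.println("expensive calls made: " + expensiveCalls.size());
  }
}
```

Separating the two sets means a non-empty closed shard is also remembered, so it is not re-checked on every refresh.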
   
   Alternative design:
   
   Update the metadata with the end offsets of closed and empty shards. This may be simpler to implement since it doesn't require a cache, but it would waste resources since a task would have to update the metadata.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `KinesisRecordSupplier`
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r803339870



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards();
+  }
+
+  /**
+   * Closed and empty shards can be ignored for ingestion,
+   * Use this method if skipIgnorablePartitions is true in the spec
+   *
+   * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
+   *
+   * @return the set of ignorable shards' ids
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()
+  {
+    updateClosedShardCache();
+    return getEmptyClosedShardIds();
+  }
+
+  private void updateClosedShardCache()
+  {
+    String stream = spec.getSource();
+    Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
+    Set<String> activeClosedShards = allActiveShards.stream()
+                                                    .filter(shard -> isShardClosed(shard))
+                                                    .map(Shard::getShardId).collect(Collectors.toSet());
+
+    // clear stale shards
+    emptyClosedShardIds.retainAll(activeClosedShards);

Review comment:
       Method has been refactored to make cache usage obvious






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r806501122



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the closed shard
+   * @return if the closed shard is empty

Review comment:
       Nit:
   ```suggestion
      * @return true if the closed shard is empty, false otherwise
   ```

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +535,47 @@ protected boolean supportsPartitionExpiration()
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
+  /**
+   * A shard is considered closed iff it has an ending sequence number.
+   *
+   * @param shard to be checked
+   * @return if shard is closed
+   */
+  private boolean isShardClosed(Shard shard)
+  {
+    return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
+  }
+
+  /**
+   * Checking if a shard is empty requires polling for records which is quite expensive
+   * Fortunately, the results can be cached for closed shards as no more records can be written to them
+   * Please use this method only if the info is absent from the cache
+   *
+   * @param stream to which the shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  private boolean isClosedShardEmpty(String stream, String shardId)
+  {
+    return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);

Review comment:
       Nit: Since this is a single statement method and is used only in one place, we could move this to the calling method itself. Might help readability.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
    * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   *
-   * @return Set of Shard ids
+   * @return Immutable set of shards
    */
+  public Set<Shard> getShards(String stream)
+  {
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+    while (true) {
+      ListShardsResult result = kinesis.listShards(request);
+      shards.addAll(result.getShards());
+      String nextToken = result.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      request = new ListShardsRequest().withNextToken(nextToken);
+    }
+  }
+
   @Override
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      final Set<String> retVal = new TreeSet<>();
-      ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
-      while (true) {
-        ListShardsResult result = kinesis.listShards(request);
-        retVal.addAll(result.getShards()
-                            .stream()
-                            .map(Shard::getShardId)
-                            .collect(Collectors.toList())
-        );
-        String nextToken = result.getNextToken();
-        if (nextToken == null) {
-          return retVal;
-        }
-        request = new ListShardsRequest().withNextToken(nextToken);
-      }
+      return ImmutableSet.copyOf(getShards(stream).stream()
+                                                  .map(shard -> shard.getShardId())
+                                                  .collect(Collectors.toList())

Review comment:
       Nit: Can we just collect to an immutable Set instead of collecting to a List first and then converting to a Set?
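
One way to act on this nit, sketched with only the JDK so it runs on its own: collect straight into a set and wrap it as unmodifiable in the same collector. `FakeShard` and `toShardIds` are hypothetical stand-ins for the AWS SDK `Shard` and the method under review. In the Druid code itself, Guava's `ImmutableSet.toImmutableSet()` collector (Guava 21 and later) might be the more natural choice, assuming the Guava version in use provides it.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: not the PR's code, just the "collect directly to a set" idea.
class ImmutableCollectSketch
{
  // Stand-in for the AWS SDK Shard; only the id matters here.
  static final class FakeShard
  {
    private final String shardId;

    FakeShard(String shardId)
    {
      this.shardId = shardId;
    }

    String getShardId()
    {
      return shardId;
    }
  }

  static Set<String> toShardIds(Collection<FakeShard> shards)
  {
    // No intermediate List: gather into a Set, then wrap it as unmodifiable.
    return shards.stream()
                 .map(FakeShard::getShardId)
                 .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
  }

  public static void main(String[] args)
  {
    Set<String> ids = toShardIds(Arrays.asList(new FakeShard("shardId-0"), new FakeShard("shardId-1")));
    System.out.println(ids.size());
  }
}
```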

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";

Review comment:
       It doesn't seem like the `shardId` matters in these tests, because we are doing an `EasyMock.reset(kinesis)` anyway. If not required, just use a final `shardId`.
   
   Option 2 (preferred): Use different `shardIds` but do not call reset on the mock.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty

Review comment:
       Nit:
   I don't think we are really polling anything here.
   
   ```suggestion
      * Fetches records from the specified shard to determine if it is empty.
   ```

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -339,9 +342,9 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
-            new KinesisDataSourceMetadata(
-                    null
-            )
+        new KinesisDataSourceMetadata(

Review comment:
       Nit: If you are reformatting this, maybe just put the whole constructor on a single line.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -1294,7 +1297,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
             .times(1);
 
 
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))

Review comment:
       Nit: Please try to avoid formatting changes unless they are relevant to the core set of changes.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -4885,6 +4907,135 @@ private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
     Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
   }
 
+  @Test
+  public void testUpdateClosedShardCache()
+  {
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor.setupRecordSupplier();
+    supervisor.tryInit();
+    String stream = supervisor.getKinesisSupervisorSpec().getSource();
+    Shard openShard = EasyMock.mock(Shard.class);
+    Shard emptyClosedShard = EasyMock.mock(Shard.class);
+    Shard nonEmptyClosedShard = EasyMock.mock(Shard.class);
+    Set<Shard> activeShards;
+    Set<String> emptyClosedShardIds;
+    Set<String> nonEmptyClosedShardIds;
+
+    // ITERATION 0:
+    // active shards: an open shard, closed-empty shard and closed-nonEmpty shard
+    activeShards = getActiveShards(openShard, true,
+                                   emptyClosedShard, true,
+                                   nonEmptyClosedShard, true);
+
+    EasyMock.reset(supervisorRecordSupplier);

Review comment:
       Rather than resetting the mock objects every time, you could do the following:
   1. have some open shards, closed empty and closed non-empty shards (and maybe open empty too, just to be sure)
   2. setup the record supplier to return all of these when calling `getShards`
   3. setup the record supplier to return the correct values for `isClosedShardEmpty` for each shard
   4. call `supervisor.getIgnorablePartitionIds()` and verify the returned values
   5. In step 4, verify the shard ids on which the mock record supplier calls `isClosedShardEmpty`
   (optional step 5a: move some shards from open to closed, empty to non-empty, add new shards, etc.)
   6. call `supervisor.getIgnorablePartitionIds()` again and verify the returned values
   7. In step 6, verify the shard ids on which the mock record supplier calls `isClosedShardEmpty`
   
   I feel this would be easier to follow and more representative of what would happen in a real scenario. It would also help you avoid resetting the mock objects and avoid exposing the internal caches which are currently only visible for testing.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+  }
+
+  private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,

Review comment:
       Rename this to something more descriptive of the actual contents of this method.
   e.g. `setupMockKinesisForShardId` or something. 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r801700846



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  public boolean isShardEmpty(String stream, String shardId)

Review comment:
       To be clear, this method will not return true for all the empty shards. It will return true only for the empty shards that are also closed. Is that correct?

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards();
+  }
+
+  /**
+   * Closed and empty shards can be ignored for ingestion,
+   * Use this method if skipIgnorablePartitions is true in the spec
+   *
+   * These partitions can be safely ignored for both ingesetion task assignment and autoscaler limits
+   *
+   * @return the set of ignorable shards' ids
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()
+  {
+    updateClosedShardCache();
+    return ImmutableSet.copyOf(emptyClosedShardIds);
+  }
+
+  private void updateClosedShardCache()
+  {
+    String stream = spec.getSource();
+    Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
+    Set<String> activeClosedShards = allActiveShards.stream()
+                                                    .filter(shard -> isShardOpen(shard))

Review comment:
       ```suggestion
                                                       .filter(shard -> !isShardOpen(shard))
   ```
   Is that how it should be?
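
To make the intent of the negated filter concrete, here is a small self-contained sketch. It assumes, as the Javadoc elsewhere in this PR states, that a shard is closed iff it has an ending sequence number. `ShardInfo` is a hypothetical stand-in for the AWS SDK `Shard`, and the shard ids and sequence number are made up for the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: ShardInfo stands in for the AWS SDK Shard and its sequence number range.
class ClosedShardFilterSketch
{
  static final class ShardInfo
  {
    final String shardId;
    final String endingSequenceNumber; // null while the shard is still open

    ShardInfo(String shardId, String endingSequenceNumber)
    {
      this.shardId = shardId;
      this.endingSequenceNumber = endingSequenceNumber;
    }
  }

  static boolean isShardOpen(ShardInfo shard)
  {
    return shard.endingSequenceNumber == null;
  }

  static Set<String> closedShardIds(List<ShardInfo> shards)
  {
    // Note the negation: without it, the result would contain the open shards instead.
    return shards.stream()
                 .filter(shard -> !isShardOpen(shard))
                 .map(shard -> shard.shardId)
                 .collect(Collectors.toSet());
  }

  public static void main(String[] args)
  {
    List<ShardInfo> shards = Arrays.asList(
        new ShardInfo("shard-open", null),
        new ShardInfo("shard-closed", "12345")
    );
    System.out.println(closedShardIds(shards));
  }
}
```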






[GitHub] [druid] zachjsh commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
zachjsh commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808573807



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -2291,9 +2291,30 @@ protected boolean supportsPartitionExpiration()
     return false;
   }
 
+  protected boolean shouldSkipIgnorablePartitions()

Review comment:
       Should we not do something similar for Kafka? Why is this not an issue with Kafka?






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808647249



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       Synchronizing the whole method `updateClosedShardCache` would actually be preferable because the state returned by two subsequent calls to `recordSupplier.getShards()` can be different.
   So this call should happen inside the synchronized block, as should the calls to `recordSupplier.isClosedShardEmpty()`.
   
   I hope this doesn't cause bottlenecks though.
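
A rough sketch of the whole-method synchronization suggested here, under stated assumptions: `SynchronizedCacheUpdateSketch`, its `Supplier`-based shard lister, and the `runTwoCallers` helper are hypothetical, and the two threads only imitate the two notice types that could call into the cache. Because listing the shards, running the expensive check, and updating the cache all happen under one lock, each shard is checked only once even with concurrent callers.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical sketch: not the PR's code; it only illustrates synchronizing the whole update.
class SynchronizedCacheUpdateSketch
{
  // Stands in for listing shards from the record supplier and keeping the closed ones.
  private final Supplier<Set<String>> closedShardLister;
  private final Set<String> checkedShardIds = new TreeSet<>();
  final AtomicInteger expensiveChecks = new AtomicInteger();

  SynchronizedCacheUpdateSketch(Supplier<Set<String>> closedShardLister)
  {
    this.closedShardLister = closedShardLister;
  }

  // Listing and cache update share one lock, so no caller sees a half-updated cache.
  synchronized Set<String> updateClosedShardCache()
  {
    Set<String> closed = closedShardLister.get();
    checkedShardIds.retainAll(closed);
    for (String shardId : closed) {
      if (checkedShardIds.add(shardId)) {
        expensiveChecks.incrementAndGet(); // the costly emptiness check would happen here
      }
    }
    return new TreeSet<>(checkedShardIds);
  }

  /** Runs two concurrent callers and returns how many expensive checks were made. */
  static int runTwoCallers(int iterations)
  {
    SynchronizedCacheUpdateSketch sketch =
        new SynchronizedCacheUpdateSketch(() -> new TreeSet<>(Arrays.asList("a", "b", "c")));
    Runnable caller = () -> {
      for (int i = 0; i < iterations; i++) {
        sketch.updateClosedShardCache();
      }
    };
    Thread first = new Thread(caller);
    Thread second = new Thread(caller);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sketch.expensiveChecks.get();
  }

  public static void main(String[] args)
  {
    System.out.println("expensive checks: " + runTwoCallers(1000));
  }
}
```

The trade-off is the one raised above: while one caller holds the lock during the slow remote calls, the other waits, which could become a bottleneck if those calls are slow.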






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808620269



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       Thanks for pointing this out, @zachjsh.
   This code would be executed by the `SeekableStreamSupervisor` while executing a `RunNotice` (scheduled when status of a task changes) as well as a `DynamicAllocationTasksNotice` (scheduled for auto-scaling). There is a possibility of contention between these two executions.
   
   We can make the part where the caches are updated `synchronized`.
   Just changing these two caches to a `Concurrent` version might not be enough as a whole new list of active shards is fetched in `updateClosedShardCache()` and the caches must be updated with this new state before any other action is performed.
   
   cc: @AmatyaAvadhanula 






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807596846



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
    * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   *
-   * @return Set of Shard ids
+   * @return Immutable set of shards
    */
+  public Set<Shard> getShards(String stream)
+  {
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+    while (true) {
+      ListShardsResult result = kinesis.listShards(request);
+      shards.addAll(result.getShards());
+      String nextToken = result.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      request = new ListShardsRequest().withNextToken(nextToken);
+    }
+  }
+
   @Override
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      final Set<String> retVal = new TreeSet<>();
-      ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
-      while (true) {
-        ListShardsResult result = kinesis.listShards(request);
-        retVal.addAll(result.getShards()
-                            .stream()
-                            .map(Shard::getShardId)
-                            .collect(Collectors.toList())
-        );
-        String nextToken = result.getNextToken();
-        if (nextToken == null) {
-          return retVal;
-        }
-        request = new ListShardsRequest().withNextToken(nextToken);
-      }
+      return ImmutableSet.copyOf(getShards(stream).stream()
+                                                  .map(shard -> shard.getShardId())
+                                                  .collect(Collectors.toList())

Review comment:
       Also, does it really need to be wrapped inside ImmutableSet again? 






[GitHub] [druid] kfaraz merged pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz merged pull request #12235:
URL: https://github.com/apache/druid/pull/12235


   




[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807783920



##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -4885,6 +4907,135 @@ private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
     Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
   }
 
+  @Test
+  public void testUpdateClosedShardCache()
+  {
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor.setupRecordSupplier();
+    supervisor.tryInit();
+    String stream = supervisor.getKinesisSupervisorSpec().getSource();
+    Shard openShard = EasyMock.mock(Shard.class);
+    Shard emptyClosedShard = EasyMock.mock(Shard.class);
+    Shard nonEmptyClosedShard = EasyMock.mock(Shard.class);
+    Set<Shard> activeShards;
+    Set<String> emptyClosedShardIds;
+    Set<String> nonEmptyClosedShardIds;
+
+    // ITERATION 0:
+    // active shards: an open shard, closed-empty shard and closed-nonEmpty shard
+    activeShards = getActiveShards(openShard, true,
+                                   emptyClosedShard, true,
+                                   nonEmptyClosedShard, true);
+
+    EasyMock.reset(supervisorRecordSupplier);

Review comment:
       done






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808620959



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -2291,9 +2291,30 @@ protected boolean supportsPartitionExpiration()
     return false;
   }
 
+  protected boolean shouldSkipIgnorablePartitions()

Review comment:
       We haven't encountered something similar in Kafka yet. But the API has been put in place so that if `KafkaSupervisor` needs to do something similar, it can override the methods `shouldSkipIgnorablePartitions()` and `getIgnorablePartitionIds()`.
   
   For Kinesis, the ignorable partitions translate to empty and closed shards, which is a concept specific to Kinesis.
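
To illustrate the extension point described above, here is a simplified sketch with hypothetical class names (`StreamSupervisorSketch`, `KafkaLikeSupervisorSketch`, `KinesisLikeSupervisorSketch`). Only the two hook names come from the PR. The default return values and the `assignablePartitions` helper are assumptions made for the example.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical stand-in for the base supervisor.
abstract class StreamSupervisorSketch
{
  // Assumed default: streams have no notion of ignorable partitions.
  protected boolean shouldSkipIgnorablePartitions()
  {
    return false;
  }

  // Assumed default: nothing is ignorable.
  protected Set<String> getIgnorablePartitionIds()
  {
    return Collections.emptySet();
  }

  // Hypothetical helper: partitions that remain eligible for task assignment.
  final Set<String> assignablePartitions(Set<String> allPartitionIds)
  {
    Set<String> result = new TreeSet<>(allPartitionIds);
    if (shouldSkipIgnorablePartitions()) {
      result.removeAll(getIgnorablePartitionIds());
    }
    return result;
  }
}

// Inherits the defaults, so every partition stays assignable.
class KafkaLikeSupervisorSketch extends StreamSupervisorSketch
{
}

// Overrides both hooks; here the ignorable partitions are the empty, closed shards.
class KinesisLikeSupervisorSketch extends StreamSupervisorSketch
{
  private final Set<String> emptyClosedShardIds;

  KinesisLikeSupervisorSketch(Set<String> emptyClosedShardIds)
  {
    this.emptyClosedShardIds = emptyClosedShardIds;
  }

  @Override
  protected boolean shouldSkipIgnorablePartitions()
  {
    return true;
  }

  @Override
  protected Set<String> getIgnorablePartitionIds()
  {
    return emptyClosedShardIds;
  }
}
```

A supervisor that never overrides the hooks behaves exactly as before, which is why the API can live in the shared base class without affecting Kafka.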






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808759383



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       The whole method has been synchronized. Thanks!






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808759794



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards

Review comment:
       done






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r806501122



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the closed shard
+   * @return if the closed shard is empty

Review comment:
       Nit:
   ```suggestion
      * @return true if the closed shard is empty, false otherwise
   ```

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +535,47 @@ protected boolean supportsPartitionExpiration()
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
+  /**
+   * A shard is considered closed iff it has an ending sequence number.
+   *
+   * @param shard to be checked
+   * @return if shard is closed
+   */
+  private boolean isShardClosed(Shard shard)
+  {
+    return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
+  }
+
+  /**
+   * Checking if a shard is empty requires polling for records which is quite expensive
+   * Fortunately, the results can be cached for closed shards as no more records can be written to them
+   * Please use this method only if the info is absent from the cache
+   *
+   * @param stream to which the shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  private boolean isClosedShardEmpty(String stream, String shardId)
+  {
+    return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);

Review comment:
       Nit: Since this is a single statement method and is used only in one place, we could move this to the calling method itself. Might help readability.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
    * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   *
-   * @return Set of Shard ids
+   * @return Immutable set of shards
    */
+  public Set<Shard> getShards(String stream)
+  {
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+    while (true) {
+      ListShardsResult result = kinesis.listShards(request);
+      shards.addAll(result.getShards());
+      String nextToken = result.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      request = new ListShardsRequest().withNextToken(nextToken);
+    }
+  }
+
   @Override
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      final Set<String> retVal = new TreeSet<>();
-      ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
-      while (true) {
-        ListShardsResult result = kinesis.listShards(request);
-        retVal.addAll(result.getShards()
-                            .stream()
-                            .map(Shard::getShardId)
-                            .collect(Collectors.toList())
-        );
-        String nextToken = result.getNextToken();
-        if (nextToken == null) {
-          return retVal;
-        }
-        request = new ListShardsRequest().withNextToken(nextToken);
-      }
+      return ImmutableSet.copyOf(getShards(stream).stream()
+                                                  .map(shard -> shard.getShardId())
+                                                  .collect(Collectors.toList())

Review comment:
       Nit: Can we just collect to an immutable Set instead of collecting to a List first and then converting to a Set?
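
One way this could look, as a sketch that uses only the JDK. The nested `Shard` class is a hypothetical stand-in for the AWS SDK type, and `ShardIdCollector` is an illustrative name. With Guava 21 or later, `ImmutableSet.toImmutableSet()` can serve as the collector instead, and on Java 10 or later there is `Collectors.toUnmodifiableSet()`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class ShardIdCollector
{
  // Hypothetical minimal stand-in for the SDK's Shard; only the id is modelled.
  static final class Shard
  {
    private final String shardId;

    Shard(String shardId)
    {
      this.shardId = shardId;
    }

    String getShardId()
    {
      return shardId;
    }
  }

  // Collects the ids straight into a read-only Set, with no intermediate List.
  static Set<String> partitionIds(List<Shard> shards)
  {
    return shards.stream()
                 .map(Shard::getShardId)
                 .collect(Collectors.collectingAndThen(Collectors.toSet(), Collections::unmodifiableSet));
  }
}
```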

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";

Review comment:
       It doesn't seem like the `shardId` matters in these tests, because we are doing an `EasyMock.reset(kinesis)` anyway.
   
   Option 1: If distinct ids are not required, just use a single final `shardId`.
   
   Option 2 (preferred): Use different `shardIds` but do not call reset on the mock.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty

Review comment:
       Nit:
   I don't think we are really polling anything here.
   
   ```suggestion
      * Fetches records from the specified shard to determine if it is empty.
   ```

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -339,9 +342,9 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
     EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
     EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
     EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
-            new KinesisDataSourceMetadata(
-                    null
-            )
+        new KinesisDataSourceMetadata(

Review comment:
       Nit: If you are reformatting this, maybe just put the whole constructor on a single line.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -1294,7 +1297,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
             .times(1);
 
 
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))

Review comment:
       Nit: Please try to avoid formatting changes unless they are relevant to the core set of changes.

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -4885,6 +4907,135 @@ private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
     Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
   }
 
+  @Test
+  public void testUpdateClosedShardCache()
+  {
+    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
+    supervisor.setupRecordSupplier();
+    supervisor.tryInit();
+    String stream = supervisor.getKinesisSupervisorSpec().getSource();
+    Shard openShard = EasyMock.mock(Shard.class);
+    Shard emptyClosedShard = EasyMock.mock(Shard.class);
+    Shard nonEmptyClosedShard = EasyMock.mock(Shard.class);
+    Set<Shard> activeShards;
+    Set<String> emptyClosedShardIds;
+    Set<String> nonEmptyClosedShardIds;
+
+    // ITERATION 0:
+    // active shards: an open shard, closed-empty shard and closed-nonEmpty shard
+    activeShards = getActiveShards(openShard, true,
+                                   emptyClosedShard, true,
+                                   nonEmptyClosedShard, true);
+
+    EasyMock.reset(supervisorRecordSupplier);

Review comment:
       Rather than resetting the mock objects every time, you could do the following:
   1. have some open shards, closed empty and closed non-empty shards (and maybe open empty too, just to be sure)
   2. setup the record supplier to return all of these when calling `getShards`
   3. setup the record supplier to return the correct values for `isClosedShardEmpty` for each shard
   4. call `supervisor.getIgnorablePartitionIds()` and verify the returned values
   5. in step 4, verify the shard ids on which the mock record supplier calls `isClosedShardEmpty`
   (optional step 5a: move some shards from open to closed, empty to non-empty, add new shards, etc.)
   6. call `supervisor.getIgnorablePartitionIds()` again and verify the returned values
   7. In step 6, verify the shard ids on which the mock record supplier calls `isClosedShardEmpty`
   
   I feel this would be easier to follow and more representative of what would happen in a real scenario. It would also help you avoid resetting the mock objects and avoid exposing the internal caches which are currently only visible for testing.
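
The flow in the steps above can be sketched without a mocking library. Everything here is hypothetical scaffolding rather than the real test: `FakeRecordSupplier` is a hand-written fake that records each emptiness check, `SupervisorSketch` is a stripped-down model of the caching logic, and open shards are left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class IgnorableShardFlowSketch
{
  // Hand-written fake: remembers which shard ids were checked for emptiness.
  static final class FakeRecordSupplier
  {
    // closed shard id -> whether the shard is empty
    final Map<String, Boolean> closedShardIsEmpty = new LinkedHashMap<>();
    final List<String> emptinessChecks = new ArrayList<>();

    boolean isClosedShardEmpty(String shardId)
    {
      emptinessChecks.add(shardId);
      return closedShardIsEmpty.get(shardId);
    }
  }

  // Stripped-down model of the supervisor's cache of closed shards.
  static final class SupervisorSketch
  {
    private final FakeRecordSupplier supplier;
    private final Set<String> emptyClosed = new TreeSet<>();
    private final Set<String> nonEmptyClosed = new TreeSet<>();

    SupervisorSketch(FakeRecordSupplier supplier)
    {
      this.supplier = supplier;
    }

    synchronized Set<String> getIgnorablePartitionIds()
    {
      Set<String> closed = supplier.closedShardIsEmpty.keySet();
      // Forget shards that are no longer listed, then classify only the unseen ones.
      emptyClosed.retainAll(closed);
      nonEmptyClosed.retainAll(closed);
      for (String id : closed) {
        if (!emptyClosed.contains(id) && !nonEmptyClosed.contains(id)) {
          (supplier.isClosedShardEmpty(id) ? emptyClosed : nonEmptyClosed).add(id);
        }
      }
      return new TreeSet<>(emptyClosed);
    }
  }
}
```

A test built this way calls `getIgnorablePartitionIds()` twice, adds a closed shard in between, and asserts on both the returned ids and `emptinessChecks`. The second call should add only the new shard to `emptinessChecks`, which verifies the caching without exposing the internal sets.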

##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+  }
+
+  private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,

Review comment:
       Rename this to something more descriptive of the actual contents of this method.
   e.g. `setupMockKinesisForShardId` or something. 






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807613198



##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";

Review comment:
       Option 2 has been used






[GitHub] [druid] zachjsh commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
zachjsh commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808571450



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       Should these be in a thread-safe container, or have access protected by a lock?






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r801729689



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  public boolean isShardEmpty(String stream, String shardId)

Review comment:
       Yes, will rename it to isClosedShardEmpty, like it is in KinesisSupervisor






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r803306040



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards();
+  }
+
+  /**
+   * Closed and empty shards can be ignored for ingestion,
+   * Use this method if skipIgnorablePartitions is true in the spec
+   *
+   * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
+   *
+   * @return the set of ignorable shards' ids
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()
+  {
+    updateClosedShardCache();
+    return getEmptyClosedShardIds();
+  }
+
+  private void updateClosedShardCache()
+  {
+    String stream = spec.getSource();
+    Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
+    Set<String> activeClosedShards = allActiveShards.stream()
+                                                    .filter(shard -> isShardClosed(shard))
+                                                    .map(Shard::getShardId).collect(Collectors.toSet());
+
+    // clear stale shards
+    emptyClosedShardIds.retainAll(activeClosedShards);

Review comment:
       Shouldn't we just clear these two sets here?
   They are being re-populated in the for loop later anyway.

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +532,53 @@ protected boolean supportsPartitionExpiration()
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
+  /**
+   * Closed shards iff they have an ending sequence number

Review comment:
       ```suggestion
      * A shard is considered closed iff it has an ending sequence number.
   ```

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
##########
@@ -212,6 +216,12 @@ public Duration getOffsetFetchPeriod()
     return offsetFetchPeriod;
   }
 
+  @JsonProperty
+  public boolean shouldSkipIgnorableShards()

Review comment:
       ```suggestion
     public boolean isSkipIgnorableShards()
   ```
   
   Since this is a serializable POJO, best to adhere to method naming conventions.
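
As a small illustration of the naming convention, the JDK's own JavaBeans introspector recognises a boolean getter only when it starts with `is` (or `get`). The two nested classes below are hypothetical and differ only in the getter name. This sketch shows the JavaBeans rule itself and makes no claim about how the Druid config is serialized.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Set;
import java.util.TreeSet;

class BeanNamingSketch
{
  // Hypothetical config class using the original method name.
  public static class WithShouldPrefix
  {
    public boolean shouldSkipIgnorableShards()
    {
      return true;
    }
  }

  // Hypothetical config class using the suggested method name.
  public static class WithIsPrefix
  {
    public boolean isSkipIgnorableShards()
    {
      return true;
    }
  }

  // Names of the readable properties that JavaBeans introspection derives from the getters.
  static Set<String> readableProperties(Class<?> type)
  {
    try {
      // Stop at Object so that the implicit "class" property is not reported.
      BeanInfo info = Introspector.getBeanInfo(type, Object.class);
      Set<String> names = new TreeSet<>();
      for (PropertyDescriptor descriptor : info.getPropertyDescriptors()) {
        if (descriptor.getReadMethod() != null) {
          names.add(descriptor.getName());
        }
      }
      return names;
    }
    catch (IntrospectionException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Calling `readableProperties` on `WithIsPrefix` reports a `skipIgnorableShards` property, while `WithShouldPrefix` reports none, because `should` is not a getter prefix the convention knows about.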




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808758862



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -481,4 +535,47 @@ protected boolean supportsPartitionExpiration()
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
+  /**
+   * A shard is considered closed iff it has an ending sequence number.
+   *
+   * @param shard to be checked
+   * @return if shard is closed
+   */
+  private boolean isShardClosed(Shard shard)
+  {
+    return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
+  }
+
+  /**
+   * Checking if a shard is empty requires polling for records which is quite expensive
+   * Fortunately, the results can be cached for closed shards as no more records can be written to them
+   * Please use this method only if the info is absent from the cache
+   *
+   * @param stream to which the shard belongs
+   * @param shardId of the shard
+   * @return if the shard is empty
+   */
+  private boolean isClosedShardEmpty(String stream, String shardId)
+  {
+    return ((KinesisRecordSupplier) recordSupplier).isClosedShardEmpty(stream, shardId);

Review comment:
       done






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808892664



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,52 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
+  }
+
+  /**
+   * A kinesis shard is considered to be an ignorable partition if it is both closed and empty
+   * @return set of shards ignorable by kinesis ingestion
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()

Review comment:
       done






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807612925



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty
+   * @param stream to which shard belongs
+   * @param shardId of the closed shard
+   * @return if the closed shard is empty

Review comment:
       done

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -750,6 +752,25 @@ public boolean isAnyFetchActive()
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
+  /**
+   * Is costly and requires polling the shard to determine if it's empty

Review comment:
       fixed






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808620269



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       Thanks for pointing this out, @zachjsh.
   I think a ConcurrentHashSet would suffice here.
   
   cc: @AmatyaAvadhanula 






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807613102



##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
##########
@@ -1023,4 +1024,61 @@ public void getPartitionTimeLag() throws InterruptedException
     }
     verifyAll();
   }
+
+  @Test
+  public void testIsClosedShardEmpty()
+  {
+    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
+    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
+                                                             recordsPerFetch,
+                                                             0,
+                                                             2,
+                                                             false,
+                                                             100,
+                                                             5000,
+                                                             5000,
+                                                             60000,
+                                                             5,
+                                                             true
+    );
+    Record record = new Record();
+    String shardId;
+
+    // No records and null iterator -> empty
+    shardId = "0";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), null);
+    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardId));
+
+    // no records and non-null iterator -> non-empty
+    shardId = "1";
+    isClosedShardEmptyHelper(mockKinesis, shardId, new ArrayList<>(), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and null iterator -> non-empty
+    shardId = "2";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), null);
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+
+    // non-empty records and non-null iterator -> non-empty
+    shardId = "3";
+    isClosedShardEmptyHelper(mockKinesis, shardId, Collections.singletonList(record), "nextIterator");
+    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardId));
+  }
+
+  private void isClosedShardEmptyHelper(AmazonKinesis kinesis, String shardId,

Review comment:
       done
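
The four cases exercised by the test above can be summarised as a single decision on one poll response. The sketch below is inferred from the test expectations only, not from the body of `KinesisRecordSupplier.isClosedShardEmpty`, which is not shown here; the class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the test's truth table; not the PR's implementation.
final class ClosedShardEmptinessSketch
{
  /**
   * A closed shard is treated as empty only when the poll returned no records
   * and no next iterator. Any other combination is treated as non-empty.
   */
  static boolean looksEmpty(List<?> records, String nextShardIterator)
  {
    return records.isEmpty() && nextShardIterator == null;
  }

  public static void main(String[] args)
  {
    System.out.println(looksEmpty(Collections.emptyList(), null));
    System.out.println(looksEmpty(Collections.emptyList(), "nextIterator"));
    System.out.println(looksEmpty(Collections.singletonList("record"), null));
    System.out.println(looksEmpty(Collections.singletonList("record"), "nextIterator"));
  }
}
```

Treating "no records but a non-null iterator" as non-empty is the conservative choice: a shard is only skipped when the response gives no sign of further data.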






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808819130



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,52 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
+  }
+
+  /**
+   * A kinesis shard is considered to be an ignorable partition if it is both closed and empty
+   * @return set of shards ignorable by kinesis ingestion
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()

Review comment:
       nit - this should be called computeIgnorablePartitionIds() or loadIgnorablePartitionIds()






[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808820705



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,52 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
+  }
+
+  /**
+   * A kinesis shard is considered to be an ignorable partition if it is both closed and empty
+   * @return set of shards ignorable by kinesis ingestion
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()

Review comment:
       The current verb suggests a plain getter, but behind the scenes the method can make network calls to fetch the partition ids.






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r801727591



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -416,6 +424,49 @@ protected boolean supportsPartitionExpiration()
     return true;
   }
 
+  @Override
+  protected boolean shouldSkipIgnorablePartitions()
+  {
+    return spec.getSpec().getTuningConfig().shouldSkipIgnorableShards();
+  }
+
+  /**
+   * Closed and empty shards can be ignored for ingestion,
+   * Use this method if skipIgnorablePartitions is true in the spec
+   *
+   * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
+   *
+   * @return the set of ignorable shards' ids
+   */
+  @Override
+  protected Set<String> getIgnorablePartitionIds()
+  {
+    updateClosedShardCache();
+    return ImmutableSet.copyOf(emptyClosedShardIds);
+  }
+
+  private void updateClosedShardCache()
+  {
+    String stream = spec.getSource();
+    Set<Shard> allActiveShards = ((KinesisRecordSupplier) recordSupplier).getShards(stream);
+    Set<String> activeClosedShards = allActiveShards.stream()
+                                                    .filter(shard -> isShardOpen(shard))

Review comment:
       Yes, my bad. I have made the change but haven't pushed it yet.






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808624569



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards

Review comment:
       Nit:
   ```suggestion
     // Maintain sets of currently closed shards to find ignorable (closed and empty) shards
   ```






[GitHub] [druid] kfaraz commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
kfaraz commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r808620269



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -88,6 +91,11 @@
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
+  // Maintain sets of currently closed shards to find "bad" (closed and empty) shards
+  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
+  private final Set<String> emptyClosedShardIds = new TreeSet<>();

Review comment:
       Thanks for pointing this out, @zachjsh.
   This code would be executed by the `SeekableStreamSupervisor` while executing a `RunNotice` (scheduled when status of a task changes) as well as a `DynamicAllocationTasksNotice` (scheduled for auto-scaling). There is a possibility of contention between these two executions.
   
   I think a ConcurrentHashSet would suffice here.
   
   cc: @AmatyaAvadhanula 
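
The JDK itself has no class named `ConcurrentHashSet`; one standard way to get a concurrent set is `ConcurrentHashMap.newKeySet()`. The sketch below assumes that is the kind of set meant here. The class name, the thread names, and the `addFromTwoThreads` helper are hypothetical and only imitate two notices touching the same set at once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo of a thread-safe shard id set; not the PR's code.
final class ConcurrentShardIdSetSketch
{
  static int addFromTwoThreads(int idsPerThread)
  {
    // Backed by a ConcurrentHashMap, so individual add/remove/contains calls are thread-safe.
    final Set<String> shardIds = ConcurrentHashMap.newKeySet();

    Thread runNotice = new Thread(() -> {
      for (int i = 0; i < idsPerThread; i++) {
        shardIds.add("run-" + i);
      }
    });
    Thread scalingNotice = new Thread(() -> {
      for (int i = 0; i < idsPerThread; i++) {
        shardIds.add("scale-" + i);
      }
    });

    runNotice.start();
    scalingNotice.start();
    try {
      runNotice.join();
      scalingNotice.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    // Every id is distinct, so no insertion may be lost.
    return shardIds.size();
  }

  public static void main(String[] args)
  {
    System.out.println(addFromTwoThreads(1000));
  }
}
```

A concurrent set only makes each individual call safe. A compound sequence such as `retainAll` followed by a repopulating loop is still not atomic, so two overlapping updates could interleave unless the whole update is also synchronized.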






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807876218



##########
File path: extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
##########
@@ -1294,7 +1297,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception
             .times(1);
 
 
-    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(captured.getValue())).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))

Review comment:
       done






[GitHub] [druid] AmatyaAvadhanula commented on a change in pull request #12235: Optimize kinesis ingestion task assignment after resharding

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on a change in pull request #12235:
URL: https://github.com/apache/druid/pull/12235#discussion_r807612810



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -667,28 +666,31 @@ public String getEarliestSequenceNumber(StreamPartition<String> partition)
    * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   *
-   * @return Set of Shard ids
+   * @return Immutable set of shards
    */
+  public Set<Shard> getShards(String stream)
+  {
+    ImmutableSet.Builder<Shard> shards = ImmutableSet.builder();
+    ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
+    while (true) {
+      ListShardsResult result = kinesis.listShards(request);
+      shards.addAll(result.getShards());
+      String nextToken = result.getNextToken();
+      if (nextToken == null) {
+        return shards.build();
+      }
+      request = new ListShardsRequest().withNextToken(nextToken);
+    }
+  }
+
   @Override
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      final Set<String> retVal = new TreeSet<>();
-      ListShardsRequest request = new ListShardsRequest().withStreamName(stream);
-      while (true) {
-        ListShardsResult result = kinesis.listShards(request);
-        retVal.addAll(result.getShards()
-                            .stream()
-                            .map(Shard::getShardId)
-                            .collect(Collectors.toList())
-        );
-        String nextToken = result.getNextToken();
-        if (nextToken == null) {
-          return retVal;
-        }
-        request = new ListShardsRequest().withNextToken(nextToken);
-      }
+      return ImmutableSet.copyOf(getShards(stream).stream()
+                                                  .map(shard -> shard.getShardId())
+                                                  .collect(Collectors.toList())

Review comment:
       done
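
The loop in `getShards` above follows a common token-based pagination pattern: fetch a page, collect its items, and stop when no next token is returned. The following is a minimal, SDK-free sketch of that pattern. `TokenPaginationSketch`, `Page`, `listAll`, and `demo` are hypothetical names, and the page fetcher is a plain `Function` standing in for `kinesis.listShards`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, SDK-free illustration of token-based pagination.
final class TokenPaginationSketch
{
  // One page of results plus the token for the next page (null on the last page).
  static final class Page
  {
    final List<String> items;
    final String nextToken;

    Page(List<String> items, String nextToken)
    {
      this.items = items;
      this.nextToken = nextToken;
    }
  }

  // fetchPage receives null for the first request and the previous token afterwards.
  static Set<String> listAll(Function<String, Page> fetchPage)
  {
    Set<String> all = new LinkedHashSet<>();
    String token = null;
    while (true) {
      Page page = fetchPage.apply(token);
      all.addAll(page.items);
      token = page.nextToken;
      if (token == null) {
        return all;
      }
    }
  }

  // Two fake pages: the first carries a token, the second ends the listing.
  static Set<String> demo()
  {
    return listAll(token -> token == null
        ? new Page(Arrays.asList("shardId-0", "shardId-1"), "token-1")
        : new Page(Collections.singletonList("shardId-2"), null));
  }

  public static void main(String[] args)
  {
    System.out.println(demo());
  }
}
```

As in the diff, each follow-up request here is driven by the token alone; the stream name is supplied only on the first request.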



