Posted to commits@druid.apache.org by "zachjsh (via GitHub)" <gi...@apache.org> on 2023/11/10 00:28:59 UTC

[PR] Kinesis adaptive memory management WIP (druid)

zachjsh opened a new pull request, #15360:
URL: https://github.com/apache/druid/pull/15360

   Fixes #XXXX.
   
   ### Description
   
   #### Fixed the bug ...
   #### Renamed the class ...
   #### Added a forbidden-apis entry ...
   
   #### Release note
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `MyFoo`
    * `OurBar`
    * `TheirBaz`
   
   <hr>
   
   This PR has:
   
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401103893


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   Previously, when the record buffer was full here, the fetchRecords logic threw away the rest of the GetRecords result after recordBufferOfferTimeout and started a new shard iterator. This seemed excessively churny. Instead, we now wait an unbounded amount of time for the queue to stop being full. If the queue remains full, we'd end up right back waiting for it after a restarted fetch anyway.
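
A minimal sketch of the retry-in-place idea described in this comment, assuming a byte-bounded queue. `ByteBoundedQueue` and `offerUntilAccepted` are hypothetical names written for illustration; they are not the PR's `MemoryBoundLinkedBlockingQueue` or its actual API. The first offer uses the short offer timeout, and every later attempt uses the longer "buffer full" wait, so the already-fetched record is never discarded.

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical byte-bounded queue for illustration; not the PR's MemoryBoundLinkedBlockingQueue.
final class ByteBoundedQueue<T>
{
  private final long capacityBytes;
  private final ArrayDeque<T> items = new ArrayDeque<>();
  private final ArrayDeque<Long> sizes = new ArrayDeque<>();
  private long usedBytes = 0;

  ByteBoundedQueue(long capacityBytes)
  {
    this.capacityBytes = capacityBytes;
  }

  // Waits up to timeoutMillis for enough free bytes; returns false if the queue stays too full.
  synchronized boolean offer(T item, long bytes, long timeoutMillis) throws InterruptedException
  {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (usedBytes + bytes > capacityBytes) {
      final long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // wait(0) would block forever, so always wait at least 1 ms.
      wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
    }
    items.addLast(item);
    sizes.addLast(bytes);
    usedBytes += bytes;
    return true;
  }

  // Removes the head (or returns null if empty) and wakes any producer waiting for space.
  synchronized T poll()
  {
    final T item = items.pollFirst();
    if (item != null) {
      usedBytes -= sizes.pollFirst();
      notifyAll();
    }
    return item;
  }

  synchronized long usedBytes()
  {
    return usedBytes;
  }
}

final class OfferRetrySketch
{
  // Keeps offering the same record until it is accepted, instead of dropping the
  // rest of the fetched batch. Returns the number of offers that timed out.
  // Note: a record larger than the queue capacity would never be accepted.
  static <T> int offerUntilAccepted(
      ByteBoundedQueue<T> queue,
      T record,
      long recordBytes,
      long offerTimeoutMillis,
      long fullWaitMillis
  ) throws InterruptedException
  {
    int failedOffers = 0;
    long waitMillis = offerTimeoutMillis;
    while (!queue.offer(record, recordBytes, waitMillis)) {
      failedOffers++;
      waitMillis = fullWaitMillis; // back off longer once the buffer is known to be full
    }
    return failedOffers;
  }
}
```

The trade-off in this sketch is that the fetch thread blocks for as long as the consumer is slow, rather than re-requesting a shard iterator and refetching the same records later.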





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409815785


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   updated



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
     // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");

Review Comment:
   updated





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1407313784


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
     // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");

Review Comment:
   Removed the licensing comments.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh merged PR #15360:
URL: https://github.com/apache/druid/pull/15360




Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406908345


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java:
##########
@@ -81,7 +80,7 @@ public KinesisIndexTaskTuningConfig(
       Long handoffConditionTimeout,
       Boolean resetOffsetAutomatically,
       Boolean skipSequenceNumberAvailabilityCheck,
-      Integer recordBufferSize,
+      @Nullable Integer recordBufferSizeBytes,

Review Comment:
   added





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409823324


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -78,6 +82,14 @@ public KinesisIndexTask(
     );
     this.useListShards = useListShards;
     this.awsCredentialsConfig = awsCredentialsConfig;
+    if (tuningConfig.getRecordBufferSizeConfigured() != null) {

Review Comment:
   Please move these two checks to `run` rather than the constructor, because we don't need to log this stuff every time a task object is constructed. (That happens at various points on the Overlord due to various API calls and internal machinations, and will create a lot of log spam.)



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java:
##########
@@ -71,12 +69,10 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
-      @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
-      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
-      @JsonProperty("deaggregate") boolean deaggregate

Review Comment:
   The `recordsPerFetch` and `deaggregate` properties should stay here for better compatibility during rolling updates and rollbacks. (We don't want to lose track of them prior to a potential rollback.)
   
   So let's instead mark them deprecated, but keep them.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +186,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+    );
+    if (fetchThreads > maxFetchThreads) {
+      log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);

Review Comment:
   This warning should only get logged if `configuredFetchThreads != null`. There's no reason to log it if the default of `runtimeInfo.getAvailableProcessors() * 2` is higher than `maxFetchThreads`.
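
A minimal sketch of the behavior requested in this comment, assuming the constants implied by the diff (10 MB per GetRecords call, a cap of 100 MB or 5% of max heap). The exact constant values, the plain `int`/`long` parameters in place of `RuntimeInfo`, and the `warnings` list standing in for the logger are assumptions made to keep the example self-contained; this is not the merged implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class FetchThreadsSketch
{
  // Assumed values, based on the figures quoted in the diff comment.
  static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  // Returns the capped thread count. A warning is recorded only when the user
  // explicitly configured a value that had to be lowered.
  static int computeFetchThreads(
      int availableProcessors,
      long maxHeapSizeBytes,
      Integer configuredFetchThreads,
      List<String> warnings
  )
  {
    int fetchThreads = configuredFetchThreads != null
                       ? configuredFetchThreads
                       : availableProcessors * 2;

    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));

    if (fetchThreads > maxFetchThreads) {
      if (configuredFetchThreads != null) {
        // Only an explicit setting is worth a warning; a lowered default is not.
        warnings.add(
            String.format("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads)
        );
      }
      fetchThreads = maxFetchThreads;
    }
    return fetchThreads;
  }

  public static void main(String[] args)
  {
    final List<String> warnings = new ArrayList<>();
    final int threads = computeFetchThreads(8, 1_000_000_000L, 10, warnings);
    System.out.println("fetchThreads=" + threads + ", warnings=" + warnings);
  }
}
```

With this shape, a task that relies on the default never emits the warning, while a task whose explicit `fetchThreads` exceeds the cap is lowered and warned once per call.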





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1412410412


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +186,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL)
+    );
+    if (fetchThreads > maxFetchThreads) {
+      log.warn("fetchThreads [%d] being lowered to [%d]", fetchThreads, maxFetchThreads);

Review Comment:
   Good catch, updated.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401104992


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,22 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {

Review Comment:
   It is a new failure mode. I believe that if the data were not added here, it could result in data loss. Any other suggestions here? I was a little concerned about this too, but I think potential data loss is worse.
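
A minimal sketch of the trade-off discussed in this comment, assuming the branch under `if (!newQ.offer(x))` raises an exception (the diff excerpt above is cut off before the body, so that is an assumption). For simplicity it uses a count-bounded `LinkedBlockingQueue` rather than a byte-bounded queue, and `Rec` and `retainUnseeked` are hypothetical names for illustration.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class FilterBufferSketch
{
  // Hypothetical record type: a partition id plus a payload.
  static final class Rec
  {
    final String partition;
    final String data;

    Rec(String partition, String data)
    {
      this.partition = partition;
      this.data = data;
    }
  }

  // Copies every record whose partition was not seeked into a fresh bounded queue.
  // A rejected offer fails loudly instead of silently dropping the record.
  static BlockingQueue<Rec> retainUnseeked(
      BlockingQueue<Rec> records,
      Set<String> seekedPartitions,
      int capacity
  )
  {
    final BlockingQueue<Rec> newQ = new LinkedBlockingQueue<>(capacity);
    records.stream()
           .filter(r -> !seekedPartitions.contains(r.partition))
           .forEachOrdered(r -> {
             if (!newQ.offer(r)) {
               throw new IllegalStateException(
                   "Could not re-buffer record for partition [" + r.partition + "]; failing rather than losing data"
               );
             }
           });
    return newQ;
  }
}
```

Failing here turns a silent gap in ingested data into a visible error, which is the preference stated in the comment; whether the task should instead retry or block is left open in the thread.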



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java:
##########
@@ -81,7 +80,7 @@ public KinesisIndexTaskTuningConfig(
       Long handoffConditionTimeout,
       Boolean resetOffsetAutomatically,
       Boolean skipSequenceNumberAvailabilityCheck,
-      Integer recordBufferSize,
+      @Nullable Integer recordBufferSizeBytes,

Review Comment:
   Good thought, will add.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401106010


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   I wondered the same, actually. To be honest, I'm not sure. I think validating this would require extensive performance testing.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "jon-wei (via GitHub)" <gi...@apache.org>.
jon-wei commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1411330251


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -635,23 +620,19 @@ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long ti
     start();
 
     try {
-      int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
-
-      List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+      List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>();
 
-      Queues.drain(
-          records,
+      records.drain(
           polledRecords,
-          expectedSize,
+          maxBytesPerPoll,

Review Comment:
   What happens if a single record is larger than `maxBytesPerPoll`? Would this get stuck and make no progress?
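
For illustration, here is a minimal Java sketch of one way a byte-bounded drain can avoid getting stuck on an oversized record: it always accepts the first available record and applies the byte budget only to the records after it. This is not the PR's implementation. The class `ByteBoundedDrainSketch`, the `drain` helper, and the use of plain `byte[]` records are assumptions made for this example, and the peek-then-poll loop assumes a single consumer thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not code from the PR.
final class ByteBoundedDrainSketch
{
  // Drains records until adding the next one would exceed maxBytes.
  // The first record is always taken, even if it alone is larger than maxBytes,
  // so a single oversized record cannot block progress.
  static List<byte[]> drain(BlockingQueue<byte[]> queue, long maxBytes, long timeout, TimeUnit unit)
      throws InterruptedException
  {
    final List<byte[]> drained = new ArrayList<>();

    // Wait up to the timeout for the first record only.
    final byte[] first = queue.poll(timeout, unit);
    if (first == null) {
      return drained;
    }
    drained.add(first);
    long bytesDrained = first.length;

    // Take further records without blocking while they still fit in the budget.
    // Assumes a single consumer: peek() followed by poll() is not atomic.
    while (true) {
      final byte[] next = queue.peek();
      if (next == null || bytesDrained + next.length > maxBytes) {
        break;
      }
      queue.poll();
      drained.add(next);
      bytesDrained += next.length;
    }
    return drained;
  }

  public static void main(String[] args) throws InterruptedException
  {
    BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    queue.add(new byte[2_000_000]); // larger than the budget on its own
    queue.add(new byte[400_000]);
    queue.add(new byte[400_000]);

    System.out.println(drain(queue, 1_000_000L, 10, TimeUnit.MILLISECONDS).size()); // 1
    System.out.println(drain(queue, 1_000_000L, 10, TimeUnit.MILLISECONDS).size()); // 2
  }
}
```

With this shape, the byte limit is a soft cap: a poll can return more than `maxBytes` only when the first record by itself is that large.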





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1408807627


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
     // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");

Review Comment:
   Oh, it even looks like since #12370, `amazon-kinesis-client` with an Apache license is a regular dependency. So this reflective stuff is no longer needed. Please either rewrite it to use regular Java calls, or if you don't rewrite it, include a comment describing the situation. Something like:
   
   > The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since amazon-kinesis-client is now Apache-licensed and is now a dependency of Druid. This code could safely be modified to use regular calls rather than reflection.
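
To make the contrast concrete, here is a minimal Java sketch of the two call styles the comment above compares: looking a static method up by name at runtime versus calling it directly. It uses `java.lang.Integer.parseInt` purely as a stand-in so the example runs with the JDK alone. The class and method names are hypothetical, and nothing here is taken from `KinesisRecordSupplier`.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Hypothetical sketch: Integer.parseInt stands in for a library method that
// used to be reachable only through reflection.
final class ReflectiveLookupSketch
{
  // Reflective style: resolve the class and method by name at runtime, so the
  // calling code has no compile-time dependency on the target class.
  static int parseReflectively(String s)
  {
    try {
      Class<?> clazz = Class.forName("java.lang.Integer");
      MethodHandle parseInt = MethodHandles.publicLookup().findStatic(
          clazz,
          "parseInt",
          MethodType.methodType(int.class, String.class)
      );
      return (int) parseInt.invoke(s);
    }
    catch (Throwable t) {
      // Lookup failures and exceptions thrown by the target both end up here.
      throw new IllegalStateException("reflective call failed", t);
    }
  }

  // Direct style: an ordinary call, checked by the compiler.
  static int parseDirectly(String s)
  {
    return Integer.parseInt(s);
  }

  public static void main(String[] args)
  {
    System.out.println(parseReflectively("42")); // 42
    System.out.println(parseDirectly("42"));     // 42
  }
}
```

The direct form fails at compile time if the method is renamed or its signature changes, whereas the reflective form only fails when the lookup runs.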





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on PR #15360:
URL: https://github.com/apache/druid/pull/15360#issuecomment-1831299419

   Tagged "release notes" since various memory-related configs are changed.




Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406907369


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   Changed it so that each poll returns at least one record and at most 1_000_000 bytes, which is what we were targeting before.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "github-advanced-security[bot] (via GitHub)" <gi...@apache.org>.
github-advanced-security[bot] commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406932606


##########
processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java:
##########
@@ -61,12 +92,56 @@
     return ret;
   }
 
+  public Stream<ObjectContainer<T>> stream()
+  {
+    return queue.stream();
+  }
+
+  public int drain(Collection<? super ObjectContainer<T>> buffer, int bytesToDrain, long timeout, TimeUnit unit)
+      throws InterruptedException
+  {
+    Preconditions.checkNotNull(buffer);
+    long deadline = System.nanoTime() + unit.toNanos(timeout);
+    int added = 0;
+    int bytesAdded = 0;
+    while (bytesAdded < bytesToDrain) {
+      ObjectContainer<T> e = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
+      if (e == null) {
+        break;
+      }
+      currentMemory.addAndGet(-e.getSize());
+      buffer.add(e);
+      ++added;
+      bytesAdded += e.getSize();

Review Comment:
   ## Implicit narrowing conversion in compound assignment
   
   Implicit cast of source type long to narrower destination type int.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/6012)
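
As background for this alert, here is a small Java sketch of the behavior being flagged. A compound assignment such as `x += y` includes an implicit cast to the type of `x`, so adding a `long` to an `int` compiles without any cast and silently drops the high bits. The class and method names are hypothetical and chosen only for this example.

```java
// Hypothetical sketch of implicit narrowing in compound assignment.
final class NarrowingSketch
{
  // Compiles without a cast: "acc += size" behaves like "acc = (int) (acc + size)".
  static int addToInt(int acc, long size)
  {
    acc += size;
    return acc;
  }

  // Keeping the accumulator as a long avoids the narrowing.
  static long addToLong(long acc, long size)
  {
    acc += size;
    return acc;
  }

  public static void main(String[] args)
  {
    final long size = 3_000_000_000L; // larger than Integer.MAX_VALUE

    System.out.println(addToInt(0, size));   // -1294967296
    System.out.println(addToLong(0L, size)); // 3000000000
  }
}
```

Declaring the running byte counter as a `long` is one straightforward way to clear this kind of warning.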





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406907369


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   Changed it so that each poll returns at least one record, and at most 1_000_000 bytes when it returns more than one record, which is what we were targeting before.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +174,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / 10_000_000L)

Review Comment:
   updated
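
The arithmetic in the hunk above can be sketched as a standalone Java method. The constant values below (100 MB, 5% of heap, 10 MB per fetch) are assumptions read off the code comment in the diff rather than the actual definitions in `KinesisIndexTaskIOConfig`. The final `Math.min` and the warning are also an assumption about how the truncated method ends, based on the documentation change quoted elsewhere in this thread.

```java
// Hypothetical sketch of the fetch-thread cap; constants are assumed values.
final class FetchThreadCapSketch
{
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;          // assumed: 100 MB
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;  // assumed: 5% of heap
  static final long MAX_BYTES_PER_FETCH = 10_000_000L;               // 10 MB per fetch, per the diff

  static int capFetchThreads(int requestedFetchThreads, long maxHeapSizeBytes)
  {
    // Memory budget for in-flight fetches: the smaller of the fixed cap and the heap fraction.
    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapSizeBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );

    // Number of threads that fit in the budget, but never fewer than one.
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / MAX_BYTES_PER_FETCH));

    if (requestedFetchThreads > maxFetchThreads) {
      // Lower the value instead of failing, so older configurations keep working.
      System.err.println(
          "fetchThreads " + requestedFetchThreads + " lowered to " + maxFetchThreads
      );
    }
    return Math.min(requestedFetchThreads, maxFetchThreads);
  }

  public static void main(String[] args)
  {
    System.out.println(capFetchThreads(16, 1_100_000_000L)); // 5
    System.out.println(capFetchThreads(16, 8_000_000_000L)); // 10
    System.out.println(capFetchThreads(4, 8_000_000_000L));  // 4
  }
}
```

Under these assumed constants, the 100 MB cap takes over once the heap exceeds 2 GB, so the thread count never goes above 10 regardless of heap size.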





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1412410605


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java:
##########
@@ -71,12 +69,10 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
       @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
-      @JsonProperty("recordsPerFetch") Integer recordsPerFetch,
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
-      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
-      @JsonProperty("deaggregate") boolean deaggregate

Review Comment:
   added back and marked as deprecated.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1412410162


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -78,6 +82,14 @@ public KinesisIndexTask(
     );
     this.useListShards = useListShards;
     this.awsCredentialsConfig = awsCredentialsConfig;
+    if (tuningConfig.getRecordBufferSizeConfigured() != null) {

Review Comment:
   Good catch, moved.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "jon-wei (via GitHub)" <gi...@apache.org>.
jon-wei commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401061045


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   Should this be higher? I wonder if this is too low in the case of non-aggregated records.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "gianm (via GitHub)" <gi...@apache.org>.
gianm commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1408828126


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -637,21 +622,22 @@ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long ti
     try {
       int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
 
-      List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+      List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>(expectedSize);
 
-      Queues.drain(
-          records,
+      records.drain(
           polledRecords,
-          expectedSize,
+          MAX_BYTES_PER_POLL,

Review Comment:
   It looks like `maxRecordsPerPoll` isn't doing anything anymore. Is that right? If so let's get rid of it.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,23 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {
+            // this should never really happen in practice but adding check here for safety.

Review Comment:
   Checks that should never happen, but are for safety, should be `DruidException.defensive`
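
As a rough illustration of the pattern under discussion, the Java sketch below rebuilds a bounded queue from the records whose partition was not seeked and fails loudly if an `offer` is unexpectedly rejected. It relies only on the JDK, so `IllegalStateException` stands in for the `DruidException.defensive` call the comment asks for. The `Rec` type, the count-based `LinkedBlockingQueue`, and all names are hypothetical simplifications of the byte-bounded queue in the PR.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not code from the PR.
final class FilterBufferSketch
{
  // Minimal stand-in for a buffered record: a partition id and a sequence number.
  static final class Rec
  {
    final String partition;
    final long sequence;

    Rec(String partition, long sequence)
    {
      this.partition = partition;
      this.sequence = sequence;
    }
  }

  // Copies records whose partition was not seeked into a new bounded queue, preserving order.
  static BlockingQueue<Rec> retainUnseeked(BlockingQueue<Rec> records, Set<String> seeked, int capacity)
  {
    final BlockingQueue<Rec> newQ = new LinkedBlockingQueue<>(capacity);
    records.stream()
           .filter(r -> !seeked.contains(r.partition))
           .forEachOrdered(r -> {
             if (!newQ.offer(r)) {
               // Should not happen when the new queue is at least as large as the old one.
               // IllegalStateException is a stand-in for a defensive exception type.
               throw new IllegalStateException("Failed to re-add record to buffer during filtering");
             }
           });
    return newQ;
  }

  public static void main(String[] args)
  {
    BlockingQueue<Rec> records = new LinkedBlockingQueue<>();
    records.add(new Rec("shard-0", 1));
    records.add(new Rec("shard-1", 2));
    records.add(new Rec("shard-0", 3));

    BlockingQueue<Rec> kept = retainUnseeked(records, Collections.singleton("shard-1"), 10);
    System.out.println(kept.size()); // 2
  }
}
```

Throwing rather than ignoring the rejected `offer` means a sizing bug surfaces as an error instead of silently dropping records.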



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +296,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {

Review Comment:
   The comment above is no longer accurate -- we aren't grabbing new stream iterators anymore when the buffer is full.



##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   So does that mean we should update the `maxRecordsPerPoll: 1` here?





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409816358


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -637,21 +622,22 @@ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long ti
     try {
       int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
 
-      List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+      List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>(expectedSize);
 
-      Queues.drain(
-          records,
+      records.drain(
           polledRecords,
-          expectedSize,
+          MAX_BYTES_PER_POLL,

Review Comment:
   removed, and added `maxBytesPerPoll` which is being used instead now.



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,23 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {
+            // this should never really happen in practice but adding check here for safety.

Review Comment:
   thanks! updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1409816580


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +296,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {

Review Comment:
   removed.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "jon-wei (via GitHub)" <gi...@apache.org>.
jon-wei commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1401071581


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java:
##########
@@ -81,7 +80,7 @@ public KinesisIndexTaskTuningConfig(
       Long handoffConditionTimeout,
       Boolean resetOffsetAutomatically,
       Boolean skipSequenceNumberAvailabilityCheck,
-      Integer recordBufferSize,
+      @Nullable Integer recordBufferSizeBytes,

Review Comment:
   Do you think it'd make sense to log a warning if the eliminated property is provided?



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java:
##########
@@ -179,15 +174,36 @@ AWSCredentialsConfig getAwsCredentialsConfig()
   }
 
   @VisibleForTesting
-  static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
+  static int computeFetchThreads(
+      final RuntimeInfo runtimeInfo,
+      final Integer configuredFetchThreads
+  )
   {
-    final int fetchThreads;
+    int fetchThreads;
     if (configuredFetchThreads != null) {
       fetchThreads = configuredFetchThreads;
     } else {
       fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
     }
 
+    // Each fetchThread can return upto 10MB at a time
+    // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that
+    // we don't exceed more than the least of 100MB or 5% of heap at a time. Don't fail if fetchThreads specified
+    // is greater than this as to not cause failure for older configurations, but log warning in this case, and lower
+    // fetchThreads implicitly.
+    final long memoryToUse = Math.min(
+        KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY,
+        (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
+    );
+    int maxFetchThreads = Math.max(
+        1,
+        (int) (memoryToUse / 10_000_000L)

Review Comment:
   nit: maybe use a constant for the 10MB limit with a comment that explains the limit comes from the Kinesis library
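
A minimal sketch of what that suggestion could look like, assuming the 100 MB and 5% figures from the code comment in the hunk above. The class name `FetchThreadCapSketch`, the constant `GET_RECORDS_MAX_BYTES_PER_CALL`, and the final `Math.min` step are hypothetical; the hunk is cut off before it shows how `maxFetchThreads` is applied.

```java
final class FetchThreadCapSketch
{
  // Assumed values, taken from the code comment in the diff rather than from the actual constants.
  static final long MAX_RECORD_FETCH_MEMORY = 100_000_000L;
  static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;

  // Hypothetical constant name: the most a single fetch can return (10 MB), per the AWS limits page
  // linked in the code comment.
  static final long GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L;

  static int computeFetchThreads(long maxHeapBytes, int availableProcessors, Integer configuredFetchThreads)
  {
    // Use the configured value if present, otherwise twice the available processors.
    final int fetchThreads = configuredFetchThreads != null ? configuredFetchThreads : availableProcessors * 2;

    // Budget for in-flight fetches: the smaller of the fixed ceiling and the heap fraction.
    final long memoryToUse = Math.min(
        MAX_RECORD_FETCH_MEMORY,
        (long) (maxHeapBytes * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
    );

    // Always allow at least one thread, even on very small heaps.
    final int maxFetchThreads = Math.max(1, (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL));

    // Assumption: an oversized configured value is lowered rather than rejected.
    return Math.min(fetchThreads, maxFetchThreads);
  }
}
```

With these assumed values, a 1 GB heap yields a 50 MB budget and therefore at most 5 fetch threads, regardless of how many were configured.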



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -437,42 +429,35 @@ public KinesisRecordSupplier(
   {
     Preconditions.checkNotNull(amazonKinesis);
     this.kinesis = amazonKinesis;
-    this.recordsPerFetch = recordsPerFetch;
     this.fetchDelayMillis = fetchDelayMillis;
-    this.deaggregate = deaggregate;
     this.recordBufferOfferTimeout = recordBufferOfferTimeout;
     this.recordBufferFullWait = recordBufferFullWait;
     this.maxRecordsPerPoll = maxRecordsPerPoll;
     this.fetchThreads = fetchThreads;
-    this.recordBufferSize = recordBufferSize;
+    this.recordBufferSizeBytes = recordBufferSizeBytes;
     this.useEarliestSequenceNumber = useEarliestSequenceNumber;
     this.useListShards = useListShards;
     this.backgroundFetchEnabled = fetchThreads > 0;
 
     // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache.
     // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the
     // docs page for more information on how to use deaggregation
-    if (deaggregate) {
-      try {
-        Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");
-        MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+    try {
+      Class<?> kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord");

Review Comment:
   Are the points about the licensing above still correct? Looks like amazon-kinesis-client is Apache licensed now: https://github.com/awslabs/amazon-kinesis-client/blob/master/LICENSE.txt



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,22 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {

Review Comment:
   Is this a new failure mode? What would've happened in the old code if the queue size was exceeded?



##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -294,22 +295,18 @@ private Runnable fetchRecords()
 
             // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting
             // from this message and back off for a bit to let the buffer drain before retrying.
-            if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
+            recordBufferOfferWaitMillis = recordBufferOfferTimeout;
+            while (!records.offer(
+                new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize),
+                recordBufferOfferWaitMillis,
+                TimeUnit.MILLISECONDS
+            )) {
               log.warn(
                   "Kinesis records are being processed slower than they are fetched. "
                   + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
                   recordBufferFullWait
               );
-
-              shardIterator = kinesis.getShardIterator(
-                  currRecord.getStream(),
-                  currRecord.getPartitionId(),
-                  ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
-                  currRecord.getSequenceNumber()
-              ).getShardIterator();
-
-              scheduleBackgroundFetch(recordBufferFullWait);
-              return;
+              recordBufferOfferWaitMillis = recordBufferFullWait;

Review Comment:
   How come the shardIterator doesn't need to be reset here as before?





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406907369


##########
docs/development/extensions-core/kinesis-ingestion.md:
##########
@@ -656,25 +655,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi
 
 Kinesis indexing tasks fetch records using `fetchThreads` threads.
 If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
-Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
+Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches
 of `fetchDelayMillis`.
-The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
+The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`.
 The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set,
-each of these parameters refers to aggregated records rather than individual records.
-
 The default values for these parameters are:
 
 - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
 is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
-particular server).
+particular server). This value is further limited so that the total data record data fetched at a given time does not
+exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified
+for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is
+implicitly lowered to the max allowed by this constraint.
 - `fetchDelayMillis`: 0 (no delay between fetches).
-- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
-For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
-- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
+- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller.
+- `maxRecordsPerPoll`: 1.

Review Comment:
   I now poll for at most 1_000_000 bytes, which is what we were targeting before.





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1406908526


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -1059,11 +1045,22 @@ private void filterBufferAndResetBackgroundFetch(Set<StreamPartition<String>> pa
     }
 
     // filter records in buffer and only retain ones whose partition was not seeked
-    BlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ = new LinkedBlockingQueue<>(recordBufferSize);
+    MemoryBoundLinkedBlockingQueue<OrderedPartitionableRecord<String, String, ByteEntity>> newQ =
+        new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes);
 
     records.stream()
-           .filter(x -> !partitions.contains(x.getStreamPartition()))
-           .forEachOrdered(newQ::offer);
+        .filter(x -> !partitions.contains(x.getData().getStreamPartition()))
+        .forEachOrdered(x -> {
+          if (!newQ.offer(x)) {

Review Comment:
   Added a comment saying that this shouldn't really happen in practice, but the check is there for safety.
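
For context on the earlier question about the old behavior: `newQ::offer` on a bounded `LinkedBlockingQueue` returns `false` when the queue is full, and the old code ignored that return value. The sketch below uses a plain `java.util.concurrent.LinkedBlockingQueue` of strings, not the PR's `MemoryBoundLinkedBlockingQueue`. The class name `FilterRebuildSketch` and the choice of `IllegalStateException` are hypothetical and only illustrate checking the result.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class FilterRebuildSketch
{
  // Copies every element except `dropped` into a new bounded queue, preserving order.
  // Fails loudly if an element does not fit instead of discarding it silently.
  static BlockingQueue<String> rebuild(List<String> source, String dropped, int capacity)
  {
    final BlockingQueue<String> newQ = new LinkedBlockingQueue<>(capacity);
    source.stream()
          .filter(x -> !x.equals(dropped))
          .forEachOrdered(x -> {
            // offer() returns false when the queue is at capacity.
            if (!newQ.offer(x)) {
              throw new IllegalStateException("Failed to insert item into new queue: " + x);
            }
          });
    return newQ;
  }
}
```

Because the new queue has the same bound as the old one and only receives a subset of its elements, the failing branch should not be reachable in normal operation.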





Re: [PR] Kinesis adaptive memory management (druid)

Posted by "zachjsh (via GitHub)" <gi...@apache.org>.
zachjsh commented on code in PR #15360:
URL: https://github.com/apache/druid/pull/15360#discussion_r1412411405


##########
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java:
##########
@@ -635,23 +620,19 @@ public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long ti
     start();
 
     try {
-      int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll);
-
-      List<OrderedPartitionableRecord<String, String, ByteEntity>> polledRecords = new ArrayList<>(expectedSize);
+      List<MemoryBoundLinkedBlockingQueue.ObjectContainer<OrderedPartitionableRecord<String, String, ByteEntity>>> polledRecords = new ArrayList<>();
 
-      Queues.drain(
-          records,
+      records.drain(
           polledRecords,
-          expectedSize,
+          maxBytesPerPoll,

Review Comment:
   Good question. It always drains at least one record; I clarified that in the docs. I also added a test for this, see org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueueTest#test_drain_queueWithFirstItemSizeGreaterThanLimit_succeeds
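
A simplified, non-blocking sketch of the drain semantics described above. This is not the actual `MemoryBoundLinkedBlockingQueue` implementation; the class `ByteBoundedQueueSketch` and its method signatures are hypothetical and only illustrate the "at least one record" rule.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class ByteBoundedQueueSketch<T>
{
  // Pairs an element with its size in bytes, similar in spirit to the PR's ObjectContainer.
  private static final class Sized<T>
  {
    final T data;
    final long size;

    Sized(T data, long size)
    {
      this.data = data;
      this.size = size;
    }
  }

  private final ArrayDeque<Sized<T>> queue = new ArrayDeque<>();
  private final long capacityBytes;
  private long currentBytes = 0;

  ByteBoundedQueueSketch(long capacityBytes)
  {
    this.capacityBytes = capacityBytes;
  }

  // Rejects the item if adding it would exceed the byte capacity.
  synchronized boolean offer(T item, long sizeBytes)
  {
    if (currentBytes + sizeBytes > capacityBytes) {
      return false;
    }
    queue.addLast(new Sized<>(item, sizeBytes));
    currentBytes += sizeBytes;
    return true;
  }

  // Drains items in order until the next one would push the total past maxBytes.
  // The first item is always taken, even if it alone is larger than maxBytes.
  synchronized List<T> drain(long maxBytes)
  {
    final List<T> out = new ArrayList<>();
    long drained = 0;
    while (!queue.isEmpty()) {
      final Sized<T> head = queue.peekFirst();
      if (!out.isEmpty() && drained + head.size > maxBytes) {
        break;
      }
      queue.pollFirst();
      currentBytes -= head.size;
      drained += head.size;
      out.add(head.data);
    }
    return out;
  }
}
```

Taking the first item unconditionally means a single record larger than the poll limit cannot stall the consumer; the limit only bounds how many additional records are returned with it.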


