Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/09/26 10:26:53 UTC

[GitHub] [druid] tejaswini-imply opened a new pull request, #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

tejaswini-imply opened a new pull request, #13144:
URL: https://github.com/apache/druid/pull/13144

   ### Description
   When a stream becomes inactive, i.e. no new data arrives and all existing data has been read, there is no need for the Supervisor to create new indexing tasks. This feature uses `LagBasedAutoScaler` to turn the Supervisor idle when the stream has been inactive for a configured amount of time (see the configuration section).
   
   - `LagBasedAutoScaler` submits either an `IdleNotice` or a `DynamicAllocationTasksNotice`, depending on activity in the stream, every `LagBasedAutoScalerConfig.scaleActionPeriodMillis`.
   - While the Supervisor is in the idle state, it does not create any new indexing tasks.
   - This temporary IDLE state is cleared when the supervisor is restarted, e.g. when its spec is updated.
   - While the supervisor is in the temporary idle state, the lag-based autoscaler continues to run, checking for any change in the latest offsets (or for lag > 0). If activity in the stream is detected again, the autoscaler submits another notice to move the supervisor out of its idle state and resume creating indexing tasks.
   
   Please note that the initial delay between new data entering the stream and that data being ingested is the submission period of the idle/dynamic task allocation notice (default: 1 min) plus the initial warm-up time needed to create new indexing tasks.
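
The delay described above can be made concrete with a rough estimate. The following is only an illustrative sketch: `ResumeDelaySketch` and its method are hypothetical names, not part of Druid, and the warm-up time is whatever a given cluster needs to start tasks. Only the 1-minute default notice period comes from the description.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of Druid.
final class ResumeDelaySketch
{
  /**
   * Rough worst-case delay between data arriving in an idle stream and
   * ingestion resuming: one full notice period (the check may have run just
   * before the data arrived) plus the time needed to start new tasks.
   */
  static Duration worstCaseResumeDelay(Duration noticePeriod, Duration taskWarmup)
  {
    return noticePeriod.plus(taskWarmup);
  }
}
```

For example, with the default 1-minute notice period and an assumed 30-second warm-up, the estimate is 90 seconds.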
   
   #### How current offsets are fetched:
   - Supervisor is not idle: current offsets are fetched from the running tasks.
   - Supervisor is idle: current offsets are fetched from metadata storage, since it is likely that no tasks are running.
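
The two cases above amount to choosing an offset source based on the supervisor state. A minimal sketch, assuming `Integer` partition ids and `Long` offsets for simplicity; `CurrentOffsetsSketch` and the two suppliers are hypothetical stand-ins for the supervisor's real lookups:

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch; the suppliers stand in for the supervisor's real lookups.
final class CurrentOffsetsSketch
{
  static Map<Integer, Long> currentOffsets(
      boolean supervisorIdle,
      Supplier<Map<Integer, Long>> fromRunningTasks,
      Supplier<Map<Integer, Long>> fromMetadataStorage
  )
  {
    // An idle supervisor likely has no running tasks, so it reads the
    // last committed offsets from metadata storage instead.
    return supervisorIdle ? fromMetadataStorage.get() : fromRunningTasks.get();
  }
}
```

Using suppliers keeps the unused lookup from being executed, which matters if either lookup is expensive.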
   
   ### Configuration changes
   - `druid.supervisor.enableIdleBehaviour` - Overlord property that enables this feature. Default value is false.
   - `LagBasedAutoScalerConfig.minPauseSupervisorIfStreamIdleMillis` - Minimum time to wait before the stream is considered inactive/idle. Default value is 60,000 millis.
   - Adds a new state, `IDLE`, to the basic states of the Supervisor.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `SeekableStreamSupervisor`
    * `KafkaSupervisor`
    * `KinesisSupervisor`
    * `SupervisorStateManager`
    * `LagBasedAutoScaler`
   
   <hr>
   
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989747816


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|Boolean|To turn on/off idle behavior per Supervisor.|no (default == false)|
+|`idleSupervisorForStreamIdleMillis`|Long|Minimum time interval to wait until stream is considered idle. (i.e. all existing data is caught up and no new data arrives).| no (default == 60000) |

Review Comment:
   `idleSupervisorForIdleStreamMillis` - we can use a better name here. 



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|Boolean|To turn on/off idle behavior per Supervisor.|no (default == false)|
+|`idleSupervisorForStreamIdleMillis`|Long|Minimum time interval to wait until stream is considered idle. (i.e. all existing data is caught up and no new data arrives).| no (default == 60000) |

Review Comment:
   ```suggestion
   |`idleSupervisorForStreamIdleMillis`|Long|Minimum time interval to wait before a topic is considered idle. (i.e. all existing data has been read from the stream and the topic is not getting new data).| no (default == 60000) |
   ```



##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|Whether supervisor should be turned idle if stream is idle for configured time.|false|

Review Comment:
   ```suggestion
   |`druid.supervisor.enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. That time can be configured via `<insert-the-setting or reference>` .|false|
   ```



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|Boolean|To turn on/off idle behavior per Supervisor.|no (default == false)|

Review Comment:
   What does `idle behavior` mean? We should instead say that "If turned off, the supervisor can never get into an idle state." There is also no documentation about how this flag and the cluster-level flag interact.
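
On the interaction between the two flags: the guard in `checkIfStreamInactiveAndTurnSupervisorIdle()` quoted elsewhere in this thread returns early if either flag is false, which suggests the feature is active only when both are true. A minimal sketch under that reading; `IdleFlagsSketch` is a hypothetical name, not a Druid class:

```java
// Hypothetical sketch of how the two flags appear to combine; not Druid's actual class.
final class IdleFlagsSketch
{
  /**
   * Idle behaviour takes effect only when both the cluster-level flag
   * (druid.supervisor.enableIdleBehaviour) and the per-supervisor flag
   * (ioConfig enableIdleBehaviour) are true.
   */
  static boolean idleBehaviourEnabled(boolean clusterFlag, boolean supervisorFlag)
  {
    return clusterFlag && supervisorFlag;
  }
}
```

Under this reading, the per-supervisor flag can only switch the feature off for a supervisor; it cannot switch it on when the cluster-level flag is false.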





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989741127


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,14 +4048,25 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    log.info(stateManager.getSupervisorState().getBasicState().toString());
+    log.info("!isIdle: %s, spec.isSuspended: %s, !statemanager.isSteadyState %s", !isIdle(), spec.isSuspended(), !stateManager.isSteadyState());
+    if (spec.isSuspended() || !stateManager.isSteadyState() && !isIdle()) {
       // don't emit metrics if supervisor is suspended or not in a healthy running state
       // (lag should still available in status report)
       return;
     }
     try {
-      Map<PartitionIdType, Long> partitionRecordLags = getPartitionRecordLag();
-      Map<PartitionIdType, Long> partitionTimeLags = getPartitionTimeLag();
+      Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadatStorage = getOffsetsFromMetadataStorage();

Review Comment:
   These changes can be reverted if the metadata storage offsets are returned instead of an empty map when there are no active tasks (in `getHighestCurrentOffsets`).
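
A note on the guard in the hunk above: in Java, `&&` binds more tightly than `||`, so `spec.isSuspended() || !stateManager.isSteadyState() && !isIdle()` groups as shown in the sketch below. `EmitLagGuardSketch` is a hypothetical name, and the three booleans stand in for the three method calls; whether this grouping is what the author intended is not stated in the thread.

```java
// Hypothetical sketch; the booleans stand in for spec.isSuspended(),
// stateManager.isSteadyState() and isIdle().
final class EmitLagGuardSketch
{
  /** Returns true when the guard would make emitLag() return early without emitting metrics. */
  static boolean skipsEmit(boolean suspended, boolean steadyState, boolean idle)
  {
    // Same meaning as `suspended || !steadyState && !idle`,
    // with the implicit grouping written out.
    return suspended || (!steadyState && !idle);
  }
}
```

With this grouping, a suspended supervisor never emits lag, while an idle supervisor emits it even when it is not in a steady state.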





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991351677


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -240,6 +242,11 @@ protected boolean isStoreStackTrace()
     return supervisorStateManagerConfig.isStoreStackTrace();
   }
 
+  public boolean isEnableIdleBehavior()

Review Comment:
   It's used in `SupervisorStateManagerTest`. It was added solely to increase test coverage in the server module; it serves no purpose in the main logic.





[GitHub] [druid] abhishekagarwal87 commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989742780


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,14 +4048,25 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    log.info(stateManager.getSupervisorState().getBasicState().toString());

Review Comment:
   @tejaswini-imply - these log lines have no context. They are unlikely to be useful. 





[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991245678


##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|If enabled, Supervisor can become idle if there is no data on input stream/topic for some time. This time can be configured via `awaitStreamInactiveMillis`. It can also be turned off per Supervisor.|false|

Review Comment:
   ```suggestion
   |`druid.supervisor.enableIdleBehaviour`|If enabled, a supervisor can become idle and stop creating new tasks if there has been no new data on the input stream for a specified period of time.|false|
   ```
   
   Also add a separate entry for `awaitStreamInactiveMillis`.





[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991262941


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   ```suggestion
   |`enableIdleBehaviour`|Boolean|If enabled, Kafka supervisor becomes idle and does not create any more tasks if there has been no data on the input topic for some time. This takes effect only if overlord config `druid.supervisor.enableIdleBehaviour` is true.|no (default == false)|
   ```





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r988553931


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3244,6 +3275,40 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    if (!spec.getSupervisorStateManagerConfig().isEnableIdleBehaviour()
+        || !spec.getIoConfig().isEnableIdleBehaviour()) {
+      return;
+    }
+
+    Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequences();
+    long nowTime = Instant.now().toEpochMilli();
+    boolean idle;
+    if (lagsVerifiedlastTime > 0
+        && previousPartitionOffsetsSnapshot.equals(latestSequencesFromStream)
+        && computeTotalLag() == 0) {
+      idleTime += nowTime - lagsVerifiedlastTime;

Review Comment:
   It is compared with `ioConfig.idleSupervisorForStreamIdleMillis`; the Supervisor is turned idle only if it is greater.
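
The accumulate-and-compare logic discussed here can be sketched as follows. This is an illustrative sketch loosely modelled on the hunk above, not Druid's actual code: `IdleTrackerSketch` and its members are hypothetical, partition ids and offsets are simplified to `Integer` and `Long`, and resetting the idle time on any activity is an assumption, since the hunk is cut off before that branch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch loosely modelled on the hunk above; not Druid's actual code.
final class IdleTrackerSketch
{
  private final long idleThresholdMillis;
  private Map<Integer, Long> previousLatestOffsets = Collections.emptyMap();
  private long lastCheckedMillis = 0;
  private long idleMillis = 0;

  IdleTrackerSketch(long idleThresholdMillis)
  {
    this.idleThresholdMillis = idleThresholdMillis;
  }

  /** Records one check; returns true once the stream has been inactive for longer than the threshold. */
  boolean check(Map<Integer, Long> latestOffsets, long totalLag, long nowMillis)
  {
    if (lastCheckedMillis > 0 && previousLatestOffsets.equals(latestOffsets) && totalLag == 0) {
      // No new data and nothing left to read: extend the idle window.
      idleMillis += nowMillis - lastCheckedMillis;
    } else {
      // Assumption: any activity (or the very first check) resets the idle window.
      idleMillis = 0;
    }
    previousLatestOffsets = new HashMap<>(latestOffsets);
    lastCheckedMillis = nowMillis;
    return idleMillis > idleThresholdMillis;
  }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the method, keeps the sketch deterministic and easy to test.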





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989739573


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3660,6 +3728,9 @@ protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
                 Entry::getValue,
                 (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
             ));
+
+        partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
+        return currentOffsets;
       }
       // nothing is running but we are not suspended, so lets just hang out in case we get called while things start up
       return ImmutableMap.of();

Review Comment:
   We need to return `offsetsFromMetadataStorage` here as well, since the lag is computed differently now.





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989651900


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3244,6 +3275,40 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    if (!spec.getSupervisorStateManagerConfig().isEnableIdleBehaviour()
+        || !spec.getIoConfig().isEnableIdleBehaviour()) {
+      return;

Review Comment:
   Addressed it in the latest commit, thanks.





[GitHub] [druid] lgtm-com[bot] commented on pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13144:
URL: https://github.com/apache/druid/pull/13144#issuecomment-1257912201

   This pull request **introduces 1 alert** when merging d16f8d12b2af32403b198899112732f356892300 into 0bfa81b7df4b0ae76ac45497b007b6857acb419f - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-0b0fb560ba0530f97425c602d16d53d34fc3d093)
   
   **new alerts:**
   
   * 1 for Boxed variable is never null




[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r984225784


##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|Whether supervisor should be turned idle if stream is idle for configured time.|false|

Review Comment:
   This is per cluster. After the design shift, it can't be enabled or disabled individually per supervisor. I think we should add another config in the supervisor spec for the same purpose. Thanks for pointing it out.





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r982280416


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2538,48 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  public long computeTotalLag()
+  {
+    if (isIdle()) {
+      Map<PartitionIdType, SequenceOffsetType> oldOffsets = getOffsetsFromMetadataStorage();
+      return computeLagStatsForOffsets(oldOffsets).getTotalLag();

Review Comment:
   > Supervisor is idle: current offsets are fetched from metadata storage since no tasks are likely running.
   
   Couldn't this lead to a situation where all actively running tasks could return 0 lag as they have caught up, but one or more of them may not have checkpointed to the metadata store?
   This will lead to a situation where an IDLE supervisor returns to RUNNING due to the discrepancy when computing lag using only the metadata store. 
   I think that for each partition in the topic, the current offset must be retrieved from the active task it has been assigned to, with a fallback to the metadata store. 
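
   A minimal sketch of the per-partition fallback suggested above, assuming integer partition ids and long offsets. `OffsetFallbackSketch` and `mergeOffsets` are hypothetical names used only for illustration and are not part of Druid.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class OffsetFallbackSketch
{
  // Hypothetical helper: prefer the offset reported by a running task and
  // fall back to the metadata-store offset only for partitions no task reported.
  static Map<Integer, Long> mergeOffsets(
      Set<Integer> partitionIds,
      Map<Integer, Long> offsetsFromTasks,
      Map<Integer, Long> offsetsFromMetadataStorage
  )
  {
    Map<Integer, Long> currentOffsets = new HashMap<>(offsetsFromTasks);
    for (Integer partitionId : partitionIds) {
      Long stored = offsetsFromMetadataStorage.get(partitionId);
      // Skip partitions with no stored offset so no null values enter the map.
      if (stored != null) {
        currentOffsets.putIfAbsent(partitionId, stored);
      }
    }
    return currentOffsets;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> fromTasks = new HashMap<>();
    fromTasks.put(0, 120L);
    Map<Integer, Long> fromStore = new HashMap<>();
    fromStore.put(0, 100L);
    fromStore.put(1, 75L);
    Set<Integer> partitionIds = new HashSet<>(Arrays.asList(0, 1));
    System.out.println(mergeOffsets(partitionIds, fromTasks, fromStore));
  }
}
```

   With this shape, a task that has caught up but not yet checkpointed still contributes its own, newer offset, so the lag is not overstated by a stale metadata-store entry.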





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991334161


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   This is false by default on the assumption that, after an Overlord restart, users would not want existing supervisors to behave differently unless explicitly configured to do so.





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r983960094


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2538,48 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  public long computeTotalLag()
+  {
+    if (isIdle()) {
+      Map<PartitionIdType, SequenceOffsetType> oldOffsets = getOffsetsFromMetadataStorage();
+      return computeLagStatsForOffsets(oldOffsets).getTotalLag();

Review Comment:
   @AmatyaAvadhanula Thanks for the review, I have addressed your comments in the latest commit.





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989929349


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3660,6 +3728,9 @@ protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
                 Entry::getValue,
                 (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2
             ));
+
+        partitionIds.forEach(partitionId -> currentOffsets.putIfAbsent(partitionId, offsetsFromMetadataStorage.get(partitionId)));
+        return currentOffsets;
       }
       // nothing is running but we are not suspended, so lets just hang out in case we get called while things start up
       return ImmutableMap.of();

Review Comment:
   Thanks Amatya, I have addressed your comments in the latest commit.





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991339917


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -249,15 +252,7 @@ protected Map<Integer, Long> getPartitionRecordLag()
     if (latestSequenceFromStream == null) {
       return null;
     }
-
-    if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {

Review Comment:
   Thanks for pointing it out, @kfaraz. In the earlier design of LagBasedAutoScaler, this piece of code was reused. Now that it has been redesigned, there is no need for it. I will revert the changes.





[GitHub] [druid] AmatyaAvadhanula commented on pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on PR #13144:
URL: https://github.com/apache/druid/pull/13144#issuecomment-1272256888

   @tejaswini-imply thank you for the changes and tests.
   However, `ITKafkaIndexingServiceNonTransactionalParallelizedTest.testIndexDataWithIdleBehaviourEnabled` seems to be failing.
   Could you please set the environment to enable cluster-wide idle behaviour?




[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r988553320


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3244,6 +3275,40 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    if (!spec.getSupervisorStateManagerConfig().isEnableIdleBehaviour()
+        || !spec.getIoConfig().isEnableIdleBehaviour()) {
+      return;

Review Comment:
   If this condition holds, it is unlikely to change while the Supervisor is running, so the Supervisor is unlikely to be moved to the Idle state. A spec update or cluster restart would also restart the Supervisor, and in that scenario the Idle state would be erased as well, if I'm not missing something.





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r990613026


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -88,6 +88,9 @@ public SeekableStreamSupervisorSpec(
       @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
+    Preconditions.checkArgument(supervisorStateManagerConfig.isEnableIdleBehaviour()
+                                || !ingestionSchema.getIOConfig().isEnableIdleBehaviour(),
+                                "Idle behaviour is disabled on Overlord. It cannot be enabled per Supervisor.");

Review Comment:
   This may break an existing supervisor's ingestion when the cluster-wide setting is turned off.
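
   One alternative, sketched under assumptions: rather than rejecting the spec in the constructor, evaluate both flags at runtime, which mirrors the early-return condition in `checkIfStreamInactiveAndTurnSupervisorIdle()`. A spec that asks for idle behaviour then still loads when the Overlord flag is off, and idle behaviour simply never applies. `IdleGateSketch` and `isIdleBehaviourEffective` are hypothetical names, not Druid APIs.

```java
final class IdleGateSketch
{
  // Hypothetical: idle behaviour applies only when both the Overlord-level
  // flag and the supervisor-level flag are enabled. Nothing is thrown when
  // they disagree, so an existing spec keeps ingesting.
  static boolean isIdleBehaviourEffective(boolean enabledOnOverlord, boolean enabledInSpec)
  {
    return enabledOnOverlord && enabledInSpec;
  }

  public static void main(String[] args)
  {
    // Spec requests idle behaviour but the cluster-wide flag is off.
    System.out.println(isIdleBehaviourEffective(false, true));
  }
}
```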





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r990973087


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java:
##########
@@ -88,6 +88,9 @@ public SeekableStreamSupervisorSpec(
       @JacksonInject SupervisorStateManagerConfig supervisorStateManagerConfig
   )
   {
+    Preconditions.checkArgument(supervisorStateManagerConfig.isEnableIdleBehaviour()
+                                || !ingestionSchema.getIOConfig().isEnableIdleBehaviour(),
+                                "Idle behaviour is disabled on Overlord. It cannot be enabled per Supervisor.");

Review Comment:
   Reverted it. Thanks.





[GitHub] [druid] tejaswini-imply commented on pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on PR #13144:
URL: https://github.com/apache/druid/pull/13144#issuecomment-1276244242

   Thanks @AmatyaAvadhanula, @kfaraz and @abhishekagarwal87 for the thorough review.




[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r988555577


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3244,6 +3275,40 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    if (!spec.getSupervisorStateManagerConfig().isEnableIdleBehaviour()
+        || !spec.getIoConfig().isEnableIdleBehaviour()) {
+      return;
+    }
+
+    Map<PartitionIdType, SequenceOffsetType> latestSequencesFromStream = getLatestSequences();
+    long nowTime = Instant.now().toEpochMilli();
+    boolean idle;
+    if (lagsVerifiedlastTime > 0
+        && previousPartitionOffsetsSnapshot.equals(latestSequencesFromStream)
+        && computeTotalLag() == 0) {
+      idleTime += nowTime - lagsVerifiedlastTime;

Review Comment:
   Oh, I'm sorry, the assignment for `lagsVerifiedlastTime` was done inside the conditional by mistake.
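
   A minimal sketch of the accumulation with the timestamp updated outside the conditional, assuming integer partition ids and long offsets. `IdleTrackerSketch`, its fields, and the reset of the idle time on any activity are illustrative assumptions, not the code in this PR. In this sketch, assigning the timestamp only inside a branch guarded by `> 0` would leave it at its initial value of 0 forever.

```java
import java.util.HashMap;
import java.util.Map;

final class IdleTrackerSketch
{
  private final long inactiveAfterMillis;
  private Map<Integer, Long> previousOffsets = new HashMap<>();
  private long lastCheckedMillis = 0;
  private long idleMillis = 0;

  IdleTrackerSketch(long inactiveAfterMillis)
  {
    this.inactiveAfterMillis = inactiveAfterMillis;
  }

  // Returns true once the stream has shown no new offsets and zero lag
  // for at least inactiveAfterMillis across consecutive checks.
  boolean check(Map<Integer, Long> latestOffsets, long totalLag, long nowMillis)
  {
    if (lastCheckedMillis > 0 && previousOffsets.equals(latestOffsets) && totalLag == 0) {
      idleMillis += nowMillis - lastCheckedMillis;
    } else {
      // Assumption: any activity (or the very first check) resets the idle time.
      idleMillis = 0;
    }
    // Updated on every check, outside the conditional.
    lastCheckedMillis = nowMillis;
    previousOffsets = new HashMap<>(latestOffsets);
    return idleMillis >= inactiveAfterMillis;
  }

  public static void main(String[] args)
  {
    IdleTrackerSketch tracker = new IdleTrackerSketch(60_000L);
    Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 10L);
    System.out.println(tracker.check(offsets, 0, 1_000L));
    System.out.println(tracker.check(offsets, 0, 61_000L));
  }
}
```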





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r984219067


##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|Whether supervisor should be turned idle if stream is idle for configured time.|false|

Review Comment:
   Can this property be set independently per supervisor?





[GitHub] [druid] lgtm-com[bot] commented on pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on PR #13144:
URL: https://github.com/apache/druid/pull/13144#issuecomment-1258150627

   This pull request **introduces 1 alert** when merging 440544e32a14a55df70a9deb0ea93b32aa5ebb64 into 0bfa81b7df4b0ae76ac45497b007b6857acb419f - [view on LGTM.com](https://lgtm.com/projects/g/apache/druid/rev/pr-ea6a9b93d659bf45d16f1ef394709cccde5368ef)
   
   **new alerts:**
   
   * 1 for Boxed variable is never null




[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991271528


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
+|`awaitStreamInactiveMillis`|Long|Minimum time interval to wait before a topic is considered inactive. (i.e. all existing data has been read from the stream and the topic is not getting new data).| no (default == 60000) |

Review Comment:
   ```suggestion
   |`awaitStreamInactiveMillis`|Long|Minimum time interval to wait after the topic has become inactive before marking the supervisor as idle. A topic is considered to be inactive if all existing data has been read from it and no new data has been published to it.| no (default == 60000) |
   ```





[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r993048404


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -117,8 +125,10 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler en
          "taskCount":1,
          "replicas":1,
          "taskDuration":"PT1H",
-         "enableIdleBehaviour": true,
-         "awaitStreamInactiveMillis": 60000
+         "idleConfig": {
+           "enabled": true,
+           "inactiveAfterMillis": 600000 
+         }

Review Comment:
   nit: missing comma



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/IdleConfig.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+
+public class IdleConfig

Review Comment:
   Please add a javadoc.





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989143654


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3244,6 +3275,40 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep
     Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
+  private void checkIfStreamInactiveAndTurnSupervisorIdle()
+  {
+    if (!spec.getSupervisorStateManagerConfig().isEnableIdleBehaviour()
+        || !spec.getIoConfig().isEnableIdleBehaviour()) {
+      return;

Review Comment:
   Could you please add a check to see if it is suspended?
   If not, a SUSPENDED supervisor could be shown as IDLE.
   While this doesn't affect ingestion, it could be misleading to the user.
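
   A minimal sketch of the ordering being asked for, where the suspended check takes precedence over the inactivity check. The `State` enum and `resolve` method here are hypothetical stand-ins, not the supervisor's actual state machine.

```java
final class SupervisorStateSketch
{
  enum State { RUNNING, SUSPENDED, IDLE }

  // Hypothetical: a suspended supervisor is always reported as SUSPENDED,
  // even if its stream also happens to be inactive.
  static State resolve(boolean suspended, boolean streamInactive)
  {
    if (suspended) {
      return State.SUSPENDED;
    }
    return streamInactive ? State.IDLE : State.RUNNING;
  }

  public static void main(String[] args)
  {
    System.out.println(resolve(true, true));
  }
}
```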





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991374520


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -154,7 +155,7 @@ public synchronized void maybeSetState(State proposedState)
       return;
     }
 
-    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run

Review Comment:
   IDLE isn't a healthy steady state, which is why the comment says "healthy steady state ... or IDLE state". Please let me know if that wording is confusing.





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991390234


##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -274,17 +269,19 @@ protected Map<Integer, Long> getPartitionTimeLag()
   @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
   {
-    return currentOffsets
+    if (latestSequenceFromStream == null) {
+      return ImmutableMap.of();
+    }
+
+    return latestSequenceFromStream
         .entrySet()
         .stream()
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                e -> latestSequenceFromStream != null
-                     && latestSequenceFromStream.get(e.getKey()) != null
-                     && e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : Integer.MIN_VALUE
+                e -> e.getValue() != null

Review Comment:
   If any partitions are missing from `latestSequenceFromStream`, those partitions won't be considered during lag reporting. Now that idle behavior is decided based on these lags, it's better to compare against the latest stream stats.
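
   A rough sketch of lag computed by iterating over the stream's latest offsets rather than over the current offsets. The diff above is truncated, so the handling of missing values here is an assumption: a partition with no current offset is treated as having read nothing (offset 0), and a null latest offset yields a lag of 0. `RecordLagSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

final class RecordLagSketch
{
  // Keys of the result come from latestFromStream only, so a partition that
  // is absent from the stream's latest offsets is not reported at all.
  static Map<Integer, Long> recordLagPerPartition(
      Map<Integer, Long> latestFromStream,
      Map<Integer, Long> currentOffsets
  )
  {
    Map<Integer, Long> lag = new HashMap<>();
    if (latestFromStream == null) {
      return lag;
    }
    for (Map.Entry<Integer, Long> e : latestFromStream.entrySet()) {
      Long latest = e.getValue();
      // Assumption: missing current offset means nothing has been read yet.
      long current = currentOffsets.getOrDefault(e.getKey(), 0L);
      // Assumption: an unknown latest offset contributes no lag.
      lag.put(e.getKey(), latest == null ? 0L : latest - current);
    }
    return lag;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> latest = new HashMap<>();
    latest.put(0, 100L);
    latest.put(1, 50L);
    Map<Integer, Long> current = new HashMap<>();
    current.put(0, 90L);
    current.put(3, 7L);
    System.out.println(recordLagPerPartition(latest, current));
  }
}
```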





[GitHub] [druid] abhishekagarwal87 commented on pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 commented on PR #13144:
URL: https://github.com/apache/druid/pull/13144#issuecomment-1276139837

   Thank you @tejaswini-imply. This is going to be a very useful feature.




[GitHub] [druid] abhishekagarwal87 merged pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
abhishekagarwal87 merged PR #13144:
URL: https://github.com/apache/druid/pull/13144




[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991179466


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -154,7 +155,7 @@ public synchronized void maybeSetState(State proposedState)
       return;
     }
 
-    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run

Review Comment:
   I don't think this change is applicable since IDLE doesn't count as a "healthy steady state". IIUC, the `healthySteadyState` can only be RUNNING or SUSPENDED, which is determined while creating this `SupervisorStateManager` instance.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java:
##########
@@ -46,6 +46,10 @@
   @JsonProperty
   private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold, healthinessThreshold);
 
+  //

Review Comment:
   Nit: missing comment?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;

Review Comment:
   Adding a dependency just for `ImmutableMap` seems like overkill. You can use `Collections.emptyMap()` instead.
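
As a small illustration of the suggestion, the JDK's `Collections.emptyMap()` gives an immutable empty map without any extra dependency. `EmptyLagSketch` and `lagOrEmpty` are hypothetical names used only for this sketch.

```java
import java.util.Collections;
import java.util.Map;

final class EmptyLagSketch
{
  // Hypothetical helper: returns an immutable empty map when nothing has been
  // fetched from the stream yet, using only java.util.
  static Map<Integer, Long> lagOrEmpty(Map<Integer, Long> latestFromStream)
  {
    if (latestFromStream == null) {
      return Collections.emptyMap();
    }
    return latestFromStream;
  }
}
```

Like Guava's `ImmutableMap.of()`, the returned map rejects modification, so callers cannot accidentally write into the shared empty instance.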



##########
.travis.yml:
##########
@@ -506,13 +506,13 @@ jobs:
       stage: Tests - phase 2
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
+      env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' OVERRIDE_CONFIG_PATH='./environment-configs/test-groups/enable-idle-behaviour'

Review Comment:
   Nit: It is preferable to have the name of the override config the same as the test group (`kafka-index` in this case). Otherwise, we would end up having separate config files for every feature.
   
   You need not create another one for `kafka-transactional-index` though and can reuse the same one.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()

Review Comment:
   Please add a javadoc to this method.
   Should this be renamed to `getLatestSequencesFromStream()`?



##########
integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json:
##########
@@ -0,0 +1,60 @@
+{

Review Comment:
   This spec seems pretty generic. Do we really need to add a new one for the new test or can we reuse any existing one?



##########
extensions-core/kafka-indexing-service/pom.xml:
##########
@@ -134,6 +134,11 @@
       <artifactId>validation-api</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>

Review Comment:
   It doesn't seem like this dependency is needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+  {
+    return new HashMap<>();
+  }
+
+  private boolean isIdle()

Review Comment:
   This method should be exposed by the `stateManager` itself, since the state is tracked there anyway and we refer to it for other purposes such as checking `isSteadyState()` or `isHealthy()`.
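
A simplified sketch of what exposing `isIdle()` from the state manager could look like. `StateManagerSketch` is a hypothetical stand-in, not the real `SupervisorStateManager`; it keeps only four states and omits the health thresholds and first-run checks of the actual class.

```java
final class StateManagerSketch
{
  enum BasicState { PENDING, RUNNING, SUSPENDED, IDLE }

  // Fixed at construction time: can only be RUNNING or SUSPENDED.
  private final BasicState healthySteadyState;
  private BasicState supervisorState = BasicState.PENDING;

  StateManagerSketch(boolean suspended)
  {
    this.healthySteadyState = suspended ? BasicState.SUSPENDED : BasicState.RUNNING;
  }

  synchronized void setState(BasicState state)
  {
    supervisorState = state;
  }

  synchronized BasicState getState()
  {
    return supervisorState;
  }

  // The state is tracked here, so the idle check lives here too.
  synchronized boolean isIdle()
  {
    return supervisorState == BasicState.IDLE;
  }

  // If the supervisor is not IDLE, move to the healthy steady state.
  synchronized void markRunFinished()
  {
    if (!isIdle()) {
      supervisorState = healthySteadyState;
    }
  }
}
```

Callers such as the supervisor would then ask the state manager directly instead of comparing states themselves.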



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+  {
+    return new HashMap<>();
+  }
+
+  private boolean isIdle()
+  {
+    return SupervisorStateManager.BasicState.IDLE.equals(getState());
+  }
+
   private void assignRecordSupplierToPartitionIds()
   {
     recordSupplierLock.lock();
     try {
-      final Set partitions = partitionIds.stream()
+      final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()

Review Comment:
   Thanks! :) 



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
     // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
     // (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
     // unhealthy one if we are now over the error thresholds.
-    maybeSetState(healthySteadyState);
-
+    if (!BasicState.IDLE.equals(supervisorState)) {

Review Comment:
   Nit: Once `isIdle()` is moved to this class, use that here instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();

Review Comment:
   This should be private if it doesn't need to be accessed elsewhere.
   It can be an ordinary map as it will only ever be accessed from `runInternal`, which is invoked on a single thread.
   
   You could also consider renaming it to something like `previousSequencesFromStream` to correspond with `latestSequencesFromStream`.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -240,6 +242,11 @@ protected boolean isStoreStackTrace()
     return supervisorStateManagerConfig.isStoreStackTrace();
   }
 
+  public boolean isEnableIdleBehavior()

Review Comment:
   Is this used anywhere? `SeekableStreamSupervisor` seems to be directly checking the `SupervisorStateManagerConfig` itself for this field.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
     // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun

Review Comment:
   ```suggestion
       // If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED.
       // This will be rejected if we haven't had atLeastOneSuccessfulRun
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,7 +4041,7 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    if (spec.isSuspended() || !stateManager.isSteadyState() && !isIdle()) {

Review Comment:
   For readability:
   ```suggestion
       if (spec.isSuspended() || (!stateManager.isSteadyState() && !isIdle())) {
   ```
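
The parentheses in the suggestion do not change behaviour, because `&&` binds more tightly than `||` in Java; they only make the grouping explicit. A small sketch that checks this over all inputs (`PrecedenceSketch` and its method names are hypothetical):

```java
final class PrecedenceSketch
{
  // Mirrors the condition as written in the diff, without parentheses.
  static boolean withoutParens(boolean suspended, boolean steady, boolean idle)
  {
    return suspended || !steady && !idle;
  }

  // Mirrors the reviewer's suggestion, with explicit grouping.
  static boolean withParens(boolean suspended, boolean steady, boolean idle)
  {
    return suspended || (!steady && !idle);
  }

  public static void main(String[] args)
  {
    boolean[] values = {false, true};
    for (boolean suspended : values) {
      for (boolean steady : values) {
        for (boolean idle : values) {
          if (withoutParens(suspended, steady, idle) != withParens(suspended, steady, idle)) {
            throw new AssertionError("forms disagree");
          }
        }
      }
    }
    System.out.println("both forms agree on all 8 inputs");
  }
}
```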



##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|If enabled, Supervisor can become idle if there is no data on input stream/topic for some time. This time can be configured via `awaitStreamInactiveMillis`. It can also be turned off per Supervisor.|false|

Review Comment:
   ```suggestion
   |`druid.supervisor.enableIdleBehaviour`|Whether a supervisor should become idle and stop creating new tasks if there has been no new data on the input stream for a specified period of time.|false|
   ```
   
   Also add a separate entry for `awaitStreamInactiveMillis`.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -274,17 +269,19 @@ protected Map<Integer, Long> getPartitionTimeLag()
   @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
   {
-    return currentOffsets
+    if (latestSequenceFromStream == null) {
+      return ImmutableMap.of();
+    }
+
+    return latestSequenceFromStream
         .entrySet()
         .stream()
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                e -> latestSequenceFromStream != null
-                     && latestSequenceFromStream.get(e.getKey()) != null
-                     && e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : Integer.MIN_VALUE
+                e -> e.getValue() != null

Review Comment:
   Looks much cleaner now.
   
   I hope this doesn't break any assumption where we expect `Integer.MIN_VALUE` for partitions that are missing in the `latestSequenceFromStream`, if any.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -249,15 +252,7 @@ protected Map<Integer, Long> getPartitionRecordLag()
     if (latestSequenceFromStream == null) {
       return null;
     }
-
-    if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {

Review Comment:
   Super nit: Any particular reason to move this? The method seems small enough to have this here itself.



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
+|`awaitStreamInactiveMillis`|Long|Minimum time interval to wait before a topic is considered inactive. (i.e. all existing data has been read from the stream and the topic is not getting new data).| no (default == 60000) |

Review Comment:
   ```suggestion
   |`awaitStreamInactiveMillis`|Long|Minimum time interval to wait after the topic has become inactive before marking the supervisor as idle. A topic is considered to be inactive if all existing data has been read from it and no new data is being published to it.| no (default == 60000) |
   ```



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   ```suggestion
   |`enableIdleBehaviour`|Boolean|Configure Kafka supervisor to become idle and not create any more tasks if there has been no data on the input topic for some time. This takes effect only if overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+  private long activeLastTime;

Review Comment:
   ```suggestion
     private long lastActiveTimeMillis;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+  private long activeLastTime;
+  private long idleTime;

Review Comment:
   This should be a local variable and not a member field.
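
A rough sketch of how the inactivity check could look with the idle duration kept as a local variable and only the last-active timestamp and previous snapshot kept as fields. `IdleCheckSketch`, `shouldBeIdle`, and the explicit `nowMillis` parameter are hypothetical; the PR's actual logic in `SeekableStreamSupervisor` may differ.

```java
import java.util.HashMap;
import java.util.Map;

final class IdleCheckSketch
{
  private final long awaitStreamInactiveMillis;
  // Ordinary map: assumed to be touched from a single thread only.
  private Map<Integer, Long> previousSequencesFromStream = new HashMap<>();
  private long lastActiveTimeMillis;

  IdleCheckSketch(long awaitStreamInactiveMillis, long nowMillis)
  {
    this.awaitStreamInactiveMillis = awaitStreamInactiveMillis;
    this.lastActiveTimeMillis = nowMillis;
  }

  // Returns true once the stream has shown no new offsets and no lag
  // for at least awaitStreamInactiveMillis.
  boolean shouldBeIdle(Map<Integer, Long> latestSequencesFromStream, long totalLag, long nowMillis)
  {
    boolean offsetsChanged = !latestSequencesFromStream.equals(previousSequencesFromStream);
    if (offsetsChanged || totalLag > 0) {
      lastActiveTimeMillis = nowMillis;
      previousSequencesFromStream = new HashMap<>(latestSequencesFromStream);
      return false;
    }
    // Local variable: derived on each check, so it needs no member field.
    final long idleTimeMillis = nowMillis - lastActiveTimeMillis;
    return idleTimeMillis >= awaitStreamInactiveMillis;
  }
}
```

Passing the clock value in as a parameter is a choice made here to keep the sketch deterministic; it is not taken from the PR.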



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3648,25 +3704,33 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept
 
   protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()

Review Comment:
   Please add a javadoc to this method and maybe explain how this change affects the idle behaviour.



##########
docs/development/extensions-core/kafka-supervisor-operations.md:
##########
@@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding `state` mapping is as
 |DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
 |CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
 |RUNNING|RUNNING|The supervisor has started tasks and is waiting for taskDuration to elapse|
+|IDLE|IDLE|The supervisor is not creating tasks any longer since stream is idle|

Review Comment:
   ```suggestion
   |IDLE|IDLE|The supervisor is not creating tasks since the input stream has not received any new data|
   ```



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   Shouldn't this default be `true`?
   If I have enabled the idle behaviour at the overlord, shouldn't all supervisors be allowed to go into idle state by default?





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991382877


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+  {
+    return new HashMap<>();
+  }
+
+  private boolean isIdle()

Review Comment:
   Thanks for the suggestion, Kashif. Moving it.





[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991291096


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   Shouldn't this be `true` by default?
   If I have enabled the idle behaviour at the overlord, shouldn't all supervisors be allowed to go into idle state by default?





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r984219067


##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|Whether supervisor should be turned idle if stream is idle for configured time.|false|

Review Comment:
   Can this property be set independently per supervisor? (As part of the supervisor spec)





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989698197


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,14 +4048,25 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    log.info(stateManager.getSupervisorState().getBasicState().toString());

Review Comment:
   Could the format of these logs be improved? Also, should they be info logs?





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989702382


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3648,9 +3715,10 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept
 
   protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()
   {
+    Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = getOffsetsFromMetadataStorage();

Review Comment:
   Please use this map instead of calling `getOffsetsFromMetadataStorage` again later in the code.
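
To illustrate fetching once and reusing the result, here is a sketch in which the metadata lookup is modelled as a `Supplier`. `OffsetMergeSketch` and `highestCurrentOffsets` are hypothetical, and merging by taking the larger offset per partition is an assumption for illustration, not the PR's confirmed logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class OffsetMergeSketch
{
  static Map<Integer, Long> highestCurrentOffsets(
      Map<Integer, Long> offsetsFromTasks,
      Supplier<Map<Integer, Long>> metadataStorage
  )
  {
    // Fetch from metadata storage exactly once and reuse the local map below.
    final Map<Integer, Long> offsetsFromMetadataStorage = metadataStorage.get();

    if (offsetsFromTasks.isEmpty()) {
      // No running tasks (e.g. the supervisor is idle): rely on stored offsets.
      return new HashMap<>(offsetsFromMetadataStorage);
    }

    // Assumption: keep the larger offset when both sources know a partition.
    Map<Integer, Long> merged = new HashMap<>(offsetsFromMetadataStorage);
    offsetsFromTasks.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
    return merged;
  }
}
```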





[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r989782174


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,14 +4048,25 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    log.info(stateManager.getSupervisorState().getBasicState().toString());

Review Comment:
   I added these for debugging and pushed them by mistake. I'll revert them.




---------------------------------------------------------------------


[GitHub] [druid] tejaswini-imply commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

Posted by GitBox <gi...@apache.org>.
tejaswini-imply commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r990974462


##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|Boolean|To turn on/off idle behavior per Supervisor.|no (default == false)|
+|`idleSupervisorForStreamIdleMillis`|Long|Minimum time interval to wait until stream is considered idle. (i.e. all existing data is caught up and no new data arrives).| no (default == 60000) |

Review Comment:
   Thanks, Abhishek. Updated the docs. This variable is now named `awaitStreamInactiveMillis`.
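
   As a rough illustration of what `awaitStreamInactiveMillis` controls, the sketch below treats a stream as idle once the latest offsets have stopped changing and lag has stayed at zero for at least that long. This is a hypothetical helper, not the code in this PR: the class name, method signature, `Integer`/`Long` partition and offset types, and the way time is passed in are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical idle detector; not part of Druid's API.
final class StreamIdleSketch
{
  private final long awaitStreamInactiveMillis;
  private Map<Integer, Long> lastSeenLatestOffsets; // null until the first check
  private long lastActivityMillis;

  StreamIdleSketch(long awaitStreamInactiveMillis, long nowMillis)
  {
    this.awaitStreamInactiveMillis = awaitStreamInactiveMillis;
    this.lastActivityMillis = nowMillis;
  }

  // Returns true only when no new data has arrived and lag has been zero
  // for at least awaitStreamInactiveMillis.
  boolean isIdle(Map<Integer, Long> latestOffsets, long totalLag, long nowMillis)
  {
    boolean offsetsMoved = !latestOffsets.equals(lastSeenLatestOffsets);
    if (offsetsMoved || totalLag > 0) {
      // Activity detected: remember the offsets and restart the inactivity clock.
      lastSeenLatestOffsets = new HashMap<>(latestOffsets);
      lastActivityMillis = nowMillis;
      return false;
    }
    return nowMillis - lastActivityMillis >= awaitStreamInactiveMillis;
  }
}
```

   Passing the clock value in as a parameter keeps the check deterministic, which makes the threshold behavior easy to test with the documented default of 60000 ms.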




---------------------------------------------------------------------