Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2022/10/10 13:42:36 UTC

[GitHub] [druid] kfaraz commented on a diff in pull request #13144: Adds Idle feature to `SeekableStreamSupervisor` for inactive stream

kfaraz commented on code in PR #13144:
URL: https://github.com/apache/druid/pull/13144#discussion_r991179466


##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -154,7 +155,7 @@ public synchronized void maybeSetState(State proposedState)
       return;
     }
 
-    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) but haven't had a successful run
+    // if we're trying to switch to a healthy steady state (i.e. RUNNING or SUSPENDED) or IDLE state but haven't had a successful run

Review Comment:
   I don't think this change is applicable since IDLE doesn't count as a "healthy steady state". IIUC, the `healthySteadyState` can only be RUNNING or SUSPENDED, which is determined while creating this `SupervisorStateManager` instance.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerConfig.java:
##########
@@ -46,6 +46,10 @@
   @JsonProperty
   private int maxStoredExceptionEvents = Math.max(unhealthinessThreshold, healthinessThreshold);
 
+  //

Review Comment:
   Nit: missing comment?



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;

Review Comment:
   Adding a dependency just for `ImmutableMap` seems overkill. You can use `Collections.emptyMap()` instead.
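
A minimal sketch of the JDK-only alternative, assuming the caller only needs an empty, read-only map as the early-return value. The class and method names here are hypothetical and are not part of the PR.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the null guard in the diff; not the actual KafkaSupervisor code.
final class EmptyLagExample
{
  // Collections.emptyMap() returns an immutable empty map from the JDK,
  // so no shaded Guava import or extra pom dependency is required.
  static Map<Integer, Long> lagWhenNoSequences()
  {
    return Collections.emptyMap();
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> lag = lagWhenNoSequences();
    System.out.println("empty: " + lag.isEmpty());
  }
}
```

As with `ImmutableMap.of()`, the returned map rejects writes, so callers cannot mutate it by accident.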



##########
.travis.yml:
##########
@@ -506,13 +506,13 @@ jobs:
       stage: Tests - phase 2
       jdk: openjdk8
       services: *integration_test_services
-      env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
+      env: TESTNG_GROUPS='-Dgroups=kafka-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' OVERRIDE_CONFIG_PATH='./environment-configs/test-groups/enable-idle-behaviour'

Review Comment:
   Nit: It is preferable to have the name of the override config the same as the test group (`kafka-index` in this case). Otherwise, we would end up having separate config files for every feature.
   
   You need not create another one for `kafka-transactional-index` though and can reuse the same one.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()

Review Comment:
   Please add a javadoc to this method.
   Should this be renamed to `getLatestSequencesFromStream()`?



##########
integration-tests/src/test/resources/stream/data/supervisor_with_idle_behaviour_enabled_spec_template.json:
##########
@@ -0,0 +1,60 @@
+{

Review Comment:
   This spec seems pretty generic. Do we really need to add a new one for the new test or can we reuse any existing one?



##########
extensions-core/kafka-indexing-service/pom.xml:
##########
@@ -134,6 +134,11 @@
       <artifactId>validation-api</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>

Review Comment:
   It doesn't seem like this dependency is needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+  {
+    return new HashMap<>();
+  }
+
+  private boolean isIdle()

Review Comment:
   This method should be exposed by the `stateManager` itself, since the state is tracked there anyway and we refer to it for other purposes such as checking `isSteadyState()` or `isHealthy()`.
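
A rough sketch of what this could look like, assuming a heavily simplified state manager. `StateManagerSketch` and its reduced `BasicState` enum are hypothetical stand-ins, not the real `SupervisorStateManager`.

```java
// Simplified, hypothetical stand-in for SupervisorStateManager.
// Only the pieces needed to illustrate an isIdle() accessor are included.
final class StateManagerSketch
{
  enum BasicState { PENDING, RUNNING, SUSPENDED, IDLE }

  private BasicState supervisorState = BasicState.PENDING;

  synchronized void setState(BasicState newState)
  {
    supervisorState = newState;
  }

  // The check lives next to the state it inspects, so callers such as the
  // supervisor do not need to compare enum values themselves.
  synchronized boolean isIdle()
  {
    return BasicState.IDLE.equals(supervisorState);
  }

  public static void main(String[] args)
  {
    StateManagerSketch manager = new StateManagerSketch();
    manager.setState(BasicState.IDLE);
    System.out.println("idle: " + manager.isIdle());
  }
}
```

The supervisor would then call `stateManager.isIdle()` in the same way it already calls `isSteadyState()` or `isHealthy()`.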



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -2502,11 +2511,21 @@ private boolean updatePartitionDataFromStream()
     return true;
   }
 
+  protected Map<PartitionIdType, SequenceOffsetType> getLatestSequences()
+  {
+    return new HashMap<>();
+  }
+
+  private boolean isIdle()
+  {
+    return SupervisorStateManager.BasicState.IDLE.equals(getState());
+  }
+
   private void assignRecordSupplierToPartitionIds()
   {
     recordSupplierLock.lock();
     try {
-      final Set partitions = partitionIds.stream()
+      final Set<StreamPartition<PartitionIdType>> partitions = partitionIds.stream()

Review Comment:
   Thanks! :) 



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
     // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun
     // (in favor of the more specific states for the initial run) and will instead trigger setting the state to an
     // unhealthy one if we are now over the error thresholds.
-    maybeSetState(healthySteadyState);
-
+    if (!BasicState.IDLE.equals(supervisorState)) {

Review Comment:
   Nit: Once `isIdle()` is moved to this class, use that here instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();

Review Comment:
   This should be private if it doesn't need to be accessed elsewhere.
   It can be an ordinary map as it will only ever be accessed from `runInternal`, which is invoked on a single thread.
   
   You could also consider renaming it to something like `previousSequencesFromStream` to correspond with `latestSequencesFromStream`.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -240,6 +242,11 @@ protected boolean isStoreStackTrace()
     return supervisorStateManagerConfig.isStoreStackTrace();
   }
 
+  public boolean isEnableIdleBehavior()

Review Comment:
   Is this used anywhere? `SeekableStreamSupervisor` seems to be directly checking the `SupervisorStateManagerConfig` itself for this field.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java:
##########
@@ -199,8 +200,9 @@ public void markRunFinished()
     // Try to set the state to RUNNING or SUSPENDED. This will be rejected if we haven't had atLeastOneSuccessfulRun

Review Comment:
   ```suggestion
       // If the supervisor is not IDLE, try to set the state to RUNNING or SUSPENDED.
       // This will be rejected if we haven't had atLeastOneSuccessfulRun
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3977,7 +4041,7 @@ protected void emitNoticesQueueSize()
 
   protected void emitLag()
   {
-    if (spec.isSuspended() || !stateManager.isSteadyState()) {
+    if (spec.isSuspended() || !stateManager.isSteadyState() && !isIdle()) {

Review Comment:
   For readability:
   ```suggestion
       if (spec.isSuspended() || (!stateManager.isSteadyState() && !isIdle())) {
   ```
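
The parentheses do not change behaviour, because `&&` binds more tightly than `||` in Java. A small sketch that checks this over every input combination; the class and method names are hypothetical, and the three booleans stand in for `spec.isSuspended()`, `stateManager.isSteadyState()` and `isIdle()`.

```java
// Hypothetical helper comparing the two forms of the emitLag() guard.
final class EmitLagGuardSketch
{
  // Form from the diff: relies on && binding tighter than ||.
  static boolean withoutParens(boolean suspended, boolean steady, boolean idle)
  {
    return suspended || !steady && !idle;
  }

  // Suggested form: same grouping, made explicit.
  static boolean withParens(boolean suspended, boolean steady, boolean idle)
  {
    return suspended || (!steady && !idle);
  }

  // Returns true if both forms agree on all eight input combinations.
  static boolean formsAgree()
  {
    boolean[] values = {false, true};
    for (boolean suspended : values) {
      for (boolean steady : values) {
        for (boolean idle : values) {
          if (withoutParens(suspended, steady, idle) != withParens(suspended, steady, idle)) {
            return false;
          }
        }
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    System.out.println("forms agree: " + formsAgree());
  }
}
```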



##########
docs/configuration/index.md:
##########
@@ -1152,6 +1152,7 @@ There are additional configs for autoscaling (if it is enabled):
 |`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
 |`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
 |`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
+|`druid.supervisor.enableIdleBehaviour`|If enabled, Supervisor can become idle if there is no data on input stream/topic for some time. This time can be configured via `awaitStreamInactiveMillis`. It can also be turned off per Supervisor.|false|

Review Comment:
   ```suggestion
   |`druid.supervisor.enableIdleBehaviour`|Whether a supervisor should become idle and stop creating new tasks if there has been no new data on the input stream for a specified period of time.|false|
   ```
   
   Also add a separate entry for `awaitStreamInactiveMillis`.



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -274,17 +269,19 @@ protected Map<Integer, Long> getPartitionTimeLag()
   @SuppressWarnings("SSBasedInspection")
   protected Map<Integer, Long> getRecordLagPerPartition(Map<Integer, Long> currentOffsets)
   {
-    return currentOffsets
+    if (latestSequenceFromStream == null) {
+      return ImmutableMap.of();
+    }
+
+    return latestSequenceFromStream
         .entrySet()
         .stream()
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                e -> latestSequenceFromStream != null
-                     && latestSequenceFromStream.get(e.getKey()) != null
-                     && e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : Integer.MIN_VALUE
+                e -> e.getValue() != null

Review Comment:
   Looks much cleaner now.
   
   I hope this doesn't break any assumption where we expect `Integer.MIN_VALUE` for partitions that are missing in the `latestSequenceFromStream`, if any.
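
For reference, a sketch of the behaviour in the removed lines, rewritten as a plain loop so the sentinel case is easy to see. `RecordLagSketch` is a hypothetical helper, not code from the PR; it only mirrors the old expression, in which any partition without a usable offset maps to `Integer.MIN_VALUE`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the removed (pre-change) lag computation.
final class RecordLagSketch
{
  static Map<Integer, Long> lagPerPartition(Map<Integer, Long> latest, Map<Integer, Long> current)
  {
    Map<Integer, Long> lag = new HashMap<>();
    // The old code iterated over the current offsets, so every partition
    // being read appears in the result.
    for (Map.Entry<Integer, Long> e : current.entrySet()) {
      Long latestOffset = latest == null ? null : latest.get(e.getKey());
      if (latestOffset != null && e.getValue() != null) {
        lag.put(e.getKey(), latestOffset - e.getValue());
      } else {
        // Sentinel for partitions missing from the latest sequences.
        lag.put(e.getKey(), (long) Integer.MIN_VALUE);
      }
    }
    return lag;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> latest = new HashMap<>();
    latest.put(0, 100L);
    Map<Integer, Long> current = new HashMap<>();
    current.put(0, 90L);
    current.put(1, 5L);
    System.out.println(lagPerPartition(latest, current));
  }
}
```

Any caller that special-cases the sentinel for a partition like `1` above would be the place to check for a broken assumption.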



##########
extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java:
##########
@@ -249,15 +252,7 @@ protected Map<Integer, Long> getPartitionRecordLag()
     if (latestSequenceFromStream == null) {
       return null;
     }
-
-    if (!latestSequenceFromStream.keySet().equals(highestCurrentOffsets.keySet())) {

Review Comment:
   Super nit: Any particular reason to move this check? The method seems small enough to keep it inline.



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
+|`awaitStreamInactiveMillis`|Long|Minimum time interval to wait before a topic is considered inactive. (i.e. all existing data has been read from the stream and the topic is not getting new data).| no (default == 60000) |

Review Comment:
   ```suggestion
   |`awaitStreamInactiveMillis`|Long|Minimum time interval to wait after the topic has become inactive before marking the supervisor as idle. A topic is considered to be inactive if all existing data has been read from it and no new data is being published to it.| no (default == 60000) |
   ```
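
To make the wording concrete, here is a sketch of the check as described in the suggestion above, under stated assumptions. `IdleDetectorSketch`, its method and its parameters are hypothetical; the field names follow the renames proposed elsewhere in this review, and time is passed in explicitly rather than read from a clock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of an inactive-stream check; not the PR's implementation.
final class IdleDetectorSketch
{
  private final long awaitStreamInactiveMillis;
  private Map<Integer, Long> previousSequencesFromStream = new HashMap<>();
  private long lastActiveTimeMillis;

  IdleDetectorSketch(long awaitStreamInactiveMillis, long nowMillis)
  {
    this.awaitStreamInactiveMillis = awaitStreamInactiveMillis;
    this.lastActiveTimeMillis = nowMillis;
  }

  // Intended to be called once per run cycle from a single thread.
  boolean shouldBeIdle(
      Map<Integer, Long> latestSequencesFromStream,
      Map<Integer, Long> consumedOffsets,
      long nowMillis
  )
  {
    // No new data: the stream's latest sequences match the previous snapshot.
    boolean noNewData = latestSequencesFromStream.equals(previousSequencesFromStream);
    // All existing data read: consumed offsets have caught up with the stream.
    boolean caughtUp = latestSequencesFromStream.equals(consumedOffsets);
    previousSequencesFromStream = new HashMap<>(latestSequencesFromStream);

    if (!noNewData || !caughtUp) {
      lastActiveTimeMillis = nowMillis;
      return false;
    }
    // Local variable rather than a member field.
    long idleTimeMillis = nowMillis - lastActiveTimeMillis;
    return idleTimeMillis >= awaitStreamInactiveMillis;
  }

  public static void main(String[] args)
  {
    IdleDetectorSketch detector = new IdleDetectorSketch(60_000L, 0L);
    Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 10L);
    System.out.println(detector.shouldBeIdle(offsets, offsets, 1_000L));
    System.out.println(detector.shouldBeIdle(offsets, offsets, 61_000L));
  }
}
```

The supervisor is reported idle only after the topic has stayed inactive for at least `awaitStreamInactiveMillis`; any new data or unread backlog resets the timer.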



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   ```suggestion
   |`enableIdleBehaviour`|Boolean|Configure Kafka supervisor to become idle and not create any more tasks if there has been no data on the input topic for some time. This takes effect only if overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+  private long activeLastTime;

Review Comment:
   ```suggestion
     private long lastActiveTimeMillis;
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -743,6 +743,11 @@ public String getType()
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
 
+  // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> previousPartitionOffsetsSnapshot = new ConcurrentHashMap<>();
+  private long activeLastTime;
+  private long idleTime;

Review Comment:
   This should be a local variable and not a member field.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3648,25 +3704,33 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept
 
   protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets()

Review Comment:
   Please add a javadoc to this method and maybe explain how this change affects the idle behaviour.



##########
docs/development/extensions-core/kafka-supervisor-operations.md:
##########
@@ -56,6 +56,7 @@ The list of `detailedState` values and their corresponding `state` mapping is as
 |DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
 |CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
 |RUNNING|RUNNING|The supervisor has started tasks and is waiting for taskDuration to elapse|
+|IDLE|IDLE|The supervisor is not creating tasks any longer since stream is idle|

Review Comment:
   ```suggestion
   |IDLE|IDLE|The supervisor is not creating tasks since the input stream has not received any new data|
   ```



##########
docs/development/extensions-core/kafka-supervisor-reference.md:
##########
@@ -51,6 +51,8 @@ This topic contains configuration reference information for the Apache Kafka sup
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`enableIdleBehaviour`|If enabled, Kafka supervisor will become idle if there is no data on input stream/topic for some time. This can only be enabled if Overlord config `druid.supervisor.enableIdleBehaviour` is enabled.|no (default == false)|

Review Comment:
   Shouldn't this default be `true`?
   If I have enabled the idle behaviour at the overlord, shouldn't all supervisors be allowed to go into idle state by default?
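
A sketch of the resolution this would imply, assuming the supervisor-level flag is nullable and treated as `true` when unset. `IdleConfigSketch` and its parameter names are hypothetical.

```java
// Hypothetical sketch of combining the overlord-level and supervisor-level flags.
final class IdleConfigSketch
{
  // overlordEnabled: druid.supervisor.enableIdleBehaviour on the overlord.
  // supervisorEnabled: enableIdleBehaviour from the supervisor spec, null if unset.
  static boolean isIdleAllowed(boolean overlordEnabled, Boolean supervisorEnabled)
  {
    // With a default of true, the overlord flag alone turns the feature on,
    // and an individual supervisor can still opt out explicitly.
    boolean supervisorFlag = supervisorEnabled == null || supervisorEnabled;
    return overlordEnabled && supervisorFlag;
  }

  public static void main(String[] args)
  {
    System.out.println(isIdleAllowed(true, null));
    System.out.println(isIdleAllowed(true, Boolean.FALSE));
  }
}
```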



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

