You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/09/15 10:01:27 UTC

[druid] branch master updated: Faster fix for dangling tasks upon supervisor termination (#13072)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1311e85f65 Faster fix for dangling tasks upon supervisor termination (#13072)
1311e85f65 is described below

commit 1311e85f6588016176c29c3eb37b09ddb9865f7d
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Thu Sep 15 15:31:14 2022 +0530

    Faster fix for dangling tasks upon supervisor termination (#13072)
    
    This commit fixes issues with delayed supervisor termination during certain transient states.
    Tasks can be created during supervisor termination and left behind since the cleanup may
    not consider these newly added tasks.
    
    #12178 added a lock for the entire process of task creation to prevent such dangling tasks.
    But it also introduced a deadlock scenario as follows:
    - An invocation of `runInternal` is in progress.
    - A `stop` request arrives, acquires `stateChangeLock`, and submits a `ShutdownNotice` while still holding the lock
    - `runInternal` keeps waiting to acquire the `stateChangeLock`
    - `ShutdownNotice` remains stuck in the notice queue because `runInternal` is still running
    - After some timeout, the supervisor goes through a forced termination
    
    Fix:
     * `SeekableStreamSupervisor.runInternal` - do not try to acquire the lock if the supervisor is already stopping
     * `SupervisorStateManager.maybeSetState` - do not allow transitions out of the STOPPING state
---
 .../supervisor/SeekableStreamSupervisor.java       | 21 +++++++---
 .../SeekableStreamSupervisorStateManagerTest.java  | 26 ++++++++++++
 .../SeekableStreamSupervisorStateTest.java         | 46 ++++++++++++++++++++++
 .../supervisor/SupervisorStateManager.java         | 12 ++++--
 4 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 225cc09de7..1093584d11 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1454,6 +1454,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       checkCurrentTaskState();
 
+      // If supervisor is already stopping, don't contend for stateChangeLock since the block can be skipped
+      if (stateManager.getSupervisorState().getBasicState().equals(SupervisorStateManager.BasicState.STOPPING)) {
+        generateAndLogReport();
+        return;
+      }
+
       synchronized (stateChangeLock) {
         // if supervisor is not suspended, ensure required tasks are running
         // if suspended, ensure tasks have been requested to gracefully stop
@@ -1471,11 +1477,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
       }
 
-      if (log.isDebugEnabled()) {
-        log.debug(generateReport(true).toString());
-      } else {
-        log.info(generateReport(false).toString());
-      }
+      generateAndLogReport();
     }
     catch (Exception e) {
       stateManager.recordThrowableEvent(e);
@@ -1486,6 +1488,15 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private void generateAndLogReport()
+  {
+    if (log.isDebugEnabled()) {
+      log.debug(generateReport(true).toString());
+    } else {
+      log.info(generateReport(false).toString());
+    }
+  }
+
   private void possiblyRegisterListener()
   {
     if (listenerRegistered) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
index f65fccf702..e19dc6354d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateManagerTest.java
@@ -98,6 +98,32 @@ public class SeekableStreamSupervisorStateManagerTest
     Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
   }
 
+  @Test
+  public void testStoppingPath()
+  {
+    Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);
+    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.maybeSetState(SeekableStreamState.DISCOVERING_INITIAL_TASKS);
+    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, stateManager.getSupervisorState().getBasicState());
+
+    // Emulates graceful shutdown
+    stateManager.maybeSetState(BasicState.STOPPING);
+
+    stateManager.maybeSetState(SeekableStreamState.CREATING_TASKS);
+    Assert.assertEquals(BasicState.STOPPING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.STOPPING, stateManager.getSupervisorState().getBasicState());
+
+    stateManager.markRunFinished();
+    Assert.assertEquals(BasicState.STOPPING, stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.STOPPING, stateManager.getSupervisorState().getBasicState());
+  }
+
   @Test
   public void testStreamFailureLostContact()
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 82f3b58631..5566a5e95d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -622,6 +622,52 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void testStoppingGracefully() throws Exception
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    taskRunner.unregisterListener("testSupervisorId");
+    indexTaskClient.close();
+    recordSupplier.close();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.runInternal();
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
+    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
+    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());
+
+    supervisor.stop(true);
+
+    Assert.assertTrue(supervisor.stateManager.isHealthy());
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());
+
+    // Subsequent run after graceful shutdown has begun
+    supervisor.runInternal();
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
+    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());
+
+    verifyAll();
+  }
+
 
   @Test
   public void testEmitBothLag() throws Exception
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
index e1b1a5fb23..346fcd20de 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java
@@ -120,10 +120,16 @@ public class SupervisorStateManager
 
   /**
    * Certain states are only valid if the supervisor hasn't had a successful iteration. This method checks if there's
-   * been at least one successful iteration, and if applicable sets supervisor state to an appropriate new state.
+   * been at least one successful iteration, and if applicable, sets supervisor state to an appropriate new state.
+   * A STOPPING supervisor cannot transition to any other state as this state is final.
+   * This method must be thread-safe as multiple threads trying to update may lead to an invalid state.
    */
-  public void maybeSetState(State proposedState)
+  public synchronized void maybeSetState(State proposedState)
   {
+    if (BasicState.STOPPING.equals(this.supervisorState)) {
+      return;
+    }
+
     // if we're over our unhealthiness threshold, set the state to the appropriate unhealthy state
     if (consecutiveFailedRuns >= supervisorStateManagerConfig.getUnhealthinessThreshold()) {
       hasHitUnhealthinessThreshold = true;
@@ -204,7 +210,7 @@ public class SupervisorStateManager
     return new ArrayList<>(recentEventsQueue);
   }
 
-  public State getSupervisorState()
+  public synchronized State getSupervisorState()
   {
     return supervisorState;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org