You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/07/25 17:12:41 UTC

[incubator-druid] branch 0.15.1-incubating updated: fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140) (#8151)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch 0.15.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.1-incubating by this push:
     new 6888cfb  fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140) (#8151)
6888cfb is described below

commit 6888cfbfdb0b18123e5850e8f42dbe957323fb32
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Thu Jul 25 10:12:30 2019 -0700

    fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140) (#8151)
    
    * fix issue with CuratorLoadQueuePeon shutting down executors it does not own
    
    * use lifecycled executors
    
    * maybe this
---
 .../server/coordinator/CuratorLoadQueuePeon.java   |  2 --
 .../coordinator/CuratorDruidCoordinatorTest.java   | 22 ++++++++++++++++++----
 .../java/org/apache/druid/cli/CliCoordinator.java  | 12 +++++++++---
 3 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index a4d5d94..6a526e8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -334,8 +334,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
 
     queuedSize.set(0L);
     failedAssignCount.set(0);
-    processingExecutor.shutdown();
-    callBackExecutor.shutdown();
   }
 
   private void entryRemoved(SegmentHolder segmentHolder, String path)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 9cd0206..1cea98e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -117,6 +119,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
   private final ObjectMapper jsonMapper;
   private final ZkPathsConfig zkPathsConfig;
 
+  private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
+  private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
+
   public CuratorDruidCoordinatorTest()
   {
     jsonMapper = TestHelper.makeJsonMapper();
@@ -186,16 +191,16 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         curator,
         SOURCE_LOAD_PATH,
         objectMapper,
-        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"),
-        Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"),
+        peonExec,
+        callbackExec,
         druidCoordinatorConfig
     );
     destinationLoadQueuePeon = new CuratorLoadQueuePeon(
         curator,
         DESTINATION_LOAD_PATH,
         objectMapper,
-        Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"),
-        Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"),
+        peonExec,
+        callbackExec,
         druidCoordinatorConfig
     );
     druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
@@ -261,6 +266,15 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
   public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
 
   @Test
+  public void testStopDoesntKillPoolItDoesntOwn() throws Exception
+  {
+    setupView();
+    sourceLoadQueuePeon.stop();
+    Assert.assertFalse(peonExec.isShutdown());
+    Assert.assertFalse(callbackExec.isShutdown());
+  }
+
+  @Test
   public void testMoveSegment() throws Exception
   {
     segmentViewInitLatch = new CountDownLatch(1);
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 98bb958..834b417 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,7 +47,9 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.http.JettyHttpClientModule;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ExecutorServices;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.metadata.MetadataRuleManager;
@@ -248,7 +250,8 @@ public class CliCoordinator extends ServerRunnable
               ScheduledExecutorFactory factory,
               DruidCoordinatorConfig config,
               @EscalatedGlobal HttpClient httpClient,
-              ZkPathsConfig zkPaths
+              ZkPathsConfig zkPaths,
+              Lifecycle lifecycle
           )
           {
             boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
@@ -256,9 +259,12 @@ public class CliCoordinator extends ServerRunnable
             if (useHttpLoadQueuePeon) {
               callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
             } else {
-              callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon"
-                                                                                        + "-callbackexec--%d");
+              callBackExec = Execs.multiThreaded(
+                  config.getNumCuratorCallBackThreads(),
+                  "LoadQueuePeon-callbackexec--%d"
+              );
             }
+            ExecutorServices.manageLifecycle(lifecycle, callBackExec);
             return new LoadQueueTaskMaster(
                 curator,
                 jsonMapper,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org