You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/07/24 17:59:55 UTC
[incubator-druid] branch master updated: fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0695e48 fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140)
0695e48 is described below
commit 0695e487e7cd93ba63d2c2355db47bd5ee3111b8
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Jul 24 10:59:43 2019 -0700
fix issue with CuratorLoadQueuePeon shutting down executors it does not own (#8140)
* fix issue with CuratorLoadQueuePeon shutting down executors it does not own
* use lifecycled executors
* maybe this
---
.../server/coordinator/CuratorLoadQueuePeon.java | 2 --
.../coordinator/CuratorDruidCoordinatorTest.java | 22 ++++++++++++++++++----
.../java/org/apache/druid/cli/CliCoordinator.java | 12 +++++++++---
3 files changed, 27 insertions(+), 9 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index 10113fd..5fd9ac5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -335,8 +335,6 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
queuedSize.set(0L);
failedAssignCount.set(0);
- processingExecutor.shutdown();
- callBackExecutor.shutdown();
}
private void entryRemoved(SegmentHolder segmentHolder, String path)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index eb3b25b..7e9659e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -73,7 +73,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -119,6 +121,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
private final ObjectMapper jsonMapper;
private final ZkPathsConfig zkPathsConfig;
+ private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
+ private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
+
public CuratorDruidCoordinatorTest()
{
jsonMapper = TestHelper.makeJsonMapper();
@@ -187,16 +192,16 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
curator,
SOURCE_LOAD_PATH,
objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_src_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_src-%d"),
+ peonExec,
+ callbackExec,
druidCoordinatorConfig
);
destinationLoadQueuePeon = new CuratorLoadQueuePeon(
curator,
DESTINATION_LOAD_PATH,
objectMapper,
- Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_dest_scheduled-%d"),
- Execs.singleThreaded("coordinator_test_load_queue_peon_dest-%d"),
+ peonExec,
+ callbackExec,
druidCoordinatorConfig
);
druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
@@ -262,6 +267,15 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
public final TestRule timeout = new DeadlockDetectingTimeout(60, TimeUnit.SECONDS);
@Test
+ public void testStopDoesntKillPoolItDoesntOwn() throws Exception
+ {
+ setupView();
+ sourceLoadQueuePeon.stop();
+ Assert.assertFalse(peonExec.isShutdown());
+ Assert.assertFalse(callbackExec.isShutdown());
+ }
+
+ @Test
public void testMoveSegment() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index ecafcbf..118b911 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -47,7 +47,9 @@ import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ExecutorServices;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.metadata.MetadataRuleManager;
@@ -248,7 +250,8 @@ public class CliCoordinator extends ServerRunnable
ScheduledExecutorFactory factory,
DruidCoordinatorConfig config,
@EscalatedGlobal HttpClient httpClient,
- ZkPathsConfig zkPaths
+ ZkPathsConfig zkPaths,
+ Lifecycle lifecycle
)
{
boolean useHttpLoadQueuePeon = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
@@ -256,9 +259,12 @@ public class CliCoordinator extends ServerRunnable
if (useHttpLoadQueuePeon) {
callBackExec = Execs.singleThreaded("LoadQueuePeon-callbackexec--%d");
} else {
- callBackExec = Execs.multiThreaded(config.getNumCuratorCallBackThreads(), "LoadQueuePeon"
- + "-callbackexec--%d");
+ callBackExec = Execs.multiThreaded(
+ config.getNumCuratorCallBackThreads(),
+ "LoadQueuePeon-callbackexec--%d"
+ );
}
+ ExecutorServices.manageLifecycle(lifecycle, callBackExec);
return new LoadQueueTaskMaster(
curator,
jsonMapper,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org