You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by za...@apache.org on 2022/01/20 18:32:47 UTC
[druid] branch master updated: Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 376d7c0 Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
376d7c0 is described below
commit 376d7c069de3b87b21db262cca4b72ddc49e3b66
Author: zachjsh <za...@gmail.com>
AuthorDate: Thu Jan 20 13:32:08 2022 -0500
Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
Fixed an issue where the provisioningService, which can be used to spawn resources as needed, was left running on a coordinator/overlord after it had been removed from leadership. Provisioning should only be done by the leader. To fix the issue, a call to close the provisioningService was added to the stop() method of the HttpRemoteTaskRunner class. The other TaskRunner types already closed their provisioningService properly.
---
.../overlord/hrtr/HttpRemoteTaskRunner.java | 3 +
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 143 +++++++++++++++++++++
2 files changed, 146 insertions(+)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 45af215..6acaa20 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1342,6 +1342,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
log.info("Stopping...");
+ if (provisioningService != null) {
+ provisioningService.close();
+ }
pendingTasksExec.shutdownNow();
workersSyncExec.shutdownNow();
cleanupExec.shutdown();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 5a67611..db52d3a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -42,6 +42,8 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -78,6 +80,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import static org.easymock.EasyMock.isA;
+
/**
*
*/
@@ -183,6 +187,145 @@ public class HttpRemoteTaskRunnerTest
}
/*
+ Simulates Overlord startup with a provisioning service; on stop, the service is expected to be closed.
+ */
+ @Test(timeout = 60_000L)
+ public void testFreshStartAndStop()
+ {
+   final ProvisioningService provisioner = EasyMock.createNiceMock(ProvisioningService.class);
+   final ProvisioningStrategy strategy = EasyMock.createMock(ProvisioningStrategy.class);
+   final DruidNodeDiscoveryProvider discoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+   final TestDruidNodeDiscovery discovery = new TestDruidNodeDiscovery();
+
+   EasyMock.expect(strategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class))).andReturn(provisioner);
+   EasyMock.expect(discoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)).andReturn(discovery);
+   // The point of this test: close() must be invoked on the provisioning service (checked by verify below).
+   provisioner.close();
+   EasyMock.expectLastCall();
+   EasyMock.replay(discoveryProvider, strategy, provisioner);
+
+   final HttpRemoteTaskRunnerConfig runnerConfig = new HttpRemoteTaskRunnerConfig()
+   {
+     @Override
+     public int getPendingTasksRunnerNumThreads()
+     {
+       return 3;
+     }
+   };
+   final HttpRemoteTaskRunner runner = new HttpRemoteTaskRunner(
+       TestHelper.makeJsonMapper(),
+       runnerConfig,
+       EasyMock.createNiceMock(HttpClient.class),
+       DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+       strategy,
+       discoveryProvider,
+       EasyMock.createNiceMock(TaskStorage.class),
+       EasyMock.createNiceMock(CuratorFramework.class),
+       new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+   )
+   {
+     @Override
+     protected WorkerHolder createWorkerHolder(
+         ObjectMapper smileMapper,
+         HttpClient httpClient,
+         HttpRemoteTaskRunnerConfig config,
+         ScheduledExecutorService workersSyncExec,
+         WorkerHolder.Listener listener,
+         Worker worker,
+         List<TaskAnnouncement> knownAnnouncements
+     )
+     {
+       return HttpRemoteTaskRunnerTest.createWorkerHolder(
+           smileMapper,
+           httpClient,
+           config,
+           workersSyncExec,
+           listener,
+           worker,
+           ImmutableList.of(),
+           ImmutableList.of(),
+           ImmutableMap.of(),
+           new AtomicInteger(),
+           ImmutableSet.of()
+       );
+     }
+   };
+
+   runner.start();
+   runner.stop();
+   EasyMock.verify(discoveryProvider, strategy, provisioner);
+ }
+
+ /*
+ Simulates startup of Overlord with no provisioner. Overlord is then stopped and is expected to shut down cleanly
+ even though there is no provisioning service to close.
+ */
+ @Test(timeout = 60_000L)
+ public void testFreshStartAndStopNoProvisioner()
+ {
+   TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+   ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+
+   DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+   EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+           .andReturn(druidNodeDiscovery);
+   // The strategy returns null here, i.e. there is no provisioning service for stop() to close.
+   EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
+           .andReturn(null);
+   EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy);
+
+   HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+       TestHelper.makeJsonMapper(),
+       new HttpRemoteTaskRunnerConfig()
+       {
+         @Override
+         public int getPendingTasksRunnerNumThreads()
+         {
+           return 3;
+         }
+       },
+       EasyMock.createNiceMock(HttpClient.class),
+       DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+       provisioningStrategy,
+       druidNodeDiscoveryProvider,
+       EasyMock.createNiceMock(TaskStorage.class),
+       EasyMock.createNiceMock(CuratorFramework.class),
+       new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+   )
+   {
+     @Override
+     protected WorkerHolder createWorkerHolder(
+         ObjectMapper smileMapper,
+         HttpClient httpClient,
+         HttpRemoteTaskRunnerConfig config,
+         ScheduledExecutorService workersSyncExec,
+         WorkerHolder.Listener listener,
+         Worker worker,
+         List<TaskAnnouncement> knownAnnouncements
+     )
+     {
+       return HttpRemoteTaskRunnerTest.createWorkerHolder(
+           smileMapper,
+           httpClient,
+           config,
+           workersSyncExec,
+           listener,
+           worker,
+           ImmutableList.of(),
+           ImmutableList.of(),
+           ImmutableMap.of(),
+           new AtomicInteger(),
+           ImmutableSet.of()
+       );
+     }
+   };
+
+   // Starting and stopping must complete without error when no provisioning service exists.
+   taskRunner.start();
+   taskRunner.stop();
+   EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy);
+ }
+
+ /*
Simulates one task not getting acknowledged to be running after assigning it to a worker. But, other tasks are
successfully assigned to other worker and get completed.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org