You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by za...@apache.org on 2022/01/20 18:32:47 UTC

[druid] branch master updated: Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)

This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 376d7c0  Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
376d7c0 is described below

commit 376d7c069de3b87b21db262cca4b72ddc49e3b66
Author: zachjsh <za...@gmail.com>
AuthorDate: Thu Jan 20 13:32:08 2022 -0500

    Close provisioner during HttpRemotetaskRunner LifecycleStop (#12176)
    
    Fixed an issue where the provisionerService which can be used to spawn resources as needed is left running on a non-leader coordinator/overlord, after it is removed from leadership. Provisioning should only be done by the leader. To fix the issue, a call to stop the provisionerService was added to the stop() method of HttpRemoteTaskRunner class. The provisionerService was properly closed on other TaskRunner types.
---
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |   3 +
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    | 143 +++++++++++++++++++++
 2 files changed, 146 insertions(+)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 45af215..6acaa20 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1342,6 +1342,9 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
     log.info("Stopping...");
 
+    if (provisioningService != null) {
+      provisioningService.close();
+    }
     pendingTasksExec.shutdownNow();
     workersSyncExec.shutdownNow();
     cleanupExec.shutdown();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 5a67611..db52d3a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -42,6 +42,8 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
 import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -78,6 +80,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.easymock.EasyMock.isA;
+
 /**
  *
  */
@@ -183,6 +187,145 @@ public class HttpRemoteTaskRunnerTest
   }
 
   /*
+  Simulates startup of Overlord. Overlord is then stopped and is expected to close down certain things.
+   */
+  @Test(timeout = 60_000L)
+  public void testFreshStartAndStop()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    ProvisioningService provisioningService = EasyMock.createNiceMock(ProvisioningService.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
+            .andReturn(provisioningService);
+    provisioningService.close();
+    EasyMock.expectLastCall();
+    EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        provisioningStrategy,
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return HttpRemoteTaskRunnerTest.createWorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            ImmutableMap.of(),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        );
+      }
+    };
+
+    taskRunner.start();
+    taskRunner.stop();
+    EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy, provisioningService);
+  }
+
+  /*
+  Simulates startup of Overlord with no provisoner. Overlord is then stopped and is expected to close down certain
+  things.
+   */
+  @Test(timeout = 60_000L)
+  public void testFreshStartAndStopNoProvisioner()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    ProvisioningStrategy provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
+
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.expect(provisioningStrategy.makeProvisioningService(isA(HttpRemoteTaskRunner.class)))
+            .andReturn(null);
+    EasyMock.expectLastCall();
+    EasyMock.replay(druidNodeDiscoveryProvider, provisioningStrategy);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig()
+        {
+          @Override
+          public int getPendingTasksRunnerNumThreads()
+          {
+            return 3;
+          }
+        },
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        provisioningStrategy,
+        druidNodeDiscoveryProvider,
+        EasyMock.createNiceMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null)
+    )
+    {
+      @Override
+      protected WorkerHolder createWorkerHolder(
+          ObjectMapper smileMapper,
+          HttpClient httpClient,
+          HttpRemoteTaskRunnerConfig config,
+          ScheduledExecutorService workersSyncExec,
+          WorkerHolder.Listener listener,
+          Worker worker,
+          List<TaskAnnouncement> knownAnnouncements
+      )
+      {
+        return HttpRemoteTaskRunnerTest.createWorkerHolder(
+            smileMapper,
+            httpClient,
+            config,
+            workersSyncExec,
+            listener,
+            worker,
+            ImmutableList.of(),
+            ImmutableList.of(),
+            ImmutableMap.of(),
+            new AtomicInteger(),
+            ImmutableSet.of()
+        );
+      }
+    };
+
+    taskRunner.start();
+    taskRunner.stop();
+    EasyMock.verify(druidNodeDiscoveryProvider, provisioningStrategy);
+  }
+
+  /*
   Simulates one task not getting acknowledged to be running after assigning it to a worker. But, other tasks are
   successfully assigned to other worker and get completed.
    */

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org