You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2018/12/19 04:32:36 UTC

[incubator-druid] branch master updated: Broker: Await initialization before finishing startup. (#6742)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a09cde  Broker: Await initialization before finishing startup. (#6742)
7a09cde is described below

commit 7a09cde4de1953eee75c5033e863cfde8f94d6c1
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Tue Dec 18 20:32:31 2018 -0800

    Broker: Await initialization before finishing startup. (#6742)
    
    * Broker: Await initialization before finishing startup.
    
    In particular, hold off on announcing the service and starting the
    HTTP server until the server view and SQL metadata cache are finished
    initializing. This closes a window of time where a Broker could return
    partial results shortly after startup.
    
    As part of this, some simplification of server-lifecycle service
    announcements. This helps ensure that the two different kinds of
    announcements we do (legacy and new-style) stay in sync.
    
    * Remove unused imports.
    
    * Fix NPE in ServerRunnable.
---
 docs/content/configuration/index.md                |  2 +
 .../druid/client/BrokerSegmentWatcherConfig.java   |  8 ++
 .../org/apache/druid/client/BrokerServerView.java  | 52 ++++++++++---
 .../server/coordination/broker/DruidBroker.java    | 90 ---------------------
 .../main/java/org/apache/druid/cli/CliBroker.java  | 18 ++---
 .../java/org/apache/druid/cli/CliCoordinator.java  | 13 ++--
 .../java/org/apache/druid/cli/CliHistorical.java   | 18 ++---
 .../org/apache/druid/cli/CliMiddleManager.java     | 15 ++--
 .../java/org/apache/druid/cli/CliOverlord.java     | 21 ++---
 .../main/java/org/apache/druid/cli/CliRouter.java  | 10 +--
 .../java/org/apache/druid/cli/ServerRunnable.java  | 91 +++++++++++++++++++++-
 .../druid/sql/calcite/planner/PlannerConfig.java   | 12 +++
 .../druid/sql/calcite/schema/DruidSchema.java      | 17 ++--
 .../druid/sql/calcite/util/CalciteTests.java       |  2 +-
 14 files changed, 207 insertions(+), 162 deletions(-)

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 89cce22..92942a0 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -1258,6 +1258,7 @@ The Druid SQL server is configured through the following properties on the broke
 |`druid.sql.avatica.maxStatementsPerConnection`|Maximum number of simultaneous open statements per Avatica client connection.|1|
 |`druid.sql.avatica.connectionIdleTimeout`|Avatica client connection idle timeout.|PT5M|
 |`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
+|`druid.sql.planner.awaitInitializationOnStart`|Boolean|Whether the the Broker will wait for its SQL metadata view to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.broker.segment.awaitInitializationOnStart`, a related setting.|true|
 |`druid.sql.planner.maxQueryCount`|Maximum number of queries to issue, including nested queries. Set to 1 to disable sub-queries, or set to 0 for unlimited.|8|
 |`druid.sql.planner.maxSemiJoinRowsInMemory`|Maximum number of rows to keep in memory for executing two-stage semi-join queries like `SELECT * FROM Employee WHERE DeptName IN (SELECT DeptName FROM Dept)`.|100000|
 |`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.html). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.html) instead.|100000|
@@ -1291,6 +1292,7 @@ See [cache configuration](#cache-configuration) for how to configure cache setti
 |`druid.announcer.type`|batch or http|Segment discovery method to use. "http" enables discovering segments using HTTP instead of zookeeper.|batch|
 |`druid.broker.segment.watchedTiers`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of tiers. By default, Broker would consider all tiers. This can be used to partition your dataSources in specific historical tiers and configure brokers in partitions so that they are only queryable for specific dataSources.|none|
 |`druid.broker.segment.watchedDataSources`|List of strings|Broker watches the segment announcements from nodes serving segments to build cache of which node is serving which segments, this configuration allows to only consider segments being served from a whitelist of dataSources. By default, Broker would consider all datasources. This can be used to configure brokers in partitions so that they are only queryable for specific dataSources.|none|
+|`druid.broker.segment.awaitInitializationOnStart`|Boolean|Whether the the Broker will wait for its view of segments to fully initialize before starting up. If set to 'true', the Broker's HTTP server will not start up, and the Broker will not announce itself as available, until the server view is initialized. See also `druid.sql.planner.awaitInitializationOnStart`, a related setting.|true|
 
 ## Historical
 
diff --git a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java
index 9a0fc54..0abb456 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerSegmentWatcherConfig.java
@@ -33,6 +33,9 @@ public class BrokerSegmentWatcherConfig
   @JsonProperty
   private Set<String> watchedDataSources = null;
 
+  @JsonProperty
+  private boolean awaitInitializationOnStart = true;
+
   public Set<String> getWatchedTiers()
   {
     return watchedTiers;
@@ -42,4 +45,9 @@ public class BrokerSegmentWatcherConfig
   {
     return watchedDataSources;
   }
+
+  public boolean isAwaitInitializationOnStart()
+  {
+    return awaitInitializationOnStart;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 574cdc8..0b28a1b 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -27,10 +27,12 @@ import com.google.inject.Inject;
 import org.apache.druid.client.selector.QueryableDruidServer;
 import org.apache.druid.client.selector.ServerSelector;
 import org.apache.druid.client.selector.TierSelectorStrategy;
+import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.EscalatedClient;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.http.client.HttpClient;
@@ -49,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
@@ -56,6 +59,7 @@ import java.util.stream.Collectors;
 
 /**
  */
+@ManageLifecycle
 public class BrokerServerView implements TimelineServerView
 {
   private static final Logger log = new Logger(BrokerServerView.class);
@@ -74,19 +78,20 @@ public class BrokerServerView implements TimelineServerView
   private final FilteredServerInventoryView baseView;
   private final TierSelectorStrategy tierSelectorStrategy;
   private final ServiceEmitter emitter;
+  private final BrokerSegmentWatcherConfig segmentWatcherConfig;
   private final Predicate<Pair<DruidServerMetadata, DataSegment>> segmentFilter;
 
-  private volatile boolean initialized = false;
+  private final CountDownLatch initialized = new CountDownLatch(1);
 
   @Inject
   public BrokerServerView(
-      QueryToolChestWarehouse warehouse,
-      QueryWatcher queryWatcher,
-      @Smile ObjectMapper smileMapper,
-      @EscalatedClient HttpClient httpClient,
-      FilteredServerInventoryView baseView,
-      TierSelectorStrategy tierSelectorStrategy,
-      ServiceEmitter emitter,
+      final QueryToolChestWarehouse warehouse,
+      final QueryWatcher queryWatcher,
+      final @Smile ObjectMapper smileMapper,
+      final @EscalatedClient HttpClient httpClient,
+      final FilteredServerInventoryView baseView,
+      final TierSelectorStrategy tierSelectorStrategy,
+      final ServiceEmitter emitter,
       final BrokerSegmentWatcherConfig segmentWatcherConfig
   )
   {
@@ -97,6 +102,7 @@ public class BrokerServerView implements TimelineServerView
     this.baseView = baseView;
     this.tierSelectorStrategy = tierSelectorStrategy;
     this.emitter = emitter;
+    this.segmentWatcherConfig = segmentWatcherConfig;
     this.clients = new ConcurrentHashMap<>();
     this.selectors = new HashMap<>();
     this.timelines = new HashMap<>();
@@ -143,7 +149,7 @@ public class BrokerServerView implements TimelineServerView
           @Override
           public CallbackAction segmentViewInitialized()
           {
-            initialized = true;
+            initialized.countDown();
             runTimelineCallbacks(TimelineCallback::timelineInitialized);
             return ServerView.CallbackAction.CONTINUE;
           }
@@ -165,9 +171,25 @@ public class BrokerServerView implements TimelineServerView
     );
   }
 
+  @LifecycleStart
+  public void start() throws InterruptedException
+  {
+    if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
+      final long startMillis = System.currentTimeMillis();
+      log.info("%s waiting for initialization.", getClass().getSimpleName());
+      awaitInitialization();
+      log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
+    }
+  }
+
   public boolean isInitialized()
   {
-    return initialized;
+    return initialized.getCount() == 0;
+  }
+
+  public void awaitInitialization() throws InterruptedException
+  {
+    initialized.await();
   }
 
   private QueryableDruidServer addServer(DruidServer server)
@@ -183,7 +205,15 @@ public class BrokerServerView implements TimelineServerView
 
   private DirectDruidClient makeDirectClient(DruidServer server)
   {
-    return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getScheme(), server.getHost(), emitter);
+    return new DirectDruidClient(
+        warehouse,
+        queryWatcher,
+        smileMapper,
+        httpClient,
+        server.getScheme(),
+        server.getHost(),
+        emitter
+    );
   }
 
   private QueryableDruidServer removeServer(DruidServer server)
diff --git a/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java b/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java
deleted file mode 100644
index 41e6d9a..0000000
--- a/server/src/main/java/org/apache/druid/server/coordination/broker/DruidBroker.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordination.broker;
-
-import com.google.common.base.Predicates;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.inject.Inject;
-import org.apache.druid.client.FilteredServerInventoryView;
-import org.apache.druid.client.ServerView;
-import org.apache.druid.curator.discovery.ServiceAnnouncer;
-import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
-import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.server.DruidNode;
-
-@ManageLifecycle
-public class DruidBroker
-{
-  private final DruidNode self;
-  private final ServiceAnnouncer serviceAnnouncer;
-
-  private volatile boolean started = false;
-
-  @Inject
-  public DruidBroker(
-      final FilteredServerInventoryView serverInventoryView,
-      final @Self DruidNode self,
-      final ServiceAnnouncer serviceAnnouncer
-  )
-  {
-    this.self = self;
-    this.serviceAnnouncer = serviceAnnouncer;
-
-    serverInventoryView.registerSegmentCallback(
-        MoreExecutors.sameThreadExecutor(),
-        new ServerView.BaseSegmentCallback()
-        {
-          @Override
-          public ServerView.CallbackAction segmentViewInitialized()
-          {
-            serviceAnnouncer.announce(self);
-            return ServerView.CallbackAction.UNREGISTER;
-          }
-        },
-        // We are not interested in any segment callbacks except view initialization
-        Predicates.alwaysFalse()
-    );
-  }
-
-  @LifecycleStart
-  public void start()
-  {
-    synchronized (self) {
-      if (started) {
-        return;
-      }
-      started = true;
-    }
-  }
-
-  @LifecycleStop
-  public void stop()
-  {
-    synchronized (self) {
-      if (!started) {
-        return;
-      }
-      serviceAnnouncer.unannounce(self);
-      started = false;
-    }
-  }
-}
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index 009a6fe..982223a 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -20,7 +20,6 @@
 package org.apache.druid.cli;
 
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
@@ -51,7 +50,6 @@ import org.apache.druid.query.lookup.LookupModule;
 import org.apache.druid.server.BrokerQueryResource;
 import org.apache.druid.server.ClientInfoResource;
 import org.apache.druid.server.ClientQuerySegmentWalker;
-import org.apache.druid.server.coordination.broker.DruidBroker;
 import org.apache.druid.server.http.BrokerResource;
 import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
 import org.apache.druid.server.metrics.MetricsModule;
@@ -94,7 +92,7 @@ public class CliBroker extends ServerRunnable
           binder.bindConstant().annotatedWith(PruneLoadSpec.class).to(true);
 
           binder.bind(CachingClusteredClient.class).in(LazySingleton.class);
-          binder.bind(BrokerServerView.class).in(LazySingleton.class);
+          LifecycleModule.register(binder, BrokerServerView.class);
           binder.bind(TimelineServerView.class).to(BrokerServerView.class).in(LazySingleton.class);
 
           JsonConfigProvider.bind(binder, "druid.broker.cache", CacheConfig.class);
@@ -117,7 +115,6 @@ public class CliBroker extends ServerRunnable
           Jerseys.addResource(binder, ClientInfoResource.class);
 
           LifecycleModule.register(binder, BrokerQueryResource.class);
-          LifecycleModule.register(binder, DruidBroker.class);
 
           Jerseys.addResource(binder, HttpServerInventoryViewResource.class);
 
@@ -125,11 +122,14 @@ public class CliBroker extends ServerRunnable
 
           LifecycleModule.register(binder, Server.class);
 
-          binder
-              .bind(DiscoverySideEffectsProvider.Child.class)
-              .toProvider(new DiscoverySideEffectsProvider(NodeType.BROKER, ImmutableList.of(LookupNodeService.class)))
-              .in(LazySingleton.class);
-          LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
+
+          bindAnnouncer(
+              binder,
+              DiscoverySideEffectsProvider.builder(NodeType.BROKER)
+                                          .serviceClasses(ImmutableList.of(LookupNodeService.class))
+                                          .useLegacyAnnouncer(true)
+                                          .build()
+          );
         },
         new LookupModule(),
         new SqlModule()
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index dd4aa53..7f36e0c 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -21,10 +21,8 @@ package org.apache.druid.cli;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Predicates;
-import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
-import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
@@ -217,12 +215,11 @@ public class CliCoordinator extends ServerRunnable
                 DruidCoordinatorCleanupPendingSegments.class
             );
 
-            binder
-                .bind(DiscoverySideEffectsProvider.Child.class)
-                .annotatedWith(Coordinator.class)
-                .toProvider(new DiscoverySideEffectsProvider(NodeType.COORDINATOR, ImmutableList.of()))
-                .in(LazySingleton.class);
-            LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, Coordinator.class));
+            bindAnnouncer(
+                binder,
+                Coordinator.class,
+                DiscoverySideEffectsProvider.builder(NodeType.COORDINATOR).build()
+            );
           }
 
           @Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index 7d3c7e8..d8432d3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -20,13 +20,11 @@
 package org.apache.druid.cli;
 
 import com.google.common.collect.ImmutableList;
-import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CacheMonitor;
-import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeType;
 import org.apache.druid.guice.CacheModule;
@@ -103,16 +101,12 @@ public class CliHistorical extends ServerRunnable
           binder.install(new CacheModule());
           MetricsModule.register(binder, CacheMonitor.class);
 
-          binder
-              .bind(DiscoverySideEffectsProvider.Child.class)
-              .toProvider(
-                  new DiscoverySideEffectsProvider(
-                      NodeType.HISTORICAL,
-                      ImmutableList.of(DataNodeService.class, LookupNodeService.class)
-                  )
-              )
-              .in(LazySingleton.class);
-          LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
+          bindAnnouncer(
+              binder,
+              DiscoverySideEffectsProvider.builder(NodeType.HISTORICAL)
+                                          .serviceClasses(ImmutableList.of(LookupNodeService.class))
+                                          .build()
+          );
         },
         new LookupModule()
     );
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index e266233..25c4e13 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -100,7 +100,7 @@ public class CliMiddleManager extends ServerRunnable
             binder.bind(ForkingTaskRunner.class).in(LazySingleton.class);
 
             binder.bind(IndexingServiceClient.class).toProvider(Providers.of(null));
-            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
+            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
                   .toProvider(Providers.of(null));
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
             PolyBind.createChoice(
@@ -130,13 +130,12 @@ public class CliMiddleManager extends ServerRunnable
 
             LifecycleModule.register(binder, Server.class);
 
-            binder
-                .bind(DiscoverySideEffectsProvider.Child.class)
-                .toProvider(
-                    new DiscoverySideEffectsProvider(NodeType.MIDDLE_MANAGER, ImmutableList.of(WorkerNodeService.class))
-                )
-                .in(LazySingleton.class);
-            LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
+            bindAnnouncer(
+                binder,
+                DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER)
+                                            .serviceClasses(ImmutableList.of(WorkerNodeService.class))
+                                            .build()
+            );
           }
 
           @Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 5407221..1cd3954 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -194,7 +194,7 @@ public class CliOverlord extends ServerRunnable
             binder.bind(SupervisorManager.class).in(LazySingleton.class);
 
             binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
-            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>(){})
+            binder.bind(new TypeLiteral<IndexTaskClientFactory<ParallelIndexTaskClient>>() {})
                   .toProvider(Providers.of(null));
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));
 
@@ -237,12 +237,11 @@ public class CliOverlord extends ServerRunnable
               LifecycleModule.register(binder, Server.class);
             }
 
-            binder
-                .bind(DiscoverySideEffectsProvider.Child.class)
-                .annotatedWith(IndexingService.class)
-                .toProvider(new DiscoverySideEffectsProvider(NodeType.OVERLORD, ImmutableList.of()))
-                .in(LazySingleton.class);
-            LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, IndexingService.class));
+            bindAnnouncer(
+                binder,
+                IndexingService.class,
+                DiscoverySideEffectsProvider.builder(NodeType.OVERLORD).build()
+            );
           }
 
           private void configureTaskStorage(Binder binder)
@@ -284,10 +283,14 @@ public class CliOverlord extends ServerRunnable
             biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
             binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
 
-            biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME).to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
+            biddy.addBinding(RemoteTaskRunnerFactory.TYPE_NAME)
+                 .to(RemoteTaskRunnerFactory.class)
+                 .in(LazySingleton.class);
             binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
 
-            biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME).to(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
+            biddy.addBinding(HttpRemoteTaskRunnerFactory.TYPE_NAME)
+                 .to(HttpRemoteTaskRunnerFactory.class)
+                 .in(LazySingleton.class);
             binder.bind(HttpRemoteTaskRunnerFactory.class).in(LazySingleton.class);
 
             JacksonConfigProvider.bind(binder, WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class, null);
diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java
index 3f80faf..64b5887 100644
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java
+++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java
@@ -21,7 +21,6 @@ package org.apache.druid.cli;
 
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
-import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.TypeLiteral;
 import com.google.inject.name.Names;
@@ -114,11 +113,10 @@ public class CliRouter extends ServerRunnable
             LifecycleModule.register(binder, Server.class);
             DiscoveryModule.register(binder, Self.class);
 
-            binder
-                .bind(DiscoverySideEffectsProvider.Child.class)
-                .toProvider(new DiscoverySideEffectsProvider(NodeType.ROUTER, ImmutableList.of()))
-                .in(LazySingleton.class);
-            LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
+            bindAnnouncer(
+                binder,
+                DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build()
+            );
           }
         },
         new LookupModule()
diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
index 7f60f1f..77d409c 100644
--- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
@@ -20,19 +20,26 @@
 package org.apache.druid.cli;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Key;
 import com.google.inject.Provider;
+import org.apache.druid.curator.discovery.ServiceAnnouncer;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.DruidService;
 import org.apache.druid.discovery.NodeType;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.DruidNode;
 
+import java.lang.annotation.Annotation;
 import java.util.List;
 
 /**
@@ -58,6 +65,32 @@ public abstract class ServerRunnable extends GuiceRunnable
     }
   }
 
+  public static void bindAnnouncer(
+      final Binder binder,
+      final DiscoverySideEffectsProvider provider
+  )
+  {
+    binder.bind(DiscoverySideEffectsProvider.Child.class)
+          .toProvider(provider)
+          .in(LazySingleton.class);
+
+    LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class));
+  }
+
+  public static void bindAnnouncer(
+      final Binder binder,
+      final Class<? extends Annotation> annotation,
+      final DiscoverySideEffectsProvider provider
+  )
+  {
+    binder.bind(DiscoverySideEffectsProvider.Child.class)
+          .annotatedWith(annotation)
+          .toProvider(provider)
+          .in(LazySingleton.class);
+
+    LifecycleModule.registerKey(binder, Key.get(DiscoverySideEffectsProvider.Child.class, annotation));
+  }
+
   /**
    * This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
    * as part of {@link Lifecycle.Stage#LAST}.
@@ -66,13 +99,51 @@ public abstract class ServerRunnable extends GuiceRunnable
   {
     public static class Child {}
 
-    @Inject @Self
+    public static class Builder
+    {
+      private NodeType nodeType;
+      private List<Class<? extends DruidService>> serviceClasses = ImmutableList.of();
+      private boolean useLegacyAnnouncer;
+
+      public Builder(final NodeType nodeType)
+      {
+        this.nodeType = nodeType;
+      }
+
+      public Builder serviceClasses(final List<Class<? extends DruidService>> serviceClasses)
+      {
+        this.serviceClasses = serviceClasses;
+        return this;
+      }
+
+      public Builder useLegacyAnnouncer(final boolean useLegacyAnnouncer)
+      {
+        this.useLegacyAnnouncer = useLegacyAnnouncer;
+        return this;
+      }
+
+      public DiscoverySideEffectsProvider build()
+      {
+        return new DiscoverySideEffectsProvider(nodeType, serviceClasses, useLegacyAnnouncer);
+      }
+    }
+
+    public static Builder builder(final NodeType nodeType)
+    {
+      return new Builder(nodeType);
+    }
+
+    @Inject
+    @Self
     private DruidNode druidNode;
 
     @Inject
     private DruidNodeAnnouncer announcer;
 
     @Inject
+    private ServiceAnnouncer legacyAnnouncer;
+
+    @Inject
     private Lifecycle lifecycle;
 
     @Inject
@@ -80,11 +151,17 @@ public abstract class ServerRunnable extends GuiceRunnable
 
     private final NodeType nodeType;
     private final List<Class<? extends DruidService>> serviceClasses;
+    private final boolean useLegacyAnnouncer;
 
-    public DiscoverySideEffectsProvider(NodeType nodeType, List<Class<? extends DruidService>> serviceClasses)
+    private DiscoverySideEffectsProvider(
+        final NodeType nodeType,
+        final List<Class<? extends DruidService>> serviceClasses,
+        final boolean useLegacyAnnouncer
+    )
     {
       this.nodeType = nodeType;
       this.serviceClasses = serviceClasses;
+      this.useLegacyAnnouncer = useLegacyAnnouncer;
     }
 
     @Override
@@ -105,11 +182,21 @@ public abstract class ServerRunnable extends GuiceRunnable
             public void start()
             {
               announcer.announce(discoveryDruidNode);
+
+              if (useLegacyAnnouncer) {
+                legacyAnnouncer.announce(discoveryDruidNode.getDruidNode());
+              }
             }
 
             @Override
             public void stop()
             {
+              // Reverse order vs. start().
+
+              if (useLegacyAnnouncer) {
+                legacyAnnouncer.unannounce(discoveryDruidNode.getDruidNode());
+              }
+
               announcer.unannounce(discoveryDruidNode);
             }
           },
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index e46ff8b..fe9e72f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -61,6 +61,9 @@ public class PlannerConfig
   private boolean requireTimeCondition = false;
 
   @JsonProperty
+  private boolean awaitInitializationOnStart = true;
+
+  @JsonProperty
   private DateTimeZone sqlTimeZone = DateTimeZone.UTC;
 
   public Period getMetadataRefreshPeriod()
@@ -113,6 +116,11 @@ public class PlannerConfig
     return sqlTimeZone;
   }
 
+  public boolean isAwaitInitializationOnStart()
+  {
+    return awaitInitializationOnStart;
+  }
+
   public PlannerConfig withOverrides(final Map<String, Object> context)
   {
     if (context == null) {
@@ -142,6 +150,7 @@ public class PlannerConfig
     );
     newConfig.requireTimeCondition = isRequireTimeCondition();
     newConfig.sqlTimeZone = getSqlTimeZone();
+    newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart();
     return newConfig;
   }
 
@@ -181,6 +190,7 @@ public class PlannerConfig
            useApproximateTopN == that.useApproximateTopN &&
            useFallback == that.useFallback &&
            requireTimeCondition == that.requireTimeCondition &&
+           awaitInitializationOnStart == that.awaitInitializationOnStart &&
            Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) &&
            Objects.equals(sqlTimeZone, that.sqlTimeZone);
   }
@@ -199,6 +209,7 @@ public class PlannerConfig
         useApproximateTopN,
         useFallback,
         requireTimeCondition,
+        awaitInitializationOnStart,
         sqlTimeZone
     );
   }
@@ -216,6 +227,7 @@ public class PlannerConfig
            ", useApproximateTopN=" + useApproximateTopN +
            ", useFallback=" + useFallback +
            ", requireTimeCondition=" + requireTimeCondition +
+           ", awaitInitializationOnStart=" + awaitInitializationOnStart +
            ", sqlTimeZone=" + sqlTimeZone +
            '}';
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
index ab43307..40dcb56 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.sql.calcite.schema;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
@@ -98,7 +97,7 @@ public class DruidSchema extends AbstractSchema
   private final ConcurrentMap<String, DruidTable> tables;
 
   // For awaitInitialization.
-  private final CountDownLatch initializationLatch = new CountDownLatch(1);
+  private final CountDownLatch initialized = new CountDownLatch(1);
 
   // Protects access to segmentSignatures, mutableSegments, segmentsNeedingRefresh, lastRefresh, isServerViewInitialized
   private final Object lock = new Object();
@@ -175,7 +174,7 @@ public class DruidSchema extends AbstractSchema
   }
 
   @LifecycleStart
-  public void start()
+  public void start() throws InterruptedException
   {
     cacheExec.submit(
         new Runnable()
@@ -254,7 +253,7 @@ public class DruidSchema extends AbstractSchema
                     }
                   }
 
-                  initializationLatch.countDown();
+                  initialized.countDown();
                 }
                 catch (InterruptedException e) {
                   // Fall through.
@@ -288,6 +287,13 @@ public class DruidSchema extends AbstractSchema
           }
         }
     );
+
+    if (config.isAwaitInitializationOnStart()) {
+      final long startMillis = System.currentTimeMillis();
+      log.info("%s waiting for initialization.", getClass().getSimpleName());
+      awaitInitialization();
+      log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), System.currentTimeMillis() - startMillis);
+    }
   }
 
   @LifecycleStop
@@ -296,10 +302,9 @@ public class DruidSchema extends AbstractSchema
     cacheExec.shutdownNow();
   }
 
-  @VisibleForTesting
   public void awaitInitialization() throws InterruptedException
   {
-    initializationLatch.await();
+    initialized.await();
   }
 
   @Override
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 56c570f..488a355 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -566,8 +566,8 @@ public class CalciteTests
         TEST_AUTHENTICATOR_ESCALATOR
     );
 
-    schema.start();
     try {
+      schema.start();
       schema.awaitInitialization();
     }
     catch (InterruptedException e) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org