You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/12/08 15:48:13 UTC
[incubator-druid] branch master updated: Add SelfDiscoveryResource; rename org.apache.druid.discovery.No… (#6702)
This is an automated email from the ASF dual-hosted git repository.
leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1c62987 Add SelfDiscoveryResource; rename org.apache.druid.discovery.No… (#6702)
1c62987 is described below
commit 1c62987783e85867856f567b04aad807a26bb2e3
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Sun Dec 8 18:47:58 2019 +0300
Add SelfDiscoveryResource; rename org.apache.druid.discovery.No… (#6702)
* Add SelfDiscoveryResource
* Rename org.apache.druid.discovery.NodeType to NodeRole. Refactor CuratorDruidNodeDiscoveryProvider. Make SelfDiscoveryResource to listen to updates only about a single node (itself).
* Extended docs
* Fix brace
* Remove redundant throws in Lifecycle.Handler.stop()
* Import order
* Remove unresolvable link
* Address comments
* tmp
* tmp
* Rollback docker changes
* Remove extra .sh files
* Move filter
* Fix SecurityResourceFilterTest
---
.../org/apache/druid/guice/LifecycleModule.java | 13 +-
.../org/apache/druid/guice/ManageLifecycle.java | 6 +-
.../druid/guice/ManageLifecycleAnnouncements.java | 6 +-
.../apache/druid/guice/ManageLifecycleInit.java | 6 +-
.../apache/druid/guice/ManageLifecycleServer.java | 6 +-
.../apache/druid/java/util/common/io/Closer.java | 8 +
.../java/util/common/jackson/JacksonUtils.java | 18 +-
.../java/util/common/lifecycle/Lifecycle.java | 44 +--
.../druid/java/util/common/logger/Logger.java | 6 +-
.../client/response/StatusResponseHandler.java | 2 -
docs/operations/api-reference.md | 22 +-
.../druid/security/basic/CommonCacheNotifier.java | 24 +-
.../task/AppenderatorDriverRealtimeIndexTask.java | 4 +-
.../indexing/common/task/RealtimeIndexTask.java | 4 +-
.../overlord/hrtr/HttpRemoteTaskRunner.java | 6 +-
.../SeekableStreamIndexTaskRunner.java | 4 +-
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 37 +-
integration-tests/pom.xml | 4 +
.../clients/AbstractQueryResourceTestClient.java | 8 +-
.../org/apache/druid/testing/utils/HttpUtil.java | 103 ++++++
.../security/ITBasicAuthConfigurationTest.java | 137 ++-----
.../discovery/CuratorDruidNodeAnnouncer.java | 21 +-
.../CuratorDruidNodeDiscoveryProvider.java | 395 +++++++++++++--------
.../druid/curator/discovery/DiscoveryModule.java | 8 +-
.../curator/inventory/CuratorInventoryManager.java | 36 +-
.../apache/druid/discovery/DiscoveryDruidNode.java | 28 +-
.../apache/druid/discovery/DruidLeaderClient.java | 8 +-
.../apache/druid/discovery/DruidNodeDiscovery.java | 8 +-
.../discovery/DruidNodeDiscoveryProvider.java | 47 +--
.../discovery/{NodeType.java => NodeRole.java} | 15 +-
.../druid/guice/CoordinatorDiscoveryModule.java | 4 +-
.../guice/IndexingServiceDiscoveryModule.java | 4 +-
.../apache/druid/guice/RouterProcessingModule.java | 5 +-
.../{NodeTypeConfig.java => ServerTypeConfig.java} | 14 +-
.../org/apache/druid/guice/StorageNodeModule.java | 25 +-
.../druid/server/coordination/ServerType.java | 6 +-
.../apache/druid/server/http/ClusterResource.java | 38 +-
.../druid/server/http/SelfDiscoveryResource.java | 101 ++++++
.../server/http/security/StateResourceFilter.java | 4 +-
.../server/router/TieredBrokerHostSelector.java | 4 +-
.../druid/client/HttpServerInventoryViewTest.java | 4 +-
.../CuratorDruidNodeAnnouncerAndDiscoveryTest.java | 123 ++++---
.../druid/discovery/DruidLeaderClientTest.java | 24 +-
.../discovery/DruidNodeDiscoveryProviderTest.java | 27 +-
.../http/security/ResourceFilterTestHelper.java | 47 +--
.../http/security/SecurityResourceFilterTest.java | 6 +-
.../lookup/cache/LookupNodeDiscoveryTest.java | 8 +-
.../router/TieredBrokerHostSelectorTest.java | 10 +-
.../main/java/org/apache/druid/cli/CliBroker.java | 18 +-
.../java/org/apache/druid/cli/CliCoordinator.java | 16 +-
.../java/org/apache/druid/cli/CliHistorical.java | 21 +-
.../main/java/org/apache/druid/cli/CliIndexer.java | 21 +-
.../org/apache/druid/cli/CliMiddleManager.java | 15 +-
.../java/org/apache/druid/cli/CliOverlord.java | 11 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 39 +-
.../main/java/org/apache/druid/cli/CliRouter.java | 14 +-
.../java/org/apache/druid/cli/ServerRunnable.java | 22 +-
.../druid/sql/calcite/schema/SystemSchema.java | 12 +-
.../druid/sql/calcite/schema/SystemSchemaTest.java | 40 +--
.../druid/sql/calcite/util/CalciteTests.java | 4 +-
60 files changed, 1055 insertions(+), 666 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/guice/LifecycleModule.java b/core/src/main/java/org/apache/druid/guice/LifecycleModule.java
index 2de02d4..a793d4a 100644
--- a/core/src/main/java/org/apache/druid/guice/LifecycleModule.java
+++ b/core/src/main/java/org/apache/druid/guice/LifecycleModule.java
@@ -37,10 +37,13 @@ import java.util.Set;
*/
public class LifecycleModule implements Module
{
- // this scope includes final logging shutdown, so all other handlers in this lifecycle scope should avoid logging in
- // the 'stop' method, either failing silently or failing violently and throwing an exception causing an ungraceful exit
+ /**
+ * This scope includes final logging shutdown, so all other handlers in this lifecycle scope should avoid logging in
+ * their stop() method, either failing silently or failing violently and throwing an exception causing an ungraceful
+ * exit.
+ */
private final LifecycleScope initScope = new LifecycleScope(Lifecycle.Stage.INIT);
- private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL);
+ private final LifecycleScope normalScope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope serverScope = new LifecycleScope(Lifecycle.Stage.SERVER);
private final LifecycleScope annoucementsScope = new LifecycleScope(Lifecycle.Stage.ANNOUNCEMENTS);
@@ -118,7 +121,7 @@ public class LifecycleModule implements Module
getEagerBinder(binder); // Load up the eager binder so that it will inject the empty set at a minimum.
binder.bindScope(ManageLifecycleInit.class, initScope);
- binder.bindScope(ManageLifecycle.class, scope);
+ binder.bindScope(ManageLifecycle.class, normalScope);
binder.bindScope(ManageLifecycleServer.class, serverScope);
binder.bindScope(ManageLifecycleAnnouncements.class, annoucementsScope);
}
@@ -141,7 +144,7 @@ public class LifecycleModule implements Module
}
};
initScope.setLifecycle(lifecycle);
- scope.setLifecycle(lifecycle);
+ normalScope.setLifecycle(lifecycle);
serverScope.setLifecycle(lifecycle);
annoucementsScope.setLifecycle(lifecycle);
diff --git a/core/src/main/java/org/apache/druid/guice/ManageLifecycle.java b/core/src/main/java/org/apache/druid/guice/ManageLifecycle.java
index 0e6790d..2e7fd7e 100644
--- a/core/src/main/java/org/apache/druid/guice/ManageLifecycle.java
+++ b/core/src/main/java/org/apache/druid/guice/ManageLifecycle.java
@@ -28,9 +28,9 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle}
- *
- * This Scope gets defined by {@link LifecycleModule}
+ * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
+ * {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#NORMAL} stage. This stage gets defined by {@link
+ * LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
diff --git a/core/src/main/java/org/apache/druid/guice/ManageLifecycleAnnouncements.java b/core/src/main/java/org/apache/druid/guice/ManageLifecycleAnnouncements.java
index f9537bc..0f193d1 100644
--- a/core/src/main/java/org/apache/druid/guice/ManageLifecycleAnnouncements.java
+++ b/core/src/main/java/org/apache/druid/guice/ManageLifecycleAnnouncements.java
@@ -28,9 +28,9 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.ANNOUNCEMENTS
- *
- * This Scope gets defined by {@link LifecycleModule}
+ * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
+ * {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#ANNOUNCEMENTS} stage. This stage gets defined by
+ * {@link LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
diff --git a/core/src/main/java/org/apache/druid/guice/ManageLifecycleInit.java b/core/src/main/java/org/apache/druid/guice/ManageLifecycleInit.java
index f8a3750..c3c098a 100644
--- a/core/src/main/java/org/apache/druid/guice/ManageLifecycleInit.java
+++ b/core/src/main/java/org/apache/druid/guice/ManageLifecycleInit.java
@@ -28,9 +28,9 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.INIT
- *
- * This Scope gets defined by {@link LifecycleModule}
+ * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
+ * {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#INIT} stage. This stage gets defined by {@link
+ * LifecycleModule}.
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
diff --git a/core/src/main/java/org/apache/druid/guice/ManageLifecycleServer.java b/core/src/main/java/org/apache/druid/guice/ManageLifecycleServer.java
index f17b49a..7f9c93f 100644
--- a/core/src/main/java/org/apache/druid/guice/ManageLifecycleServer.java
+++ b/core/src/main/java/org/apache/druid/guice/ManageLifecycleServer.java
@@ -28,9 +28,9 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.SERVER
- *
- * This Scope gets defined by {@link LifecycleModule}
+ * Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on
+ * {@link org.apache.druid.java.util.common.lifecycle.Lifecycle.Stage#SERVER} stage. This stage gets defined by {@link
+ * LifecycleModule}.
*/
@Target({ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
index b17e52c..c843f29 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/io/Closer.java
@@ -26,6 +26,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
/**
@@ -107,6 +108,13 @@ public final class Closer implements Closeable
{
}
+ public <C extends Closeable> void registerAll(Collection<C> closeables)
+ {
+ for (C closeable : closeables) {
+ register(closeable);
+ }
+ }
+
/**
* Registers the given {@code Closeable} to be closed when this {@code Closer} is
* {@linkplain #close closed}.
diff --git a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
index eb60d29..6dbdc62 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/jackson/JacksonUtils.java
@@ -27,12 +27,18 @@ import java.util.Map;
public final class JacksonUtils
{
- public static final TypeReference<Map<String, Object>> TYPE_REFERENCE_MAP_STRING_OBJECT = new TypeReference<Map<String, Object>>()
- {
- };
- public static final TypeReference<Map<String, String>> TYPE_REFERENCE_MAP_STRING_STRING = new TypeReference<Map<String, String>>()
- {
- };
+ public static final TypeReference<Map<String, Object>> TYPE_REFERENCE_MAP_STRING_OBJECT =
+ new TypeReference<Map<String, Object>>()
+ {
+ };
+ public static final TypeReference<Map<String, String>> TYPE_REFERENCE_MAP_STRING_STRING =
+ new TypeReference<Map<String, String>>()
+ {
+ };
+ public static final TypeReference<Map<String, Boolean>> TYPE_REFERENCE_MAP_STRING_BOOLEAN =
+ new TypeReference<Map<String, Boolean>>()
+ {
+ };
/** Silences Jackson's {@link IOException}. */
public static <T> T readValue(ObjectMapper mapper, byte[] bytes, Class<T> valueClass)
diff --git a/core/src/main/java/org/apache/druid/java/util/common/lifecycle/Lifecycle.java b/core/src/main/java/org/apache/druid/java/util/common/lifecycle/Lifecycle.java
index 8082ebb..b5a3d89 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/lifecycle/Lifecycle.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/lifecycle/Lifecycle.java
@@ -42,27 +42,29 @@ import java.util.concurrent.locks.ReentrantLock;
* A manager of object Lifecycles.
*
* This object has methods for registering objects that should be started and stopped. The Lifecycle allows for
- * four stages: Stage.INIT, Stage.NORMAL, Stage.SERVER, and Stage.ANNOUNCEMENTS.
+ * four stages: {@link Stage#INIT}, {@link Stage#NORMAL}, {@link Stage#SERVER}, and {@link Stage#ANNOUNCEMENTS}.
*
- * Things added at Stage.INIT will be started first (in the order that they are added to the Lifecycle instance) and
- * then things added at Stage.NORMAL, then Stage.SERVER, and finally, Stage.ANNOUNCEMENTS will be started.
+ * Things added at {@link Stage#INIT} will be started first (in the order that they are added to the Lifecycle instance)
+ * and then things added at {@link Stage#NORMAL}, then {@link Stage#SERVER}, and finally, {@link Stage#ANNOUNCEMENTS}
+ * will be started.
*
- * The close operation goes in reverse order, starting with the last thing added at Stage.ANNOUNCEMENTS and working
- * backwards.
+ * The close operation goes in reverse order, starting with the last thing added at {@link Stage#ANNOUNCEMENTS} and
+ * working backwards.
*
* Conceptually, the stages have the following purposes:
- * - Stage.INIT: Currently, this stage is used exclusively for log4j initialization, since almost everything needs
- * logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides something that
- * should be initialized before nearly all other Lifecycle objects could also belong here (if it doesn't need
- * logging during start or stop).
- * - Stage.NORMAL: This is the default stage. Most objects will probably make the most sense to be registered at
- * this level, with the exception of any form of server or service announcements
- * - Stage.SERVER: This lifecycle stage is intended for all 'server' objects, and currently only contains the Jetty
- * module, but any sort of 'server' that expects most Lifecycle objects to be initialized by the time it starts, and
- * still available at the time it stops can logically live in this stage.
- * - Stage.ANNOUNCEMENTS: Any object which announces to a cluster this servers location belongs in this stage. By being
- * last, we can be sure that all servers are initialized before we advertise the endpoint locations, and also can be
- * sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
+ * - {@link Stage#INIT}: Currently, this stage is used exclusively for log4j initialization, since almost everything
+ * needs logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides
+ * something that should be initialized before nearly all other Lifecycle objects could also belong here (if it
+ * doesn't need logging during start or stop).
+ * - {@link Stage#NORMAL}: This is the default stage. Most objects will probably make the most sense to be registered
+ * at this level, with the exception of any form of server or service announcements
+ * - {@link Stage#SERVER}: This lifecycle stage is intended for all 'server' objects, for example,
+ * org.apache.druid.server.initialization.jetty.JettyServerModule, but any sort of 'server' that expects most (or
+ * some specific) Lifecycle objects to be initialized by the time it starts, and still available at the time it stops
+ * can logically live in this stage.
+ * - {@link Stage#ANNOUNCEMENTS}: Any object which announces to a cluster this servers location belongs in this stage.
+ * By being last, we can be sure that all servers are initialized before we advertise the endpoint locations, and
+ * also can be sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
*
* There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that
* start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start
@@ -357,7 +359,7 @@ public class Lifecycle
}
startStopLock.lock();
try {
- RuntimeException thrown = null;
+ Exception thrown = null;
for (Stage s : handlers.navigableKeySet().descendingSet()) {
log.info("Stopping lifecycle [%s] stage [%s]", name, s.name());
@@ -365,17 +367,19 @@ public class Lifecycle
try {
handler.stop();
}
- catch (RuntimeException e) {
+ catch (Exception e) {
log.warn(e, "Lifecycle [%s] encountered exception while stopping %s", name, handler);
if (thrown == null) {
thrown = e;
+ } else {
+ thrown.addSuppressed(e);
}
}
}
}
if (thrown != null) {
- throw thrown;
+ throw new RuntimeException(thrown);
}
}
finally {
diff --git a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
index 8aa0e1c..15e16d0 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/logger/Logger.java
@@ -28,6 +28,7 @@ public class Logger
{
private final org.slf4j.Logger log;
private final boolean stackTraces;
+ private final Logger noStackTraceLogger;
public Logger(String name)
{
@@ -43,6 +44,7 @@ public class Logger
{
this.log = log;
this.stackTraces = stackTraces;
+ noStackTraceLogger = stackTraces ? new Logger(log, false) : this;
}
protected org.slf4j.Logger getSlf4jLogger()
@@ -57,12 +59,12 @@ public class Logger
}
/**
- * Create a copy of this Logger that does not log exception stack traces, unless the log level is DEBUG or lower.
+ * Returns a copy of this Logger that does not log exception stack traces, unless the log level is DEBUG or lower.
* Useful for writing code like: {@code log.noStackTrace().warn(e, "Something happened.");}
*/
public Logger noStackTrace()
{
- return new Logger(log, false);
+ return noStackTraceLogger;
}
public void trace(String message, Object... formatArgs)
diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java
index 03eb3cd..0774352 100644
--- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java
+++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StatusResponseHandler.java
@@ -24,8 +24,6 @@ import org.jboss.netty.handler.codec.http.HttpResponse;
import java.nio.charset.StandardCharsets;
-/**
- */
public class StatusResponseHandler implements HttpResponseHandler<StatusResponseHolder, StatusResponseHolder>
{
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index fe6813d..c184e8d 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -45,9 +45,29 @@ An endpoint that always returns a boolean "true" value with a 200 OK response, u
Returns the current configuration properties of the process.
+* `/status/selfDiscoveredStatus`
+
+Returns a JSON map of the form `{"selfDiscovered": true/false}`, indicating whether the node has received a confirmation
+from the central node discovery mechanism (currently ZooKeeper) of the Druid cluster that the node has been added to the
+cluster. It is recommended to not consider a Druid node "healthy" or "ready" in automated deployment/container
+management systems until it returns `{"selfDiscovered": true}` from this endpoint. This is because a node may be
+isolated from the rest of the cluster due to network issues and it doesn't make sense to consider nodes "healthy" in
+this case. Also, when nodes such as Brokers use ZooKeeper segment discovery for building their view of the Druid cluster
+(as opposed to HTTP segment discovery), they may be unusable until the ZooKeeper client is fully initialized and starts
+to receive data from the ZooKeeper cluster. `{"selfDiscovered": true}` is a proxy event indicating that the ZooKeeper
+client on the node has started to receive data from the ZooKeeper cluster and it's expected that all segments and other
+nodes will be discovered by this node timely from this point.
+
+* `/status/selfDiscovered`
+
+Similar to `/status/selfDiscoveredStatus`, but returns 200 OK response with empty body if the node has discovered itself
+and 503 SERVICE UNAVAILABLE if the node hasn't discovered itself yet. This endpoint might be useful because some
+monitoring checks such as AWS load balancer health checks are not able to look at the response body.
+
## Master Server
-This section documents the API endpoints for the processes that reside on Master servers (Coordinators and Overlords) in the suggested [three-server configuration](../design/processes.html#server-types).
+This section documents the API endpoints for the processes that reside on Master servers (Coordinators and Overlords)
+in the suggested [three-server configuration](../design/processes.html#server-types).
### Coordinator
diff --git a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
index 4ae5ae8..ad0364d 100644
--- a/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
+++ b/extensions-core/druid-basic-security/src/main/java/org/apache/druid/security/basic/CommonCacheNotifier.java
@@ -24,7 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -59,17 +59,17 @@ public class CommonCacheNotifier
private static final EmittingLogger LOG = new EmittingLogger(CommonCacheNotifier.class);
/**
- * {@link NodeType#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
+ * {@link NodeRole#COORDINATOR} is intentionally omitted because it gets its information about the auth state directly
* from metadata storage.
*/
- private static final List<NodeType> NODE_TYPES = Arrays.asList(
- NodeType.BROKER,
- NodeType.OVERLORD,
- NodeType.HISTORICAL,
- NodeType.PEON,
- NodeType.ROUTER,
- NodeType.MIDDLE_MANAGER,
- NodeType.INDEXER
+ private static final List<NodeRole> NODE_TYPES = Arrays.asList(
+ NodeRole.BROKER,
+ NodeRole.OVERLORD,
+ NodeRole.HISTORICAL,
+ NodeRole.PEON,
+ NodeRole.ROUTER,
+ NodeRole.MIDDLE_MANAGER,
+ NodeRole.INDEXER
);
private final DruidNodeDiscoveryProvider discoveryProvider;
@@ -161,8 +161,8 @@ public class CommonCacheNotifier
private List<ListenableFuture<StatusResponseHolder>> sendUpdate(String updatedAuthenticatorPrefix, byte[] serializedEntity)
{
List<ListenableFuture<StatusResponseHolder>> futures = new ArrayList<>();
- for (NodeType nodeType : NODE_TYPES) {
- DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeType(nodeType);
+ for (NodeRole nodeRole : NODE_TYPES) {
+ DruidNodeDiscovery nodeDiscovery = discoveryProvider.getForNodeRole(nodeRole);
Collection<DiscoveryDruidNode> nodes = nodeDiscovery.getAllNodes();
for (DiscoveryDruidNode node : nodes) {
URL listenerURL = getListenerURL(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index cb9a9df..b7db37a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -38,7 +38,7 @@ import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -744,7 +744,7 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
new LookupNodeService(getContextValue(CTX_KEY_LOOKUP_TIER));
return new DiscoveryDruidNode(
toolbox.getDruidNode(),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
index c85d468..31c5799 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/RealtimeIndexTask.java
@@ -32,7 +32,7 @@ import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -359,7 +359,7 @@ public class RealtimeIndexTask extends AbstractTask
new LookupNodeService((String) getContextValue(CTX_KEY_LOOKUP_TIER));
DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index afbdbd7..3020d3c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -431,10 +431,8 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private void startWorkersHandling() throws InterruptedException
{
final CountDownLatch workerViewInitialized = new CountDownLatch(1);
-
- DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(
- WorkerNodeService.DISCOVERY_SERVICE_KEY
- );
+ DruidNodeDiscovery druidNodeDiscovery =
+ druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index e1f3861..3fbbcb8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -46,7 +46,7 @@ import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -434,7 +434,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
final DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
toolbox.getDruidNode(),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
lookupNodeService.getName(), lookupNodeService
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index dc87de6..745f3e5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -31,7 +31,7 @@ import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
@@ -142,7 +142,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -150,7 +150,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -239,7 +239,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -247,7 +247,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -343,7 +343,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -488,7 +488,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -663,8 +663,11 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 1234, null, true, false),
- NodeType.MIDDLE_MANAGER,
- ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY))
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY,
+ new WorkerNodeService("ip1", 2, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
);
workerHolders.put(
@@ -843,7 +846,7 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode1 = new DiscoveryDruidNode(
new DruidNode("service", "host1", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip1", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
)
@@ -889,8 +892,11 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode2 = new DiscoveryDruidNode(
new DruidNode("service", "host2", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
- ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY))
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY,
+ new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
);
workerHolders.put(
@@ -920,8 +926,11 @@ public class HttpRemoteTaskRunnerTest
DiscoveryDruidNode druidNode3 = new DiscoveryDruidNode(
new DruidNode("service", "host3", false, 8080, null, true, false),
- NodeType.MIDDLE_MANAGER,
- ImmutableMap.of(WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY))
+ NodeRole.MIDDLE_MANAGER,
+ ImmutableMap.of(
+ WorkerNodeService.DISCOVERY_SERVICE_KEY,
+ new WorkerNodeService("ip2", 1, "0", WorkerConfig.DEFAULT_CATEGORY)
+ )
);
workerHolders.put(
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 7822af3..66dea19 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -179,6 +179,10 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
index 3babf3e..1cb9dbf 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
@@ -40,8 +40,7 @@ public abstract class AbstractQueryResourceTestClient<QueryType>
{
private final ObjectMapper jsonMapper;
private final HttpClient httpClient;
- protected final String routerUrl;
- private final StatusResponseHandler responseHandler;
+ final String routerUrl;
@Inject
AbstractQueryResourceTestClient(
@@ -53,7 +52,6 @@ public abstract class AbstractQueryResourceTestClient<QueryType>
this.jsonMapper = jsonMapper;
this.httpClient = httpClient;
this.routerUrl = config.getRouterUrl();
- this.responseHandler = StatusResponseHandler.getInstance();
}
public abstract String getBrokerURL();
@@ -65,8 +63,8 @@ public abstract class AbstractQueryResourceTestClient<QueryType>
new Request(HttpMethod.POST, new URL(url)).setContent(
"application/json",
jsonMapper.writeValueAsBytes(query)
- ), responseHandler
-
+ ),
+ StatusResponseHandler.getInstance()
).get();
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java
new file mode 100644
index 0000000..5e01126
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/HttpUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.testing.clients.AbstractQueryResourceTestClient;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nullable;
+import javax.ws.rs.core.MediaType;
+import java.net.URL;
+
+public class HttpUtil
+{
+ private static final Logger LOG = new Logger(AbstractQueryResourceTestClient.class);
+ private static final StatusResponseHandler RESPONSE_HANDLER = StatusResponseHandler.getInstance();
+
+ public static StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content)
+ {
+ return makeRequestWithExpectedStatus(
+ httpClient,
+ method,
+ url,
+ content,
+ HttpResponseStatus.OK
+ );
+ }
+
+ public static StatusResponseHolder makeRequestWithExpectedStatus(
+ HttpClient httpClient,
+ HttpMethod method,
+ String url,
+ @Nullable byte[] content,
+ HttpResponseStatus expectedStatus
+ )
+ {
+ try {
+ Request request = new Request(method, new URL(url));
+ if (content != null) {
+ request.setContent(MediaType.APPLICATION_JSON, content);
+ }
+ int retryCount = 0;
+
+ StatusResponseHolder response;
+
+ while (true) {
+ response = httpClient.go(request, RESPONSE_HANDLER).get();
+
+ if (!response.getStatus().equals(expectedStatus)) {
+ String errMsg = StringUtils.format(
+ "Error while making request to url[%s] status[%s] content[%s]",
+ url,
+ response.getStatus(),
+ response.getContent()
+ );
+ // it can take time for the auth config to propagate, so we retry
+ if (retryCount > 10) {
+ throw new ISE(errMsg);
+ } else {
+ LOG.error(errMsg);
+ LOG.error("retrying in 3000ms, retryCount: " + retryCount);
+ retryCount++;
+ Thread.sleep(3000);
+ }
+ } else {
+ break;
+ }
+ }
+ return response;
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private HttpUtil()
+ {
+ }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
index cb675bf..d276acb 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/security/ITBasicAuthConfigurationTest.java
@@ -27,14 +27,12 @@ import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.calcite.avatica.AvaticaSqlException;
import org.apache.druid.guice.annotations.Client;
-import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.CredentialedHttpClient;
import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.auth.BasicCredentials;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentialUpdate;
import org.apache.druid.server.security.Action;
@@ -45,6 +43,7 @@ import org.apache.druid.sql.avatica.DruidAvaticaHandler;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.HttpUtil;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.TestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
@@ -55,8 +54,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
-import javax.ws.rs.core.MediaType;
-import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
@@ -74,11 +71,6 @@ public class ITBasicAuthConfigurationTest
{
private static final Logger LOG = new Logger(ITBasicAuthConfigurationTest.class);
- private static final TypeReference<Map<String, Boolean>> LOAD_STATUS_TYPE_REFERENCE =
- new TypeReference<Map<String, Boolean>>()
- {
- };
-
private static final TypeReference<List<Map<String, Object>>> SYS_SCHEMA_RESULTS_TYPE_REFERENCE =
new TypeReference<List<Map<String, Object>>>()
{
@@ -115,7 +107,6 @@ public class ITBasicAuthConfigurationTest
@Client
HttpClient httpClient;
-
@Inject
private CoordinatorResourceTestClient coordinatorClient;
@@ -201,7 +192,7 @@ public class ITBasicAuthConfigurationTest
);
// check that we can access a datasource-permission restricted resource on the broker
- makeRequest(
+ HttpUtil.makeRequest(
datasourceOnlyUserClient,
HttpMethod.GET,
config.getBrokerUrl() + "/druid/v2/datasources/auth_test",
@@ -209,8 +200,13 @@ public class ITBasicAuthConfigurationTest
);
// check that we can access a state-permission restricted resource on the broker
- makeRequest(datasourceWithStateUserClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
- makeRequest(stateOnlyUserClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
+ HttpUtil.makeRequest(
+ datasourceWithStateUserClient,
+ HttpMethod.GET,
+ config.getBrokerUrl() + "/status",
+ null
+ );
+ HttpUtil.makeRequest(stateOnlyUserClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
// initial setup is done now, run the system schema response content tests
final List<Map<String, Object>> adminSegments = jsonMapper.readValue(
@@ -416,14 +412,14 @@ public class ITBasicAuthConfigurationTest
// create 100 users
for (int i = 0; i < 100; i++) {
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
config.getCoordinatorUrl() + "/druid-ext/basic-security/authentication/db/basic/users/druid" + i,
null
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
config.getCoordinatorUrl() + "/druid-ext/basic-security/authorization/db/basic/users/druid" + i,
@@ -434,14 +430,14 @@ public class ITBasicAuthConfigurationTest
}
// setup the last of 100 users and check that it works
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
config.getCoordinatorUrl() + "/druid-ext/basic-security/authentication/db/basic/users/druid99/credentials",
jsonMapper.writeValueAsBytes(new BasicAuthenticatorCredentialUpdate("helloworld", 5000))
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
config.getCoordinatorUrl() + "/druid-ext/basic-security/authorization/db/basic/users/druid99/roles/druidrole",
@@ -477,16 +473,16 @@ public class ITBasicAuthConfigurationTest
private void testOptionsRequests(HttpClient httpClient)
{
- makeRequest(httpClient, HttpMethod.OPTIONS, config.getCoordinatorUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.OPTIONS, config.getIndexerUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.OPTIONS, config.getBrokerUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.OPTIONS, config.getHistoricalUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.OPTIONS, config.getRouterUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getCoordinatorUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getIndexerUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getBrokerUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getHistoricalUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.OPTIONS, config.getRouterUrl() + "/status", null);
}
private void checkUnsecuredCoordinatorLoadQueuePath(HttpClient client)
{
- makeRequest(client, HttpMethod.GET, config.getCoordinatorUrl() + "/druid/coordinator/v1/loadqueue", null);
+ HttpUtil.makeRequest(client, HttpMethod.GET, config.getCoordinatorUrl() + "/druid/coordinator/v1/loadqueue", null);
}
private void testAvaticaQuery(String url)
@@ -536,11 +532,11 @@ public class ITBasicAuthConfigurationTest
private void checkNodeAccess(HttpClient httpClient)
{
- makeRequest(httpClient, HttpMethod.GET, config.getCoordinatorUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.GET, config.getIndexerUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.GET, config.getHistoricalUrl() + "/status", null);
- makeRequest(httpClient, HttpMethod.GET, config.getRouterUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getCoordinatorUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getIndexerUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getBrokerUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getHistoricalUrl() + "/status", null);
+ HttpUtil.makeRequest(httpClient, HttpMethod.GET, config.getRouterUrl() + "/status", null);
}
private void checkLoadStatus(HttpClient httpClient) throws Exception
@@ -554,92 +550,31 @@ public class ITBasicAuthConfigurationTest
private void checkLoadStatusSingle(HttpClient httpClient, String baseUrl) throws Exception
{
- StatusResponseHolder holder = makeRequest(
+ StatusResponseHolder holder = HttpUtil.makeRequest(
httpClient,
HttpMethod.GET,
baseUrl + "/druid-ext/basic-security/authentication/loadStatus",
null
);
String content = holder.getContent();
- Map<String, Boolean> loadStatus = jsonMapper.readValue(content, LOAD_STATUS_TYPE_REFERENCE);
+ Map<String, Boolean> loadStatus = jsonMapper.readValue(content, JacksonUtils.TYPE_REFERENCE_MAP_STRING_BOOLEAN);
Assert.assertNotNull(loadStatus.get("basic"));
Assert.assertTrue(loadStatus.get("basic"));
- holder = makeRequest(
+ holder = HttpUtil.makeRequest(
httpClient,
HttpMethod.GET,
baseUrl + "/druid-ext/basic-security/authorization/loadStatus",
null
);
content = holder.getContent();
- loadStatus = jsonMapper.readValue(content, LOAD_STATUS_TYPE_REFERENCE);
+ loadStatus = jsonMapper.readValue(content, JacksonUtils.TYPE_REFERENCE_MAP_STRING_BOOLEAN);
Assert.assertNotNull(loadStatus.get("basic"));
Assert.assertTrue(loadStatus.get("basic"));
}
- private StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content)
- {
- return makeRequestWithExpectedStatus(
- httpClient,
- method,
- url,
- content,
- HttpResponseStatus.OK
- );
- }
-
- private StatusResponseHolder makeRequestWithExpectedStatus(
- HttpClient httpClient,
- HttpMethod method,
- String url,
- byte[] content,
- HttpResponseStatus expectedStatus
- )
- {
- try {
- Request request = new Request(method, new URL(url));
- if (content != null) {
- request.setContent(MediaType.APPLICATION_JSON, content);
- }
- int retryCount = 0;
-
- StatusResponseHolder response;
-
- while (true) {
- response = httpClient.go(
- request,
- StatusResponseHandler.getInstance()
- ).get();
-
- if (!response.getStatus().equals(expectedStatus)) {
- String errMsg = StringUtils.format(
- "Error while making request to url[%s] status[%s] content[%s]",
- url,
- response.getStatus(),
- response.getContent()
- );
- // it can take time for the auth config to propagate, so we retry
- if (retryCount > 10) {
- throw new ISE(errMsg);
- } else {
- LOG.error(errMsg);
- LOG.error("retrying in 3000ms, retryCount: " + retryCount);
- retryCount++;
- Thread.sleep(3000);
- }
- } else {
- break;
- }
- }
- return response;
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
private void createUserAndRoleWithPermissions(
HttpClient adminClient,
String user,
@@ -648,7 +583,7 @@ public class ITBasicAuthConfigurationTest
List<ResourceAction> permissions
) throws Exception
{
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -658,7 +593,7 @@ public class ITBasicAuthConfigurationTest
),
null
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -668,7 +603,7 @@ public class ITBasicAuthConfigurationTest
),
jsonMapper.writeValueAsBytes(new BasicAuthenticatorCredentialUpdate(password, 5000))
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -678,7 +613,7 @@ public class ITBasicAuthConfigurationTest
),
null
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -688,7 +623,7 @@ public class ITBasicAuthConfigurationTest
),
null
);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -700,7 +635,7 @@ public class ITBasicAuthConfigurationTest
null
);
byte[] permissionsBytes = jsonMapper.writeValueAsBytes(permissions);
- makeRequest(
+ HttpUtil.makeRequest(
adminClient,
HttpMethod.POST,
StringUtils.format(
@@ -721,7 +656,7 @@ public class ITBasicAuthConfigurationTest
Map<String, Object> queryMap = ImmutableMap.of(
"query", query
);
- return makeRequestWithExpectedStatus(
+ return HttpUtil.makeRequestWithExpectedStatus(
httpClient,
HttpMethod.POST,
config.getBrokerUrl() + "/druid/v2/sql",
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
index 5529d90..5b536c6 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java
@@ -26,13 +26,20 @@ import org.apache.curator.utils.ZKPaths;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig;
public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
{
+ static String makeNodeAnnouncementPath(ZkPathsConfig config, NodeRole nodeRole, DruidNode node)
+ {
+ return ZKPaths.makePath(config.getInternalDiscoveryPath(), nodeRole.toString(), node.getHostAndPortToUse());
+ }
+
private static final Logger log = new Logger(CuratorDruidNodeAnnouncer.class);
private final Announcer announcer;
@@ -55,11 +62,8 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
log.debug("Announcing self [%s].", asString);
- String path = ZKPaths.makePath(
- config.getInternalDiscoveryPath(),
- discoveryDruidNode.getNodeType().toString(),
- discoveryDruidNode.getDruidNode().getHostAndPortToUse()
- );
+ String path =
+ makeNodeAnnouncementPath(config, discoveryDruidNode.getNodeRole(), discoveryDruidNode.getDruidNode());
announcer.announce(path, StringUtils.toUtf8(asString));
log.info("Announced self [%s].", asString);
@@ -77,11 +81,8 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer
log.debug("Unannouncing self [%s].", asString);
- String path = ZKPaths.makePath(
- config.getInternalDiscoveryPath(),
- discoveryDruidNode.getNodeType().toString(),
- discoveryDruidNode.getDruidNode().getHostAndPortToUse()
- );
+ String path =
+ makeNodeAnnouncementPath(config, discoveryDruidNode.getNodeRole(), discoveryDruidNode.getDruidNode());
announcer.unannounce(path);
log.info("Unannounced self [%s].", asString);
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
index f4235a2..383ec20 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeDiscoveryProvider.java
@@ -25,33 +25,44 @@ import com.google.common.collect.ImmutableList;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.concurrent.LifecycleLock;
+import org.apache.druid.curator.cache.PathChildrenCacheFactory;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.utils.CloseableUtils;
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
/**
*
@@ -67,7 +78,8 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private ExecutorService listenerExecutor;
- private final ConcurrentHashMap<NodeType, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<NodeRole, NodeRoleWatcher> nodeRoleWatchers = new ConcurrentHashMap<>();
+ private final ConcurrentLinkedQueue<NodeDiscoverer> nodeDiscoverers = new ConcurrentLinkedQueue<>();
private final LifecycleLock lifecycleLock = new LifecycleLock();
@@ -84,24 +96,33 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
@Override
- public DruidNodeDiscovery getForNodeType(NodeType nodeType)
+ public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
{
- Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
+ Preconditions.checkState(lifecycleLock.isStarted());
+ log.debug("Creating a NodeDiscoverer for node [%s] and role [%s]", node, nodeRole);
+ NodeDiscoverer nodeDiscoverer = new NodeDiscoverer(config, jsonMapper, curatorFramework, node, nodeRole);
+ nodeDiscoverers.add(nodeDiscoverer);
+ return nodeDiscoverer::nodeDiscovered;
+ }
+
+ @Override
+ public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
+ {
+ Preconditions.checkState(lifecycleLock.isStarted());
- return nodeTypeWatchers.computeIfAbsent(
- nodeType,
- nType -> {
- log.debug("Creating NodeTypeWatcher for nodeType [%s].", nType);
- NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher(
+ return nodeRoleWatchers.computeIfAbsent(
+ nodeRole,
+ role -> {
+ log.debug("Creating NodeRoleWatcher for nodeRole [%s].", role);
+ NodeRoleWatcher nodeRoleWatcher = new NodeRoleWatcher(
listenerExecutor,
curatorFramework,
config.getInternalDiscoveryPath(),
jsonMapper,
- nType
+ role
);
- nodeTypeWatcher.start();
- log.debug("Created NodeTypeWatcher for nodeType [%s].", nType);
- return nodeTypeWatcher;
+ log.debug("Created NodeRoleWatcher for nodeRole [%s].", role);
+ return nodeRoleWatcher;
}
);
}
@@ -128,7 +149,7 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
@LifecycleStop
- public void stop()
+ public void stop() throws IOException
{
if (!lifecycleLock.canStop()) {
throw new ISE("can't stop.");
@@ -136,19 +157,20 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
log.debug("Stopping.");
- for (NodeTypeWatcher watcher : nodeTypeWatchers.values()) {
- watcher.stop();
- }
- listenerExecutor.shutdownNow();
+ Closer closer = Closer.create();
+ closer.registerAll(nodeRoleWatchers.values());
+ closer.registerAll(nodeDiscoverers);
+
+ CloseableUtils.closeBoth(closer, listenerExecutor::shutdownNow);
}
- private static class NodeTypeWatcher implements DruidNodeDiscovery
+ private static class NodeRoleWatcher implements DruidNodeDiscovery, Closeable
{
- private static final Logger log = new Logger(NodeTypeWatcher.class);
+ private static final Logger log = new Logger(NodeRoleWatcher.class);
private final CuratorFramework curatorFramework;
- private final NodeType nodeType;
+ private final NodeRole nodeRole;
private final ObjectMapper jsonMapper;
/**
@@ -168,28 +190,45 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
private final CountDownLatch cacheInitialized = new CountDownLatch(1);
- NodeTypeWatcher(
+ NodeRoleWatcher(
ExecutorService listenerExecutor,
CuratorFramework curatorFramework,
String basePath,
ObjectMapper jsonMapper,
- NodeType nodeType
+ NodeRole nodeRole
)
{
this.listenerExecutor = listenerExecutor;
this.curatorFramework = curatorFramework;
- this.nodeType = nodeType;
+ this.nodeRole = nodeRole;
this.jsonMapper = jsonMapper;
- // This is required to be single threaded from Docs in PathChildrenCache;
- this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeTypeWatcher[%s]", nodeType));
- this.cache = new PathChildrenCache(
- curatorFramework,
- ZKPaths.makePath(basePath, nodeType.toString()),
- true,
- true,
- cacheExecutor
- );
+ // This is required to be single threaded from docs in PathChildrenCache.
+ this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeRoleWatcher[%s]", nodeRole));
+ cache = new PathChildrenCacheFactory.Builder()
+ //NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
+ //this is a workaround to solve curator's out-of-order events problem
+ //https://issues.apache.org/jira/browse/CURATOR-191
+ // This is also done in CuratorInventoryManager.
+ .withCacheData(true)
+ .withCompressed(true)
+ .withExecutorService(cacheExecutor)
+ .build()
+ .make(curatorFramework, ZKPaths.makePath(basePath, nodeRole.toString()));
+
+ try {
+ cache.getListenable().addListener((client, event) -> handleChildEvent(event));
+ cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+ }
+ catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ CloseableUtils.closeBoth(cache, cacheExecutor::shutdownNow);
}
@Override
@@ -205,8 +244,8 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
if (!nodeViewInitialized) {
log.info(
- "Cache for process type [%s] not initialized yet; getAllNodes() might not return full information.",
- nodeType.getJsonName()
+ "Cache for node role [%s] not initialized yet; getAllNodes() might not return full information.",
+ nodeRole.getJsonName()
);
}
return unmodifiableNodes;
@@ -236,111 +275,55 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
try {
switch (event.getType()) {
case CHILD_ADDED: {
- final byte[] data;
- try {
- data = curatorFramework.getData().decompressed().forPath(event.getData().getPath());
- }
- catch (Exception ex) {
- log.noStackTrace().error(
- ex,
- "Failed to get data for path [%s]. Ignoring event [%s].",
- event.getData().getPath(),
- event.getType()
- );
- return;
- }
-
- DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
-
- if (!nodeType.equals(druidNode.getNodeType())) {
- log.warn(
- "Node[%s] of type[%s] addition ignored due to mismatched type (expected type[%s]).",
- druidNode.getDruidNode().getUriToUse(),
- druidNode.getNodeType().getJsonName(),
- nodeType.getJsonName()
- );
- return;
- }
-
- log.info(
- "Node[%s] of type[%s] detected.",
- druidNode.getDruidNode().getUriToUse(),
- nodeType.getJsonName()
- );
-
- addNode(druidNode);
-
+ childAdded(event);
break;
}
case CHILD_REMOVED: {
- DiscoveryDruidNode druidNode = jsonMapper.readValue(event.getData().getData(), DiscoveryDruidNode.class);
-
- if (!nodeType.equals(druidNode.getNodeType())) {
- log.warn(
- "Node[%s] of type[%s] removal ignored due to mismatched type (expected type[%s]).",
- druidNode.getDruidNode().getUriToUse(),
- druidNode.getNodeType().getJsonName(),
- nodeType.getJsonName()
- );
- return;
- }
-
- log.info(
- "Node[%s] of type[%s] went offline.",
- druidNode.getDruidNode().getUriToUse(),
- nodeType.getJsonName()
- );
-
- removeNode(druidNode);
-
+ childRemoved(event);
break;
}
case INITIALIZED: {
- // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
- // counted down.
- if (cacheInitialized.getCount() == 0) {
- log.warn("cache is already initialized. ignoring [%s] event.", event.getType());
- return;
- }
-
- log.info("Node watcher of type[%s] is now initialized.", nodeType.getJsonName());
-
- for (Listener listener : nodeListeners) {
- safeSchedule(
- () -> {
- listener.nodesAdded(unmodifiableNodes);
- listener.nodeViewInitialized();
- },
- "Exception occured in nodesAdded([%s]) in listener [%s].",
- unmodifiableNodes,
- listener
- );
- }
-
- cacheInitialized.countDown();
+ cacheInitialized();
break;
}
default: {
- log.warn("Ignored event type[%s] for node watcher of type[%s].", event.getType(), nodeType.getJsonName());
+ log.warn("Ignored event type[%s] for node watcher of role[%s].", event.getType(), nodeRole.getJsonName());
}
}
}
catch (Exception ex) {
- log.error(ex, "Unknown error in node watcher of type[%s].", nodeType.getJsonName());
+ log.error(ex, "Unknown error in node watcher of role[%s].", nodeRole.getJsonName());
}
}
}
- private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
+ @GuardedBy("lock")
+ void childAdded(PathChildrenCacheEvent event) throws IOException
{
- listenerExecutor.submit(() -> {
- try {
- runnable.run();
- }
- catch (Exception ex) {
- log.error(ex, errMsgFormat, args);
- }
- });
+ final byte[] data = getZkDataForNode(event.getData());
+ if (data == null) {
+ log.error(
+ "Failed to get data for path [%s]. Ignoring a child addition event.",
+ event.getData().getPath()
+ );
+ return;
+ }
+
+ DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
+
+ if (!nodeRole.equals(druidNode.getNodeRole())) {
+ log.error(
+ "Node[%s] of role[%s] addition ignored due to mismatched role (expected role[%s]).",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName(),
+ nodeRole.getJsonName()
+ );
+ return;
+ }
+
+ log.info("Node[%s] of role[%s] detected.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
+
+ addNode(druidNode);
}
@GuardedBy("lock")
@@ -361,25 +344,51 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
}
}
} else {
- log.warn(
- "Node[%s:%s] discovered but existed already [%s].",
- druidNode.getDruidNode().getHostAndPortToUse(),
- druidNode,
+ log.error(
+ "Node[%s] of role[%s] discovered but existed already [%s].",
+ druidNode.getDruidNode().getUriToUse(),
+ nodeRole.getJsonName(),
prev
);
}
}
@GuardedBy("lock")
+ private void childRemoved(PathChildrenCacheEvent event) throws IOException
+ {
+ final byte[] data = event.getData().getData();
+ if (data == null) {
+ log.error("Failed to get data for path [%s]. Ignoring a child removal event.", event.getData().getPath());
+ return;
+ }
+
+ DiscoveryDruidNode druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
+
+ if (!nodeRole.equals(druidNode.getNodeRole())) {
+ log.error(
+ "Node[%s] of role[%s] removal ignored due to mismatched role (expected role[%s]).",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName(),
+ nodeRole.getJsonName()
+ );
+ return;
+ }
+
+ log.info("Node[%s] of role[%s] went offline.", druidNode.getDruidNode().getUriToUse(), nodeRole.getJsonName());
+
+ removeNode(druidNode);
+ }
+
+ @GuardedBy("lock")
private void removeNode(DiscoveryDruidNode druidNode)
{
DiscoveryDruidNode prev = nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
if (prev == null) {
- log.warn(
- "Noticed disappearance of unknown druid node [%s:%s].",
- druidNode.getDruidNode().getHostAndPortToUse(),
- druidNode
+ log.error(
+ "Noticed disappearance of unknown druid node [%s] of role[%s].",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName()
);
return;
}
@@ -390,34 +399,138 @@ public class CuratorDruidNodeDiscoveryProvider extends DruidNodeDiscoveryProvide
for (Listener listener : nodeListeners) {
safeSchedule(
() -> listener.nodesRemoved(nodeRemoved),
- "Exception occured in nodeRemoved(node=[%s]) in listener [%s].",
- druidNode.getDruidNode().getHostAndPortToUse(),
+ "Exception occured in nodeRemoved(node[%s] of role[%s]) in listener [%s].",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName(),
listener
);
}
}
}
- public void start()
+ /**
+ * Doing this instead of a simple call to {@link ChildData#getData()} because data cache is turned off, see a
+ * comment in {@link #NodeRoleWatcher}.
+ */
+ @Nullable
+ private byte[] getZkDataForNode(ChildData child)
{
try {
- cache.getListenable().addListener((client, event) -> handleChildEvent(event));
- cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+ return curatorFramework.getData().decompressed().forPath(child.getPath());
}
catch (Exception ex) {
- throw new RuntimeException(ex);
+ log.error(ex, "Exception while getting data for node %s", child.getPath());
+ return null;
+ }
+ }
+
+ @GuardedBy("lock")
+ private void cacheInitialized()
+ {
+ // No need to wait on CountDownLatch, because we are holding the lock under which it could only be
+ // counted down.
+ if (cacheInitialized.getCount() == 0) {
+ log.error("cache is already initialized. ignoring cache initialization event.");
+ return;
+ }
+
+ log.info("Node watcher of role[%s] is now initialized.", nodeRole.getJsonName());
+
+ for (Listener listener : nodeListeners) {
+ safeSchedule(
+ () -> {
+ listener.nodesAdded(unmodifiableNodes);
+ listener.nodeViewInitialized();
+ },
+ "Exception occured in nodesAdded([%s]) in listener [%s].",
+ unmodifiableNodes,
+ listener
+ );
+ }
+
+ cacheInitialized.countDown();
+ }
+
+ private void safeSchedule(Runnable runnable, String errMsgFormat, Object... args)
+ {
+ listenerExecutor.submit(() -> {
+ try {
+ runnable.run();
+ }
+ catch (Exception ex) {
+ log.error(errMsgFormat, args);
+ }
+ });
+ }
+ }
+
+ private static class NodeDiscoverer implements Closeable
+ {
+ private final ObjectMapper jsonMapper;
+ private final NodeCache nodeCache;
+ private final NodeRole nodeRole;
+
+ private NodeDiscoverer(
+ ZkPathsConfig config,
+ ObjectMapper jsonMapper,
+ CuratorFramework curatorFramework,
+ DruidNode node,
+ NodeRole nodeRole
+ )
+ {
+ this.jsonMapper = jsonMapper;
+ String path = CuratorDruidNodeAnnouncer.makeNodeAnnouncementPath(config, nodeRole, node);
+ nodeCache = new NodeCache(curatorFramework, path, true);
+ this.nodeRole = nodeRole;
+
+ try {
+ nodeCache.start(true /* buildInitial */);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
}
}
- public void stop()
+ private boolean nodeDiscovered()
{
+ @Nullable ChildData currentChild = nodeCache.getCurrentData();
+ if (currentChild == null) {
+ // Not discovered yet.
+ return false;
+ }
+
+ final byte[] data = currentChild.getData();
+
+ DiscoveryDruidNode druidNode;
try {
- cache.close();
- cacheExecutor.shutdownNow();
+ druidNode = jsonMapper.readValue(data, DiscoveryDruidNode.class);
}
- catch (Exception ex) {
- log.error(ex, "Failed to stop node watcher for type [%s].", nodeType);
+ catch (IOException e) {
+ log.error(e, "Exception occurred when reading node's value");
+ return false;
+ }
+
+ if (!nodeRole.equals(druidNode.getNodeRole())) {
+ log.error(
+ "Node[%s] of role[%s] add is discovered by node watcher of different node role. Ignored.",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName()
+ );
+ return false;
}
+
+ log.info(
+ "Node[%s] of role[%s] appeared.",
+ druidNode.getDruidNode().getUriToUse(),
+ druidNode.getNodeRole().getJsonName()
+ );
+ return true;
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ nodeCache.close();
}
}
}
diff --git a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
index 5dd53da..fd89964 100644
--- a/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/curator/discovery/DiscoveryModule.java
@@ -195,8 +195,10 @@ public class DiscoveryModule implements Module
PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class))
.addBinding(CURATOR_KEY)
- .toProvider(new DruidLeaderSelectorProvider(
- (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "_COORDINATOR"))
+ .toProvider(
+ new DruidLeaderSelectorProvider(
+ zkPathsConfig -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "_COORDINATOR")
+ )
)
.in(LazySingleton.class);
@@ -204,7 +206,7 @@ public class DiscoveryModule implements Module
.addBinding(CURATOR_KEY)
.toProvider(
new DruidLeaderSelectorProvider(
- (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")
+ zkPathsConfig -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")
)
)
.in(LazySingleton.class);
diff --git a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
index 9032bca..88c9d8f 100644
--- a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
+++ b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.Set;
@@ -70,7 +71,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private final PathChildrenCacheFactory cacheFactory;
private final ExecutorService pathChildrenCacheExecutor;
- private volatile PathChildrenCache childrenCache;
+ private volatile @Nullable PathChildrenCache childrenCache;
public CuratorInventoryManager(
CuratorFramework curatorFramework,
@@ -91,6 +92,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
//NOTE: cacheData is temporarily set to false and we get data directly from ZK on each event.
//this is a workaround to solve curator's out-of-order events problem
//https://issues.apache.org/jira/browse/CURATOR-191
+ // This is also done in CuratorDruidNodeDiscoveryProvider.
.withCacheData(false)
.withCompressed(true)
.withExecutorService(pathChildrenCacheExecutor)
@@ -101,12 +103,14 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
@LifecycleStart
public void start() throws Exception
{
+ PathChildrenCache childrenCache;
synchronized (lock) {
+ childrenCache = this.childrenCache;
if (childrenCache != null) {
return;
}
- childrenCache = cacheFactory.make(curatorFramework, config.getContainerPath());
+ this.childrenCache = childrenCache = cacheFactory.make(curatorFramework, config.getContainerPath());
}
childrenCache.getListenable().addListener(new ContainerCacheListener());
@@ -131,13 +135,14 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
public void stop() throws IOException
{
synchronized (lock) {
+ PathChildrenCache childrenCache = this.childrenCache;
if (childrenCache == null) {
return;
}
// This close() call actually calls shutdownNow() on the executor registered with the Cache object...
childrenCache.close();
- childrenCache = null;
+ this.childrenCache = null;
}
Closer closer = Closer.create();
@@ -157,6 +162,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
return config;
}
+ @Nullable
public ContainerClass getInventoryValue(String containerKey)
{
final ContainerHolder containerHolder = containers.get(containerKey);
@@ -171,13 +177,18 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
.collect(Collectors.toList());
}
- private byte[] getZkDataForNode(String path)
+ /**
+ * Doing this instead of a simple call to {@link ChildData#getData()} because data cache is turned off, see a comment
+ * in {@link #CuratorInventoryManager}.
+ */
+ @Nullable
+ private byte[] getZkDataForNode(ChildData child)
{
try {
- return curatorFramework.getData().decompressed().forPath(path);
+ return curatorFramework.getData().decompressed().forPath(child.getPath());
}
catch (Exception ex) {
- log.warn(ex, "Exception while getting data for node %s", path);
+ log.warn(ex, "Exception while getting data for node %s", child.getPath());
return null;
}
}
@@ -193,7 +204,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
PathChildrenCache cache
)
{
- this.container = new AtomicReference<ContainerClass>(container);
+ this.container = new AtomicReference<>(container);
this.cache = cache;
}
@@ -226,7 +237,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
synchronized (lock) {
final ChildData child = event.getData();
- byte[] data = getZkDataForNode(child.getPath());
+ byte[] data = getZkDataForNode(child);
if (data == null) {
log.warn("Ignoring event: Type - %s , Path - %s , Version - %s",
event.getType(),
@@ -285,7 +296,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
synchronized (lock) {
final ChildData child = event.getData();
- byte[] data = getZkDataForNode(child.getPath());
+ byte[] data = getZkDataForNode(child);
if (data == null) {
log.warn(
"Ignoring event: Type - %s , Path - %s , Version - %s",
@@ -356,13 +367,10 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
private class InventoryCacheListener implements PathChildrenCacheListener
{
private final String containerKey;
- private final String inventoryPath;
public InventoryCacheListener(String containerKey, String inventoryPath)
{
this.containerKey = containerKey;
- this.inventoryPath = inventoryPath;
-
log.debug("Created new InventoryCacheListener for %s", inventoryPath);
}
@@ -378,7 +386,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
case CHILD_ADDED: {
final ChildData child = event.getData();
- byte[] data = getZkDataForNode(child.getPath());
+ byte[] data = getZkDataForNode(child);
if (data == null) {
log.warn("Ignoring event: Type - %s , Path - %s , Version - %s",
event.getType(),
@@ -401,7 +409,7 @@ public class CuratorInventoryManager<ContainerClass, InventoryClass>
case CHILD_UPDATED: {
final ChildData child = event.getData();
- byte[] data = getZkDataForNode(child.getPath());
+ byte[] data = getZkDataForNode(child);
if (data == null) {
log.warn("Ignoring event: Type - %s , Path - %s , Version - %s",
event.getType(),
diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
index a8d2d14..eaef716 100644
--- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
+++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
@@ -30,14 +30,14 @@ import java.util.Objects;
/**
* Representation of all information related to discovery of a node and all the other metadata associated with
- * the node per nodeType such as broker, historical etc.
- * Note that one Druid process might announce multiple DiscoveryDruidNode if it acts as multiple nodeTypes e.g.
- * coordinator would announce DiscoveryDruidNode for overlord nodeType as well when acting as overlord.
+ * the node per nodeRole such as broker, historical etc.
+ * Note that one Druid process might announce multiple DiscoveryDruidNode if it acts in multiple {@link NodeRole}s e. g.
+ * Coordinator would announce DiscoveryDruidNode for {@link NodeRole#OVERLORD} as well when acting as Overlord.
*/
public class DiscoveryDruidNode
{
private final DruidNode druidNode;
- private final NodeType nodeType;
+ private final NodeRole nodeRole;
// Other metadata associated with the node e.g.
// if its a historical node then lookup information, segment loading capacity etc.
@@ -46,12 +46,12 @@ public class DiscoveryDruidNode
@JsonCreator
public DiscoveryDruidNode(
@JsonProperty("druidNode") DruidNode druidNode,
- @JsonProperty("nodeType") NodeType nodeType,
+ @JsonProperty("nodeType") NodeRole nodeRole,
@JsonProperty("services") Map<String, DruidService> services
)
{
this.druidNode = druidNode;
- this.nodeType = nodeType;
+ this.nodeRole = nodeRole;
if (services != null && !services.isEmpty()) {
this.services.putAll(services);
@@ -64,10 +64,14 @@ public class DiscoveryDruidNode
return services;
}
- @JsonProperty
- public NodeType getNodeType()
+ /**
+ * Keeping the legacy name 'nodeType' property name for backward compatibility. When the project is updated to
+ * Jackson 2.9 it could be changed, see https://github.com/apache/incubator-druid/issues/7152.
+ */
+ @JsonProperty("nodeType")
+ public NodeRole getNodeRole()
{
- return nodeType;
+ return nodeRole;
}
@JsonProperty
@@ -100,14 +104,14 @@ public class DiscoveryDruidNode
}
DiscoveryDruidNode that = (DiscoveryDruidNode) o;
return Objects.equals(druidNode, that.druidNode) &&
- Objects.equals(nodeType, that.nodeType) &&
+ Objects.equals(nodeRole, that.nodeRole) &&
Objects.equals(services, that.services);
}
@Override
public int hashCode()
{
- return Objects.hash(druidNode, nodeType, services);
+ return Objects.hash(druidNode, nodeRole, services);
}
@Override
@@ -115,7 +119,7 @@ public class DiscoveryDruidNode
{
return "DiscoveryDruidNode{" +
"druidNode=" + druidNode +
- ", nodeType='" + nodeType + '\'' +
+ ", nodeRole='" + nodeRole + '\'' +
", services=" + services +
'}';
}
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
index 7fb42f5..e756103 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidLeaderClient.java
@@ -68,7 +68,7 @@ public class DruidLeaderClient
private final HttpClient httpClient;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
- private final NodeType nodeTypeToWatch;
+ private final NodeRole nodeRoleToWatch;
private final String leaderRequestPath;
@@ -82,14 +82,14 @@ public class DruidLeaderClient
public DruidLeaderClient(
HttpClient httpClient,
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
- NodeType nodeTypeToWatch,
+ NodeRole nodeRoleToWatch,
String leaderRequestPath,
ServerDiscoverySelector serverDiscoverySelector
)
{
this.httpClient = httpClient;
this.druidNodeDiscoveryProvider = druidNodeDiscoveryProvider;
- this.nodeTypeToWatch = nodeTypeToWatch;
+ this.nodeRoleToWatch = nodeRoleToWatch;
this.leaderRequestPath = leaderRequestPath;
this.serverDiscoverySelector = serverDiscoverySelector;
}
@@ -102,7 +102,7 @@ public class DruidLeaderClient
}
try {
- druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(nodeTypeToWatch);
+ druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeRole(nodeRoleToWatch);
lifecycleLock.started();
log.debug("Started.");
}
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
index d9148c3..a1e65af 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscovery.java
@@ -22,7 +22,7 @@ package org.apache.druid.discovery;
import java.util.Collection;
/**
- * Interface for discovering Druid Nodes announced by DruidNodeAnnouncer.
+ * Interface for discovering Druid nodes announced by {@link DruidNodeAnnouncer}.
*/
public interface DruidNodeDiscovery
{
@@ -30,9 +30,9 @@ public interface DruidNodeDiscovery
void registerListener(Listener listener);
/**
- * Listener for watching nodes in a DruidNodeDiscovery instance obtained via DruidNodeDiscoveryProvider.getXXX().
- * DruidNodeDiscovery implementation should assume that Listener is not threadsafe and never call methods in
- * Listener concurrently.
+ * Listener for watching nodes in a DruidNodeDiscovery instance obtained via {@link
+ * DruidNodeDiscoveryProvider}.getXXX(). DruidNodeDiscovery implementation should assume that Listener is not
+ * thread-safe and never call methods in Listener concurrently.
*
* Implementation of Listener must ensure to not do any time consuming work or block in any of the methods.
*/
diff --git a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index ee9a2e8..7332029 100644
--- a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.DruidNode;
import java.util.ArrayList;
import java.util.Collection;
@@ -33,25 +34,27 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.BooleanSupplier;
/**
* Provider of {@link DruidNodeDiscovery} instances.
*/
public abstract class DruidNodeDiscoveryProvider
{
- private static final Map<String, Set<NodeType>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
- LookupNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.BROKER, NodeType.HISTORICAL, NodeType.PEON, NodeType.INDEXER),
- DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.HISTORICAL, NodeType.PEON, NodeType.INDEXER),
- WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeType.MIDDLE_MANAGER, NodeType.INDEXER)
+ private static final Map<String, Set<NodeRole>> SERVICE_TO_NODE_TYPES = ImmutableMap.of(
+ LookupNodeService.DISCOVERY_SERVICE_KEY,
+ ImmutableSet.of(NodeRole.BROKER, NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
+ DataNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.HISTORICAL, NodeRole.PEON, NodeRole.INDEXER),
+ WorkerNodeService.DISCOVERY_SERVICE_KEY, ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);
private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> serviceDiscoveryMap =
new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
- /**
- * Get DruidNodeDiscovery instance to discover nodes of given nodeType.
- */
- public abstract DruidNodeDiscovery getForNodeType(NodeType nodeType);
+ public abstract BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole);
+
+ /** Get a {@link DruidNodeDiscovery} instance to discover nodes of the given node role. */
+ public abstract DruidNodeDiscovery getForNodeRole(NodeRole nodeRole);
/**
* Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata.
@@ -62,15 +65,15 @@ public abstract class DruidNodeDiscoveryProvider
serviceName,
service -> {
- Set<NodeType> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
- if (nodeTypesToWatch == null) {
+ Set<NodeRole> nodeRolesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
+ if (nodeRolesToWatch == null) {
throw new IAE("Unknown service [%s].", service);
}
- ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeTypesToWatch.size());
+ ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(service, nodeRolesToWatch.size());
DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
serviceDiscovery.filteringUpstreamListener();
- for (NodeType nodeType : nodeTypesToWatch) {
- getForNodeType(nodeType).registerListener(filteringGatheringUpstreamListener);
+ for (NodeRole nodeRole : nodeRolesToWatch) {
+ getForNodeRole(nodeRole).registerListener(filteringGatheringUpstreamListener);
}
return serviceDiscovery;
}
@@ -89,13 +92,13 @@ public abstract class DruidNodeDiscoveryProvider
private final Object lock = new Object();
- private int uninitializedNodeTypes;
+ private int uninitializedNodeRoles;
- ServiceDruidNodeDiscovery(String service, int watchedNodeTypes)
+ ServiceDruidNodeDiscovery(String service, int watchedNodeRoles)
{
- Preconditions.checkArgument(watchedNodeTypes > 0);
+ Preconditions.checkArgument(watchedNodeRoles > 0);
this.service = service;
- this.uninitializedNodeTypes = watchedNodeTypes;
+ this.uninitializedNodeRoles = watchedNodeRoles;
}
@Override
@@ -114,7 +117,7 @@ public abstract class DruidNodeDiscoveryProvider
if (!unmodifiableNodes.isEmpty()) {
listener.nodesAdded(unmodifiableNodes);
}
- if (uninitializedNodeTypes == 0) {
+ if (uninitializedNodeRoles == 0) {
listener.nodeViewInitialized();
}
listeners.add(listener);
@@ -128,7 +131,7 @@ public abstract class DruidNodeDiscoveryProvider
/**
* Listens for all node updates and filters them based on {@link #service}. Note: this listener is registered with
- * the objects returned from {@link #getForNodeType(NodeType)}, NOT with {@link ServiceDruidNodeDiscovery} itself.
+ * the objects returned from {@link #getForNodeRole(NodeRole)}, NOT with {@link ServiceDruidNodeDiscovery} itself.
*/
class FilteringUpstreamListener implements DruidNodeDiscovery.Listener
{
@@ -203,12 +206,12 @@ public abstract class DruidNodeDiscoveryProvider
public void nodeViewInitialized()
{
synchronized (lock) {
- if (uninitializedNodeTypes == 0) {
+ if (uninitializedNodeRoles == 0) {
log.error("Unexpected call of nodeViewInitialized()");
return;
}
- uninitializedNodeTypes--;
- if (uninitializedNodeTypes == 0) {
+ uninitializedNodeRoles--;
+ if (uninitializedNodeRoles == 0) {
for (Listener listener : listeners) {
try {
listener.nodeViewInitialized();
diff --git a/server/src/main/java/org/apache/druid/discovery/NodeType.java b/server/src/main/java/org/apache/druid/discovery/NodeRole.java
similarity index 73%
rename from server/src/main/java/org/apache/druid/discovery/NodeType.java
rename to server/src/main/java/org/apache/druid/discovery/NodeRole.java
index 1f6f90d..564d19f 100644
--- a/server/src/main/java/org/apache/druid/discovery/NodeType.java
+++ b/server/src/main/java/org/apache/druid/discovery/NodeRole.java
@@ -22,13 +22,16 @@ package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JsonValue;
/**
- *
* This is a historical occasion that this enum is different from {@link
- * org.apache.druid.server.coordination.ServerType} because they are essentially the same abstraction, but merging them
- * could only increase the complexity and drop the code safety, because they name the same types differently ("peon" -
- * "indexer-executor" and "middleManager" - "realtime") and both expose them via JSON APIs.
+ * org.apache.druid.server.coordination.ServerType} (also called "node type" in various places) because they are
+ * essentially the same abstraction, but merging them could only increase the complexity and drop the code safety,
+ * because they name the same types differently ("peon" - "indexer-executor" and "middleManager" - "realtime") and both
+ * expose them via JSON APIs.
+ *
+ * These abstractions can probably be merged when Druid updates to Jackson 2.9 that supports JsonAliases, see
+ * see https://github.com/apache/incubator-druid/issues/7152.
*/
-public enum NodeType
+public enum NodeRole
{
COORDINATOR("coordinator"),
HISTORICAL("historical"),
@@ -41,7 +44,7 @@ public enum NodeType
private final String jsonName;
- NodeType(String jsonName)
+ NodeRole(String jsonName)
{
this.jsonName = jsonName;
}
diff --git a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
index d4ac2c4..b90a9b5 100644
--- a/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/guice/CoordinatorDiscoveryModule.java
@@ -28,7 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -65,7 +65,7 @@ public class CoordinatorDiscoveryModule implements Module
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
"/druid/coordinator/v1/leader",
serverDiscoverySelector
);
diff --git a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
index 05b76ed..3c4f63c 100644
--- a/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
+++ b/server/src/main/java/org/apache/druid/guice/IndexingServiceDiscoveryModule.java
@@ -28,7 +28,7 @@ import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.http.client.HttpClient;
@@ -65,7 +65,7 @@ public class IndexingServiceDiscoveryModule implements Module
return new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.OVERLORD,
+ NodeRole.OVERLORD,
"/druid/indexer/v1/leader",
serverDiscoverySelector
);
diff --git a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index 8093f1d..fdce8e8 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -42,8 +42,9 @@ import java.util.concurrent.ExecutorService;
/**
* This module is used to fulfill dependency injection of query processing and caching resources: buffer pools and
* thread pools on Router Druid node type. Router needs to inject those resources, because it depends on
- * {@link org.apache.druid.query.QueryToolChest}s, and they couple query type aspects not related to processing and caching,
- * which Router uses, and related to processing and caching, which Router doesn't use, but they inject the resources.
+ * {@link org.apache.druid.query.QueryToolChest}s, and they couple query type aspects not related to processing and
+ * caching, which Router uses, and related to processing and caching, which Router doesn't use, but they inject the
+ * resources.
*/
public class RouterProcessingModule implements Module
{
diff --git a/server/src/main/java/org/apache/druid/guice/NodeTypeConfig.java b/server/src/main/java/org/apache/druid/guice/ServerTypeConfig.java
similarity index 81%
rename from server/src/main/java/org/apache/druid/guice/NodeTypeConfig.java
rename to server/src/main/java/org/apache/druid/guice/ServerTypeConfig.java
index a8039a1..ee20650 100644
--- a/server/src/main/java/org/apache/druid/guice/NodeTypeConfig.java
+++ b/server/src/main/java/org/apache/druid/guice/ServerTypeConfig.java
@@ -23,19 +23,19 @@ import org.apache.druid.server.coordination.ServerType;
/**
*/
-public class NodeTypeConfig
+public class ServerTypeConfig
{
- private final ServerType nodeType;
+ private final ServerType serverType;
- public NodeTypeConfig(
- ServerType nodeType
+ public ServerTypeConfig(
+ ServerType serverType
)
{
- this.nodeType = nodeType;
+ this.serverType = serverType;
}
- public ServerType getNodeType()
+ public ServerType getServerType()
{
- return nodeType;
+ return serverType;
}
}
diff --git a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
index 542290f..a1770a9 100644
--- a/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
+++ b/server/src/main/java/org/apache/druid/guice/StorageNodeModule.java
@@ -45,16 +45,20 @@ public class StorageNodeModule implements Module
JsonConfigProvider.bind(binder, "druid.server", DruidServerConfig.class);
JsonConfigProvider.bind(binder, "druid.segmentCache", SegmentLoaderConfig.class);
- binder.bind(NodeTypeConfig.class).toProvider(Providers.of(null));
+ binder.bind(ServerTypeConfig.class).toProvider(Providers.of(null));
binder.bind(ColumnConfig.class).to(DruidProcessingConfig.class);
}
@Provides
@LazySingleton
- public DruidServerMetadata getMetadata(@Self DruidNode node, @Nullable NodeTypeConfig nodeType, DruidServerConfig config)
+ public DruidServerMetadata getMetadata(
+ @Self DruidNode node,
+ @Nullable ServerTypeConfig serverTypeConfig,
+ DruidServerConfig config
+ )
{
- if (nodeType == null) {
- throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DruidServerMetadata.");
+ if (serverTypeConfig == null) {
+ throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata.");
}
return new DruidServerMetadata(
@@ -62,7 +66,7 @@ public class StorageNodeModule implements Module
node.getHostAndPort(),
node.getHostAndTlsPort(),
config.getMaxSize(),
- nodeType.getNodeType(),
+ serverTypeConfig.getServerType(),
config.getTier(),
config.getPriority()
);
@@ -70,19 +74,16 @@ public class StorageNodeModule implements Module
@Provides
@LazySingleton
- public DataNodeService getDataNodeService(
- @Nullable NodeTypeConfig nodeType,
- DruidServerConfig config
- )
+ public DataNodeService getDataNodeService(@Nullable ServerTypeConfig serverTypeConfig, DruidServerConfig config)
{
- if (nodeType == null) {
- throw new ProvisionException("Must override the binding for NodeTypeConfig if you want a DruidServerMetadata.");
+ if (serverTypeConfig == null) {
+ throw new ProvisionException("Must override the binding for ServerTypeConfig if you want a DruidServerMetadata.");
}
return new DataNodeService(
config.getTier(),
config.getMaxSize(),
- nodeType.getNodeType(),
+ serverTypeConfig.getServerType(),
config.getPriority()
);
}
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
index d1b7fd1..df25fb3 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordination;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.StringUtils;
/**
@@ -36,10 +37,13 @@ import org.apache.druid.java.util.common.StringUtils;
* which is the format expected for the server type string prior to the patch that introduced ServerType:
* https://github.com/apache/incubator-druid/pull/4148
*
- * This is a historical occasion that this enum is different from {@link org.apache.druid.discovery.NodeType} because
+ * This is a historical occasion that this enum is different from {@link NodeRole} because
* they are essentially the same abstraction, but merging them could only increase the complexity and drop the code
* safety, because they name the same types differently ("indexer-executor" - "peon" and "realtime" - "middleManager")
* and both expose them via JSON APIs.
+ *
+ * These abstractions can probably be merged when Druid updates to Jackson 2.9 that supports JsonAliases, see
+ * see https://github.com/apache/incubator-druid/issues/7152.
*/
public enum ServerType
{
diff --git a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
index 926b90f..eabf51e 100644
--- a/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/ClusterResource.java
@@ -28,7 +28,7 @@ import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.security.StateResourceFilter;
@@ -62,26 +62,26 @@ public class ClusterResource
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterServers(@QueryParam("full") boolean full)
{
- ImmutableMap.Builder<NodeType, Object> entityBuilder = new ImmutableMap.Builder<>();
+ ImmutableMap.Builder<NodeRole, Object> entityBuilder = new ImmutableMap.Builder<>();
- entityBuilder.put(NodeType.COORDINATOR, getNodes(NodeType.COORDINATOR, full));
- entityBuilder.put(NodeType.OVERLORD, getNodes(NodeType.OVERLORD, full));
- entityBuilder.put(NodeType.BROKER, getNodes(NodeType.BROKER, full));
- entityBuilder.put(NodeType.HISTORICAL, getNodes(NodeType.HISTORICAL, full));
+ entityBuilder.put(NodeRole.COORDINATOR, getNodes(NodeRole.COORDINATOR, full));
+ entityBuilder.put(NodeRole.OVERLORD, getNodes(NodeRole.OVERLORD, full));
+ entityBuilder.put(NodeRole.BROKER, getNodes(NodeRole.BROKER, full));
+ entityBuilder.put(NodeRole.HISTORICAL, getNodes(NodeRole.HISTORICAL, full));
- Collection<Object> mmNodes = getNodes(NodeType.MIDDLE_MANAGER, full);
+ Collection<Object> mmNodes = getNodes(NodeRole.MIDDLE_MANAGER, full);
if (!mmNodes.isEmpty()) {
- entityBuilder.put(NodeType.MIDDLE_MANAGER, mmNodes);
+ entityBuilder.put(NodeRole.MIDDLE_MANAGER, mmNodes);
}
- Collection<Object> indexerNodes = getNodes(NodeType.INDEXER, full);
+ Collection<Object> indexerNodes = getNodes(NodeRole.INDEXER, full);
if (!indexerNodes.isEmpty()) {
- entityBuilder.put(NodeType.INDEXER, indexerNodes);
+ entityBuilder.put(NodeRole.INDEXER, indexerNodes);
}
- Collection<Object> routerNodes = getNodes(NodeType.ROUTER, full);
+ Collection<Object> routerNodes = getNodes(NodeRole.ROUTER, full);
if (!routerNodes.isEmpty()) {
- entityBuilder.put(NodeType.ROUTER, routerNodes);
+ entityBuilder.put(NodeRole.ROUTER, routerNodes);
}
return Response.status(Response.Status.OK).entity(entityBuilder.build()).build();
@@ -89,22 +89,22 @@ public class ClusterResource
@GET
@Produces({MediaType.APPLICATION_JSON})
- @Path("/{nodeType}")
- public Response getClusterServers(@PathParam("nodeType") NodeType nodeType, @QueryParam("full") boolean full)
+ @Path("/{nodeRole}")
+ public Response getClusterServers(@PathParam("nodeRole") NodeRole nodeRole, @QueryParam("full") boolean full)
{
- if (nodeType == null) {
+ if (nodeRole == null) {
return Response.serverError()
.status(Response.Status.BAD_REQUEST)
- .entity("Invalid nodeType of null. Valid node types are " + Arrays.toString(NodeType.values()))
+ .entity("Invalid nodeRole of null. Valid node roles are " + Arrays.toString(NodeRole.values()))
.build();
} else {
- return Response.status(Response.Status.OK).entity(getNodes(nodeType, full)).build();
+ return Response.status(Response.Status.OK).entity(getNodes(nodeRole, full)).build();
}
}
- private Collection<Object> getNodes(NodeType nodeType, boolean full)
+ private Collection<Object> getNodes(NodeRole nodeRole, boolean full)
{
- Collection<DiscoveryDruidNode> discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeType(nodeType)
+ Collection<DiscoveryDruidNode> discoveryDruidNodes = druidNodeDiscoveryProvider.getForNodeRole(nodeRole)
.getAllNodes();
if (full) {
return (Collection) discoveryDruidNodes;
diff --git a/server/src/main/java/org/apache/druid/server/http/SelfDiscoveryResource.java b/server/src/main/java/org/apache/druid/server/http/SelfDiscoveryResource.java
new file mode 100644
index 0000000..46d04a9
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/http/SelfDiscoveryResource.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.sun.jersey.spi.container.ResourceFilters;
+import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.http.security.StateResourceFilter;
+import org.eclipse.jetty.http.HttpStatus;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Collections;
+import java.util.function.BooleanSupplier;
+
+/**
+ * This class is annotated {@link Singleton} rather than {@link org.apache.druid.guice.LazySingleton} because it adds
+ * a lifecycle handler in the constructor. That should happen before the lifecycle is started, i. e. eagerly during the
+ * DI configuration phase.
+ */
+@Singleton
+public class SelfDiscoveryResource
+{
+ private BooleanSupplier selfDiscovered;
+
+ @Inject
+ public SelfDiscoveryResource(
+ @Self DruidNode thisDruidNode,
+ @Self NodeRole thisNodeRole,
+ DruidNodeDiscoveryProvider nodeDiscoveryProvider,
+ Lifecycle lifecycle
+ )
+ {
+ Lifecycle.Handler selfDiscoveryListenerRegistrator = new Lifecycle.Handler()
+ {
+ @Override
+ public void start()
+ {
+ selfDiscovered = nodeDiscoveryProvider.getForNode(thisDruidNode, thisNodeRole);
+ }
+
+ @Override
+ public void stop()
+ {
+ // do nothing
+ }
+ };
+ // Using Lifecycle.Stage.SERVER because DruidNodeDiscoveryProvider should be already started when
+ // selfDiscoveryListenerRegistrator.start() is called.
+ lifecycle.addHandler(selfDiscoveryListenerRegistrator, Lifecycle.Stage.SERVER);
+ }
+
+ /** See the description of this endpoint in api-reference.md. */
+ @GET
+ @Path("/status/selfDiscoveredStatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getSelfDiscoveredStatus()
+ {
+ return Response.ok(Collections.singletonMap("selfDiscovered", selfDiscovered.getAsBoolean())).build();
+ }
+
+ /** See the description of this endpoint in api-reference.md. */
+ @GET
+ @Path("/status/selfDiscovered")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getSelfDiscovered()
+ {
+ if (selfDiscovered.getAsBoolean()) {
+ return Response.ok().build();
+ } else {
+ return Response.status(HttpStatus.SERVICE_UNAVAILABLE_503).build();
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/http/security/StateResourceFilter.java b/server/src/main/java/org/apache/druid/server/http/security/StateResourceFilter.java
index 275ea35..3a2d0e3 100644
--- a/server/src/main/java/org/apache/druid/server/http/security/StateResourceFilter.java
+++ b/server/src/main/java/org/apache/druid/server/http/security/StateResourceFilter.java
@@ -46,9 +46,7 @@ import org.apache.druid.server.security.ResourceAction;
public class StateResourceFilter extends AbstractResourceFilter
{
@Inject
- public StateResourceFilter(
- AuthorizerMapper authorizerMapper
- )
+ public StateResourceFilter(AuthorizerMapper authorizerMapper)
{
super(authorizerMapper);
}
diff --git a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index 9263b35..96473e1 100644
--- a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ b/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -29,7 +29,7 @@ import org.apache.druid.client.selector.Server;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -127,7 +127,7 @@ public class TieredBrokerHostSelector<T>
servers.put(entry.getValue(), new NodesHolder());
}
- DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER);
+ DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER);
druidNodeDiscovery.registerListener(
new DruidNodeDiscovery.Listener()
{
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index 5e07b7f..f0fd54a 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -29,7 +29,7 @@ import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -171,7 +171,7 @@ public class HttpServerInventoryViewTest
DiscoveryDruidNode druidNode = new DiscoveryDruidNode(
new DruidNode("service", "host", false, 8080, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
index c2121a7..1851cc7 100644
--- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
+++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java
@@ -27,7 +27,7 @@ import org.apache.druid.curator.CuratorTestBase;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.server.DruidNode;
@@ -40,6 +40,7 @@ import org.junit.Test;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import java.util.function.BooleanSupplier;
/**
*
@@ -58,10 +59,11 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
ObjectMapper objectMapper = new DefaultObjectMapper();
//additional setup to serde DruidNode
- objectMapper.setInjectableValues(new InjectableValues.Std()
- .addValue(ServerConfig.class, new ServerConfig())
- .addValue("java.lang.String", "dummy")
- .addValue("java.lang.Integer", 1234)
+ objectMapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(ServerConfig.class, new ServerConfig())
+ .addValue("java.lang.String", "dummy")
+ .addValue("java.lang.Integer", 1234)
);
curator.start();
@@ -79,32 +81,32 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
objectMapper
);
- DiscoveryDruidNode node1 = new DiscoveryDruidNode(
+ DiscoveryDruidNode coordinatorNode1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", false, 8080, null, true, false),
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
ImmutableMap.of()
);
- DiscoveryDruidNode node2 = new DiscoveryDruidNode(
+ DiscoveryDruidNode coordinatorNode2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", false, 8080, null, true, false),
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
ImmutableMap.of()
);
- DiscoveryDruidNode node3 = new DiscoveryDruidNode(
+ DiscoveryDruidNode overlordNode1 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
- NodeType.OVERLORD,
+ NodeRole.OVERLORD,
ImmutableMap.of()
);
- DiscoveryDruidNode node4 = new DiscoveryDruidNode(
+ DiscoveryDruidNode overlordNode2 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", false, 8080, null, true, false),
- NodeType.OVERLORD,
+ NodeRole.OVERLORD,
ImmutableMap.of()
);
- druidNodeAnnouncer.announce(node1);
- druidNodeAnnouncer.announce(node3);
+ druidNodeAnnouncer.announce(coordinatorNode1);
+ druidNodeAnnouncer.announce(overlordNode1);
CuratorDruidNodeDiscoveryProvider druidNodeDiscoveryProvider = new CuratorDruidNodeDiscoveryProvider(
curator,
@@ -113,84 +115,61 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
);
druidNodeDiscoveryProvider.start();
- DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR);
- DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD);
+ DruidNodeDiscovery coordDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.COORDINATOR);
+ BooleanSupplier coord1NodeDiscovery =
+ druidNodeDiscoveryProvider.getForNode(coordinatorNode1.getDruidNode(), NodeRole.COORDINATOR);
- while (!checkNodes(ImmutableSet.of(node1), coordDiscovery.getAllNodes())) {
+ DruidNodeDiscovery overlordDiscovery = druidNodeDiscoveryProvider.getForNodeRole(NodeRole.OVERLORD);
+ BooleanSupplier overlord1NodeDiscovery =
+ druidNodeDiscoveryProvider.getForNode(overlordNode1.getDruidNode(), NodeRole.OVERLORD);
+
+ while (!checkNodes(ImmutableSet.of(coordinatorNode1), coordDiscovery.getAllNodes()) &&
+ !coord1NodeDiscovery.getAsBoolean()) {
Thread.sleep(100);
}
- while (!checkNodes(ImmutableSet.of(node3), overlordDiscovery.getAllNodes())) {
+ while (!checkNodes(ImmutableSet.of(overlordNode1), overlordDiscovery.getAllNodes()) &&
+ !overlord1NodeDiscovery.getAsBoolean()) {
Thread.sleep(100);
}
HashSet<DiscoveryDruidNode> coordNodes = new HashSet<>();
- coordDiscovery.registerListener(
- new DruidNodeDiscovery.Listener()
- {
- @Override
- public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
- {
- coordNodes.addAll(nodes);
- }
-
- @Override
- public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
- {
- coordNodes.removeAll(nodes);
- }
- }
- );
+ coordDiscovery.registerListener(createSetAggregatingListener(coordNodes));
HashSet<DiscoveryDruidNode> overlordNodes = new HashSet<>();
- overlordDiscovery.registerListener(
- new DruidNodeDiscovery.Listener()
- {
- @Override
- public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
- {
- overlordNodes.addAll(nodes);
- }
-
- @Override
- public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
- {
- overlordNodes.removeAll(nodes);
- }
- }
- );
+ overlordDiscovery.registerListener(createSetAggregatingListener(overlordNodes));
- while (!checkNodes(ImmutableSet.of(node1), coordNodes)) {
+ while (!checkNodes(ImmutableSet.of(coordinatorNode1), coordNodes)) {
Thread.sleep(100);
}
- while (!checkNodes(ImmutableSet.of(node3), overlordNodes)) {
+ while (!checkNodes(ImmutableSet.of(overlordNode1), overlordNodes)) {
Thread.sleep(100);
}
- druidNodeAnnouncer.announce(node2);
- druidNodeAnnouncer.announce(node4);
+ druidNodeAnnouncer.announce(coordinatorNode2);
+ druidNodeAnnouncer.announce(overlordNode2);
- while (!checkNodes(ImmutableSet.of(node1, node2), coordDiscovery.getAllNodes())) {
+ while (!checkNodes(ImmutableSet.of(coordinatorNode1, coordinatorNode2), coordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
- while (!checkNodes(ImmutableSet.of(node3, node4), overlordDiscovery.getAllNodes())) {
+ while (!checkNodes(ImmutableSet.of(overlordNode1, overlordNode2), overlordDiscovery.getAllNodes())) {
Thread.sleep(100);
}
- while (!checkNodes(ImmutableSet.of(node1, node2), coordNodes)) {
+ while (!checkNodes(ImmutableSet.of(coordinatorNode1, coordinatorNode2), coordNodes)) {
Thread.sleep(100);
}
- while (!checkNodes(ImmutableSet.of(node3, node4), overlordNodes)) {
+ while (!checkNodes(ImmutableSet.of(overlordNode1, overlordNode2), overlordNodes)) {
Thread.sleep(100);
}
- druidNodeAnnouncer.unannounce(node1);
- druidNodeAnnouncer.unannounce(node2);
- druidNodeAnnouncer.unannounce(node3);
- druidNodeAnnouncer.unannounce(node4);
+ druidNodeAnnouncer.unannounce(coordinatorNode1);
+ druidNodeAnnouncer.unannounce(coordinatorNode2);
+ druidNodeAnnouncer.unannounce(overlordNode1);
+ druidNodeAnnouncer.unannounce(overlordNode2);
while (!checkNodes(ImmutableSet.of(), coordDiscovery.getAllNodes())) {
Thread.sleep(100);
@@ -212,6 +191,24 @@ public class CuratorDruidNodeAnnouncerAndDiscoveryTest extends CuratorTestBase
announcer.stop();
}
+ private static DruidNodeDiscovery.Listener createSetAggregatingListener(Set<DiscoveryDruidNode> set)
+ {
+ return new DruidNodeDiscovery.Listener()
+ {
+ @Override
+ public void nodesAdded(Collection<DiscoveryDruidNode> nodes)
+ {
+ set.addAll(nodes);
+ }
+
+ @Override
+ public void nodesRemoved(Collection<DiscoveryDruidNode> nodes)
+ {
+ set.removeAll(nodes);
+ }
+ };
+ }
+
private boolean checkNodes(Set<DiscoveryDruidNode> expected, Collection<DiscoveryDruidNode> actual)
{
return expected.equals(ImmutableSet.copyOf(actual));
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
index 40576e2..7c376ea 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidLeaderClientTest.java
@@ -80,7 +80,7 @@ public class DruidLeaderClientTest extends BaseJettyTest
protected Injector setupInjector()
{
final DruidNode node = new DruidNode("test", "localhost", false, null, null, true, false);
- discoveryDruidNode = new DiscoveryDruidNode(node, NodeType.PEON, ImmutableMap.of());
+ discoveryDruidNode = new DiscoveryDruidNode(node, NodeRole.PEON, ImmutableMap.of());
Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(), ImmutableList.<Module>of(
@@ -115,14 +115,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.PEON,
+ NodeRole.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
@@ -140,14 +140,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of());
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.PEON,
+ NodeRole.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
@@ -167,14 +167,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.PEON,
+ NodeRole.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
@@ -194,21 +194,21 @@ public class DruidLeaderClientTest extends BaseJettyTest
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
DiscoveryDruidNode dummyNode = new DiscoveryDruidNode(
new DruidNode("test", "dummyhost", false, 64231, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of()
);
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(dummyNode));
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(discoveryDruidNode));
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery).anyTimes();
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery).anyTimes();
EasyMock.replay(serverDiscoverySelector, druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.PEON,
+ NodeRole.PEON,
"/simple/leader",
serverDiscoverySelector
);
@@ -228,14 +228,14 @@ public class DruidLeaderClientTest extends BaseJettyTest
);
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(druidNodeDiscovery);
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
- NodeType.PEON,
+ NodeRole.PEON,
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
diff --git a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
index 2b5722d..64cc54f 100644
--- a/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DruidNodeDiscoveryProviderTest.java
@@ -32,6 +32,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.function.BooleanSupplier;
/**
*/
@@ -88,7 +89,7 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", false, 8080, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
@@ -96,21 +97,21 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", false, 8080, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node4 = new DiscoveryDruidNode(
new DruidNode("s4", "h4", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0),
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
@@ -118,35 +119,35 @@ public class DruidNodeDiscoveryProviderTest
DiscoveryDruidNode node5 = new DiscoveryDruidNode(
new DruidNode("s5", "h5", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
DiscoveryDruidNode node6 = new DiscoveryDruidNode(
new DruidNode("s6", "h6", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7 = new DiscoveryDruidNode(
new DruidNode("s7", "h7", false, 8080, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node7Clone = new DiscoveryDruidNode(
new DruidNode("s7", "h7", false, 8080, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier"))
);
DiscoveryDruidNode node8 = new DiscoveryDruidNode(
new DruidNode("s8", "h8", false, 8080, null, true, false),
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
ImmutableMap.of()
);
@@ -184,7 +185,13 @@ public class DruidNodeDiscoveryProviderTest
private List<DruidNodeDiscovery.Listener> listeners = new ArrayList<>();
@Override
- public DruidNodeDiscovery getForNodeType(NodeType nodeType)
+ public BooleanSupplier getForNode(DruidNode node, NodeRole nodeRole)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public DruidNodeDiscovery getForNodeRole(NodeRole nodeRole)
{
return new DruidNodeDiscovery()
{
diff --git a/server/src/test/java/org/apache/druid/server/http/security/ResourceFilterTestHelper.java b/server/src/test/java/org/apache/druid/server/http/security/ResourceFilterTestHelper.java
index 84810cf..95a0cc2 100644
--- a/server/src/test/java/org/apache/druid/server/http/security/ResourceFilterTestHelper.java
+++ b/server/src/test/java/org/apache/druid/server/http/security/ResourceFilterTestHelper.java
@@ -49,6 +49,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.PathSegment;
+import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collection;
@@ -133,14 +134,9 @@ public class ResourceFilterTestHelper
).atLeastOnce();
}
- public static Collection<Object[]> getRequestPaths(final Class clazz)
+ public static Collection<Object[]> getRequestPathsWithAuthorizer(final AnnotatedElement classOrMethod)
{
- return getRequestPaths(clazz, ImmutableList.of(), ImmutableList.of());
- }
-
- public static Collection<Object[]> getRequestPathsWithAuthorizer(final Class clazz)
- {
- return getRequestPaths(clazz, ImmutableList.of(AuthorizerMapper.class), ImmutableList.of());
+ return getRequestPaths(classOrMethod, ImmutableList.of(AuthorizerMapper.class), ImmutableList.of());
}
public static Collection<Object[]> getRequestPaths(
@@ -152,17 +148,17 @@ public class ResourceFilterTestHelper
}
public static Collection<Object[]> getRequestPaths(
- final Class clazz,
+ final AnnotatedElement classOrMethod,
final Iterable<Class<?>> mockableInjections,
final Iterable<Key<?>> mockableKeys
)
{
- return getRequestPaths(clazz, mockableInjections, mockableKeys, ImmutableList.of());
+ return getRequestPaths(classOrMethod, mockableInjections, mockableKeys, ImmutableList.of());
}
// Feeds in an array of [ PathName, MethodName, ResourceFilter , Injector]
public static Collection<Object[]> getRequestPaths(
- final Class clazz,
+ final AnnotatedElement classOrMethod,
final Iterable<Class<?>> mockableInjections,
final Iterable<Key<?>> mockableKeys,
final Iterable<?> injectedObjs
@@ -187,11 +183,17 @@ public class ResourceFilterTestHelper
}
}
);
- final String basepath = ((Path) clazz.getAnnotation(Path.class)).value().substring(1); //Ignore the first "/"
+ final String basepath = classOrMethod.getAnnotation(Path.class).value().substring(1); //Ignore the first "/"
final List<Class<? extends ResourceFilter>> baseResourceFilters =
- clazz.getAnnotation(ResourceFilters.class) == null ? Collections.emptyList() :
- ImmutableList.copyOf(((ResourceFilters) clazz.getAnnotation(ResourceFilters.class)).value());
+ classOrMethod.getAnnotation(ResourceFilters.class) == null ? Collections.emptyList() :
+ ImmutableList.copyOf(classOrMethod.getAnnotation(ResourceFilters.class).value());
+ List<Method> methods;
+ if (classOrMethod instanceof Class<?>) {
+ methods = ImmutableList.copyOf(((Class<?>) classOrMethod).getDeclaredMethods());
+ } else {
+ methods = Collections.singletonList((Method) classOrMethod);
+ }
return ImmutableList.copyOf(
Iterables.concat(
// Step 3 - Merge all the Objects arrays for each endpoints
@@ -206,7 +208,7 @@ public class ResourceFilterTestHelper
// Filter out non resource endpoint methods
// and also the endpoints that does not have any
// ResourceFilters applied to them
- ImmutableList.copyOf(clazz.getDeclaredMethods()),
+ methods,
new Predicate<Method>()
{
@Override
@@ -239,18 +241,14 @@ public class ResourceFilterTestHelper
if (method.getAnnotation(Path.class) != null) {
return new Object[]{
StringUtils.format("%s%s", basepath, method.getAnnotation(Path.class).value()),
- input.getAnnotation(GET.class) == null ? (method.getAnnotation(DELETE.class) == null
- ? "POST"
- : "DELETE") : "GET",
+ httpMethodFromAnnotation(input, method),
injector.getInstance(input),
injector
};
} else {
return new Object[]{
basepath,
- input.getAnnotation(GET.class) == null ? (method.getAnnotation(DELETE.class) == null
- ? "POST"
- : "DELETE") : "GET",
+ httpMethodFromAnnotation(input, method),
injector.getInstance(input),
injector
};
@@ -264,4 +262,13 @@ public class ResourceFilterTestHelper
)
);
}
+
+ private static String httpMethodFromAnnotation(Class<? extends ResourceFilter> input, Method method)
+ {
+ if (input.getAnnotation(GET.class) != null) {
+ return "GET";
+ } else {
+ return method.getAnnotation(DELETE.class) != null ? "DELETE" : "POST";
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java b/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
index d42dfa5..344396c 100644
--- a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.server.http.IntervalsResource;
import org.apache.druid.server.http.MetadataResource;
import org.apache.druid.server.http.RouterResource;
import org.apache.druid.server.http.RulesResource;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.http.ServersResource;
import org.apache.druid.server.http.TiersResource;
import org.apache.druid.server.security.ForbiddenException;
@@ -51,9 +52,8 @@ import java.util.Collection;
@RunWith(Parameterized.class)
public class SecurityResourceFilterTest extends ResourceFilterTestHelper
{
-
@Parameterized.Parameters(name = "{index}: requestPath={0}, requestMethod={1}, resourceFilter={2}")
- public static Collection<Object[]> data()
+ public static Collection<Object[]> data() throws NoSuchMethodException
{
return ImmutableList.copyOf(
Iterables.concat(
@@ -70,6 +70,8 @@ public class SecurityResourceFilterTest extends ResourceFilterTestHelper
getRequestPathsWithAuthorizer(CoordinatorDynamicConfigsResource.class),
getRequestPathsWithAuthorizer(QueryResource.class),
getRequestPathsWithAuthorizer(StatusResource.class),
+ getRequestPathsWithAuthorizer(SelfDiscoveryResource.class.getDeclaredMethod("getSelfDiscoveredStatus")),
+ getRequestPathsWithAuthorizer(SelfDiscoveryResource.class.getDeclaredMethod("getSelfDiscovered")),
getRequestPathsWithAuthorizer(BrokerQueryResource.class),
getRequestPathsWithAuthorizer(RouterResource.class)
)
diff --git a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java
index be6d607..66c2493 100644
--- a/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java
+++ b/server/src/test/java/org/apache/druid/server/lookup/cache/LookupNodeDiscoveryTest.java
@@ -26,7 +26,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.http.HostAndPortWithScheme;
import org.easymock.EasyMock;
@@ -54,21 +54,21 @@ public class LookupNodeDiscoveryTest
DiscoveryDruidNode node1 = new DiscoveryDruidNode(
new DruidNode("s1", "h1", false, 8080, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node2 = new DiscoveryDruidNode(
new DruidNode("s2", "h2", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier1"))
);
DiscoveryDruidNode node3 = new DiscoveryDruidNode(
new DruidNode("s3", "h3", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
LookupNodeService.DISCOVERY_SERVICE_KEY, new LookupNodeService("tier2"))
);
diff --git a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
index 81f2c10..1ea675f 100644
--- a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
+++ b/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -32,7 +32,7 @@ import org.apache.druid.client.selector.Server;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@@ -75,19 +75,19 @@ public class TieredBrokerHostSelectorTest
node1 = new DiscoveryDruidNode(
new DruidNode("hotBroker", "hotHost", false, 8080, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of()
);
node2 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost1", false, 8080, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of()
);
node3 = new DiscoveryDruidNode(
new DruidNode("coldBroker", "coldHost2", false, 8080, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of()
);
@@ -107,7 +107,7 @@ public class TieredBrokerHostSelectorTest
}
};
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER))
.andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscoveryProvider);
diff --git a/services/src/main/java/org/apache/druid/cli/CliBroker.java b/services/src/main/java/org/apache/druid/cli/CliBroker.java
index b585c71..45864f4 100644
--- a/services/src/main/java/org/apache/druid/cli/CliBroker.java
+++ b/services/src/main/java/org/apache/druid/cli/CliBroker.java
@@ -20,6 +20,7 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
@@ -33,7 +34,7 @@ import org.apache.druid.client.selector.CustomTierSelectorStrategyConfig;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
@@ -42,6 +43,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
+import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.RetryQueryRunnerConfig;
@@ -50,6 +52,7 @@ import org.apache.druid.server.BrokerQueryResource;
import org.apache.druid.server.ClientInfoResource;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.http.BrokerResource;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.router.TieredBrokerConfig;
@@ -120,14 +123,19 @@ public class CliBroker extends ServerRunnable
LifecycleModule.register(binder, Server.class);
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.BROKER);
bindAnnouncer(
binder,
- DiscoverySideEffectsProvider.builder(NodeType.BROKER)
- .serviceClasses(ImmutableList.of(LookupNodeService.class))
- .useLegacyAnnouncer(true)
- .build()
+ DiscoverySideEffectsProvider
+ .builder(NodeRole.BROKER)
+ .serviceClasses(ImmutableList.of(LookupNodeService.class))
+ .useLegacyAnnouncer(true)
+ .build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
},
new LookupModule(),
new SqlModule()
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 4b3f035..bfe5ef7 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicates;
import com.google.inject.Binder;
import com.google.inject.Inject;
+import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Names;
@@ -35,7 +36,7 @@ import org.apache.druid.client.HttpServerInventoryViewResource;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ConditionalMultibind;
import org.apache.druid.guice.ConfigProvider;
import org.apache.druid.guice.Jerseys;
@@ -45,6 +46,7 @@ import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceHelper;
import org.apache.druid.guice.annotations.EscalatedGlobal;
+import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.http.JettyHttpClientModule;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ExecutorServices;
@@ -82,6 +84,7 @@ import org.apache.druid.server.http.MetadataResource;
import org.apache.druid.server.http.RedirectFilter;
import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.http.RulesResource;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.http.ServersResource;
import org.apache.druid.server.http.TiersResource;
import org.apache.druid.server.initialization.ZkPathsConfig;
@@ -221,8 +224,8 @@ public class CliCoordinator extends ServerRunnable
"'druid.coordinator.merge.on' is not supported anymore. "
+ "Please consider using Coordinator's automatic compaction instead. "
+ "See https://druid.apache.org/docs/latest/operations/segment-optimization.html and "
- + "https://druid.apache.org/docs/latest/operations/api-reference.html#compaction-configuration for more "
- + "details about compaction."
+ + "https://druid.apache.org/docs/latest/operations/api-reference.html#compaction-configuration "
+ + "for more details about compaction."
);
}
@@ -236,11 +239,16 @@ public class CliCoordinator extends ServerRunnable
DruidCoordinatorCleanupPendingSegments.class
);
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.COORDINATOR);
+
bindAnnouncer(
binder,
Coordinator.class,
- DiscoverySideEffectsProvider.builder(NodeType.COORDINATOR).build()
+ DiscoverySideEffectsProvider.builder(NodeRole.COORDINATOR).build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
}
@Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
index ba55ab7..c1bc8ba 100644
--- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java
+++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java
@@ -20,13 +20,14 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
@@ -34,9 +35,10 @@ import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NodeTypeConfig;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
+import org.apache.druid.guice.ServerTypeConfig;
+import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
@@ -47,6 +49,7 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.ZkCoordinator;
import org.apache.druid.server.http.HistoricalResource;
import org.apache.druid.server.http.SegmentListerResource;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.timeline.PruneLastCompactionState;
@@ -89,7 +92,7 @@ public class CliHistorical extends ServerRunnable
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
- binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.HISTORICAL));
+ binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class);
Jerseys.addResource(binder, QueryResource.class);
@@ -101,12 +104,18 @@ public class CliHistorical extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.historical.cache", CacheConfig.class);
binder.install(new CacheModule());
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.HISTORICAL);
+
bindAnnouncer(
binder,
- DiscoverySideEffectsProvider.builder(NodeType.HISTORICAL)
- .serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
- .build()
+ DiscoverySideEffectsProvider
+ .builder(NodeRole.HISTORICAL)
+ .serviceClasses(ImmutableList.of(DataNodeService.class, LookupNodeService.class))
+ .build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
},
new LookupModule()
);
diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
index 01d53f1..6e51c45 100644
--- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java
+++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java
@@ -30,7 +30,7 @@ import io.airlift.airline.Command;
import org.apache.druid.client.DruidServer;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
@@ -41,10 +41,10 @@ import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
-import org.apache.druid.guice.NodeTypeConfig;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
+import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.Parent;
import org.apache.druid.guice.annotations.RemoteChatHandler;
import org.apache.druid.guice.annotations.Self;
@@ -136,7 +136,7 @@ public class CliIndexer extends ServerRunnable
.to(UnifiedIndexerAppenderatorsManager.class)
.in(LazySingleton.class);
- binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.INDEXER_EXECUTOR));
+ binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.INDEXER_EXECUTOR));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
@@ -151,15 +151,12 @@ public class CliIndexer extends ServerRunnable
bindAnnouncer(
binder,
- DiscoverySideEffectsProvider.builder(NodeType.INDEXER)
- .serviceClasses(
- ImmutableList.of(
- LookupNodeService.class,
- WorkerNodeService.class,
- DataNodeService.class
- )
- )
- .build()
+ DiscoverySideEffectsProvider
+ .builder(NodeRole.INDEXER)
+ .serviceClasses(
+ ImmutableList.of(LookupNodeService.class, WorkerNodeService.class, DataNodeService.class)
+ )
+ .build()
);
}
diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
index 88646dd..b6ce582 100644
--- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
+++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java
@@ -31,7 +31,7 @@ import com.google.inject.util.Providers;
import io.airlift.airline.Command;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.discovery.WorkerNodeService;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
@@ -64,6 +64,7 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.DummyForInjectionAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.timeline.PruneLastCompactionState;
import org.eclipse.jetty.server.Server;
@@ -140,12 +141,18 @@ public class CliMiddleManager extends ServerRunnable
LifecycleModule.register(binder, Server.class);
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.MIDDLE_MANAGER);
+
bindAnnouncer(
binder,
- DiscoverySideEffectsProvider.builder(NodeType.MIDDLE_MANAGER)
- .serviceClasses(ImmutableList.of(WorkerNodeService.class))
- .build()
+ DiscoverySideEffectsProvider
+ .builder(NodeRole.MIDDLE_MANAGER)
+ .serviceClasses(ImmutableList.of(WorkerNodeService.class))
+ .build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
}
@Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index 23e985e..6e00ba9 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -38,7 +38,7 @@ import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceSelectorConfig;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceModuleHelper;
@@ -52,6 +52,7 @@ import org.apache.druid.guice.ListProvider;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
@@ -104,6 +105,7 @@ import org.apache.druid.server.audit.AuditManagerProvider;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import org.apache.druid.server.http.RedirectFilter;
import org.apache.druid.server.http.RedirectInfo;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
@@ -246,11 +248,16 @@ public class CliOverlord extends ServerRunnable
LifecycleModule.register(binder, Server.class);
}
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.OVERLORD);
+
bindAnnouncer(
binder,
IndexingService.class,
- DiscoverySideEffectsProvider.builder(NodeType.OVERLORD).build()
+ DiscoverySideEffectsProvider.builder(NodeRole.OVERLORD).build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
}
private void configureTaskStorage(Binder binder)
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index a075e31..1e82c6d 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -40,6 +40,7 @@ import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
@@ -50,13 +51,14 @@ import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
-import org.apache.druid.guice.NodeTypeConfig;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
import org.apache.druid.guice.QueryablePeonModule;
+import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Parent;
+import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.RetryPolicyConfig;
import org.apache.druid.indexing.common.RetryPolicyFactory;
@@ -123,10 +125,12 @@ import java.util.Set;
@Command(
name = "peon",
description = "Runs a Peon, this is an individual forked \"task\" used as part of the indexing service. "
- + "This should rarely, if ever, be used directly. See https://druid.apache.org/docs/latest/design/peons.html for a description"
+ + "This should rarely, if ever, be used directly. "
+ + "See https://druid.apache.org/docs/latest/design/peons.html for a description"
)
public class CliPeon extends GuiceRunnable
{
+ @SuppressWarnings("WeakerAccess")
@Arguments(description = "task.json status.json report.json", required = true)
public List<String> taskAndStatusFile;
@@ -139,8 +143,12 @@ public class CliPeon extends GuiceRunnable
// path to store the task's TaskReport objects
private String taskReportPath;
+ /**
+ * Still using --nodeType as the flag for backward compatibility, although the concept is now more precisely called
+ * "serverType".
+ */
@Option(name = "--nodeType", title = "nodeType", description = "Set the node type to expose on ZK")
- public String nodeType = "indexer-executor";
+ public String serverType = "indexer-executor";
private static final Logger log = new Logger(CliPeon.class);
@@ -210,7 +218,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
Jerseys.addResource(binder, SegmentListerResource.class);
- binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(ServerType.fromString(nodeType)));
+ binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.fromString(serverType)));
LifecycleModule.register(binder, Server.class);
}
@@ -249,12 +257,7 @@ public class CliPeon extends GuiceRunnable
@Nullable BatchDataSegmentAnnouncer announcer
)
{
- return new SegmentListerResource(
- jsonMapper,
- smileMapper,
- announcer,
- null
- );
+ return new SegmentListerResource(jsonMapper, smileMapper, announcer, null);
}
},
new QueryablePeonModule(),
@@ -311,7 +314,7 @@ public class CliPeon extends GuiceRunnable
}
}
- public static void bindRowIngestionMeters(Binder binder)
+ static void bindRowIngestionMeters(Binder binder)
{
PolyBind.createChoice(
binder,
@@ -328,7 +331,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(DropwizardRowIngestionMetersFactory.class).in(LazySingleton.class);
}
- public static void bindChatHandler(Binder binder)
+ static void bindChatHandler(Binder binder)
{
PolyBind.createChoice(
binder,
@@ -350,7 +353,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
}
- public static void bindPeonDataSegmentHandlers(Binder binder)
+ static void bindPeonDataSegmentHandlers(Binder binder)
{
// Build it to make it bind even if nothing binds to it.
Binders.dataSegmentKillerBinder(binder);
@@ -361,7 +364,7 @@ public class CliPeon extends GuiceRunnable
binder.bind(DataSegmentArchiver.class).to(OmniDataSegmentArchiver.class).in(LazySingleton.class);
}
- public static void configureTaskActionClient(Binder binder)
+ private static void configureTaskActionClient(Binder binder)
{
PolyBind.createChoice(
binder,
@@ -386,9 +389,11 @@ public class CliPeon extends GuiceRunnable
.addBinding("remote")
.to(RemoteTaskActionClientFactory.class)
.in(LazySingleton.class);
+
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.PEON);
}
- public static void bindTaskConfigAndClients(Binder binder)
+ static void bindTaskConfigAndClients(Binder binder)
{
binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
@@ -406,13 +411,13 @@ public class CliPeon extends GuiceRunnable
binder.bind(RetryPolicyFactory.class).in(LazySingleton.class);
}
- public static void bindRealtimeCache(Binder binder)
+ static void bindRealtimeCache(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.realtime.cache", CacheConfig.class);
binder.install(new CacheModule());
}
- public static void bindCoordinatorHandoffNotiferAndClient(Binder binder)
+ static void bindCoordinatorHandoffNotiferAndClient(Binder binder)
{
JsonConfigProvider.bind(
binder,
diff --git a/services/src/main/java/org/apache/druid/cli/CliRouter.java b/services/src/main/java/org/apache/druid/cli/CliRouter.java
index c6f5a9b..d945222 100644
--- a/services/src/main/java/org/apache/druid/cli/CliRouter.java
+++ b/services/src/main/java/org/apache/druid/cli/CliRouter.java
@@ -20,12 +20,13 @@
package org.apache.druid.cli;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import io.airlift.airline.Command;
import org.apache.druid.curator.discovery.DiscoveryModule;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
@@ -40,6 +41,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.lookup.LookupSerdeModule;
import org.apache.druid.server.AsyncQueryForwardingServlet;
import org.apache.druid.server.http.RouterResource;
+import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
import org.apache.druid.server.router.AvaticaConnectionBalancer;
@@ -59,7 +61,8 @@ import java.util.List;
*/
@Command(
name = "router",
- description = "Experimental! Understands tiers and routes things to different brokers, see https://druid.apache.org/docs/latest/development/router.html for a description"
+ description = "Experimental! Understands tiers and routes things to different brokers, "
+ + "see https://druid.apache.org/docs/latest/development/router.html for a description"
)
public class CliRouter extends ServerRunnable
{
@@ -106,10 +109,15 @@ public class CliRouter extends ServerRunnable
LifecycleModule.register(binder, Server.class);
DiscoveryModule.register(binder, Self.class);
+ binder.bind(NodeRole.class).annotatedWith(Self.class).toInstance(NodeRole.ROUTER);
+
bindAnnouncer(
binder,
- DiscoverySideEffectsProvider.builder(NodeType.ROUTER).build()
+ DiscoverySideEffectsProvider.builder(NodeRole.ROUTER).build()
);
+
+ Jerseys.addResource(binder, SelfDiscoveryResource.class);
+ LifecycleModule.registerKey(binder, Key.get(SelfDiscoveryResource.class));
},
new LookupSerdeModule()
);
diff --git a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
index 4fa878b..8ac9244 100644
--- a/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
+++ b/services/src/main/java/org/apache/druid/cli/ServerRunnable.java
@@ -30,7 +30,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.DruidService;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.Self;
@@ -102,13 +102,13 @@ public abstract class ServerRunnable extends GuiceRunnable
public static class Builder
{
- private NodeType nodeType;
+ private NodeRole nodeRole;
private List<Class<? extends DruidService>> serviceClasses = ImmutableList.of();
private boolean useLegacyAnnouncer;
- public Builder(final NodeType nodeType)
+ public Builder(final NodeRole nodeRole)
{
- this.nodeType = nodeType;
+ this.nodeRole = nodeRole;
}
public Builder serviceClasses(final List<Class<? extends DruidService>> serviceClasses)
@@ -125,13 +125,13 @@ public abstract class ServerRunnable extends GuiceRunnable
public DiscoverySideEffectsProvider build()
{
- return new DiscoverySideEffectsProvider(nodeType, serviceClasses, useLegacyAnnouncer);
+ return new DiscoverySideEffectsProvider(nodeRole, serviceClasses, useLegacyAnnouncer);
}
}
- public static Builder builder(final NodeType nodeType)
+ public static Builder builder(final NodeRole nodeRole)
{
- return new Builder(nodeType);
+ return new Builder(nodeRole);
}
@Inject
@@ -150,17 +150,17 @@ public abstract class ServerRunnable extends GuiceRunnable
@Inject
private Injector injector;
- private final NodeType nodeType;
+ private final NodeRole nodeRole;
private final List<Class<? extends DruidService>> serviceClasses;
private final boolean useLegacyAnnouncer;
private DiscoverySideEffectsProvider(
- final NodeType nodeType,
+ final NodeRole nodeRole,
final List<Class<? extends DruidService>> serviceClasses,
final boolean useLegacyAnnouncer
)
{
- this.nodeType = nodeType;
+ this.nodeRole = nodeRole;
this.serviceClasses = serviceClasses;
this.useLegacyAnnouncer = useLegacyAnnouncer;
}
@@ -174,7 +174,7 @@ public abstract class ServerRunnable extends GuiceRunnable
builder.put(service.getName(), service);
}
- DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeType, builder.build());
+ DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(druidNode, nodeRole, builder.build());
lifecycle.addHandler(
new Lifecycle.Handler()
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8ee7b7e..eb06587 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -53,7 +53,7 @@ import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.RE;
@@ -499,11 +499,11 @@ public class SystemSchema extends AbstractSchema
final FluentIterable<Object[]> results = FluentIterable
.from(() -> druidServers)
- .transform(val -> {
+ .transform((DiscoveryDruidNode val) -> {
boolean isDataNode = false;
final DruidNode node = val.getDruidNode();
long currHistoricalSize = 0;
- if (val.getNodeType().equals(NodeType.HISTORICAL)) {
+ if (val.getNodeRole().equals(NodeRole.HISTORICAL)) {
final DruidServer server = serverInventoryView.getInventoryValue(val.toDruidServer().getName());
currHistoricalSize = server.getCurrSize();
isDataNode = true;
@@ -513,7 +513,7 @@ public class SystemSchema extends AbstractSchema
extractHost(node.getHost()),
(long) extractPort(node.getHostAndPort()),
(long) extractPort(node.getHostAndTlsPort()),
- StringUtils.toLowerCase(toStringOrNull(val.getNodeType())),
+ StringUtils.toLowerCase(toStringOrNull(val.getNodeRole())),
isDataNode ? val.toDruidServer().getTier() : null,
isDataNode ? currHistoricalSize : CURRENT_SERVER_SIZE,
isDataNode ? val.toDruidServer().getMaxSize() : MAX_SERVER_SIZE
@@ -524,8 +524,8 @@ public class SystemSchema extends AbstractSchema
private Iterator<DiscoveryDruidNode> getDruidServers(DruidNodeDiscoveryProvider druidNodeDiscoveryProvider)
{
- return Arrays.stream(NodeType.values())
- .flatMap(nodeType -> druidNodeDiscoveryProvider.getForNodeType(nodeType).getAllNodes().stream())
+ return Arrays.stream(NodeRole.values())
+ .flatMap(nodeRole -> druidNodeDiscoveryProvider.getForNodeRole(nodeRole).getAllNodes().stream())
.collect(Collectors.toList())
.iterator();
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index ab10c65..8e9d6d9 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -46,7 +46,7 @@ import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
@@ -356,37 +356,37 @@ public class SystemSchemaTest extends CalciteTestBase
private final DiscoveryDruidNode coordinator = new DiscoveryDruidNode(
new DruidNode("s1", "localhost", false, 8081, null, true, false),
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
ImmutableMap.of()
);
private final DiscoveryDruidNode overlord = new DiscoveryDruidNode(
new DruidNode("s2", "localhost", false, 8090, null, true, false),
- NodeType.OVERLORD,
+ NodeRole.OVERLORD,
ImmutableMap.of()
);
private final DiscoveryDruidNode broker1 = new DiscoveryDruidNode(
new DruidNode("s3", "localhost", false, 8082, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of()
);
private final DiscoveryDruidNode broker2 = new DiscoveryDruidNode(
new DruidNode("s3", "brokerHost", false, 8082, null, true, false),
- NodeType.BROKER,
+ NodeRole.BROKER,
ImmutableMap.of()
);
private final DiscoveryDruidNode router = new DiscoveryDruidNode(
new DruidNode("s4", "localhost", false, 8888, null, true, false),
- NodeType.ROUTER,
+ NodeRole.ROUTER,
ImmutableMap.of()
);
private final DiscoveryDruidNode historical1 = new DiscoveryDruidNode(
new DruidNode("s5", "localhost", false, 8083, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
@@ -394,21 +394,21 @@ public class SystemSchemaTest extends CalciteTestBase
private final DiscoveryDruidNode historical2 = new DiscoveryDruidNode(
new DruidNode("s5", "histHost", false, 8083, null, true, false),
- NodeType.HISTORICAL,
+ NodeRole.HISTORICAL,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
private final DiscoveryDruidNode middleManager = new DiscoveryDruidNode(
new DruidNode("s6", "mmHost", false, 8091, null, true, false),
- NodeType.MIDDLE_MANAGER,
+ NodeRole.MIDDLE_MANAGER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
);
private final DiscoveryDruidNode peon1 = new DiscoveryDruidNode(
new DruidNode("s7", "localhost", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0)
)
@@ -416,14 +416,14 @@ public class SystemSchemaTest extends CalciteTestBase
private final DiscoveryDruidNode peon2 = new DiscoveryDruidNode(
new DruidNode("s7", "peonHost", false, 8080, null, true, false),
- NodeType.PEON,
+ NodeRole.PEON,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.HISTORICAL, 0))
);
private final DiscoveryDruidNode indexer = new DiscoveryDruidNode(
new DruidNode("s8", "indexerHost", false, 8091, null, true, false),
- NodeType.INDEXER,
+ NodeRole.INDEXER,
ImmutableMap.of(
DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerType.INDEXER_EXECUTOR, 0))
);
@@ -702,24 +702,24 @@ public class SystemSchemaTest extends CalciteTestBase
final DruidNodeDiscovery indexerNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.COORDINATOR))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.COORDINATOR))
.andReturn(coordinatorNodeDiscovery)
.once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.OVERLORD))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.OVERLORD))
.andReturn(overlordNodeDiscovery)
.once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.BROKER)).andReturn(brokerNodeDiscovery).once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.ROUTER)).andReturn(routerNodeDiscovery).once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.HISTORICAL))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.BROKER)).andReturn(brokerNodeDiscovery).once();
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.ROUTER)).andReturn(routerNodeDiscovery).once();
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.HISTORICAL))
.andReturn(historicalNodeDiscovery)
.once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.MIDDLE_MANAGER))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.MIDDLE_MANAGER))
.andReturn(mmNodeDiscovery)
.once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.INDEXER))
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.INDEXER))
.andReturn(indexerNodeDiscovery)
.once();
- EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType(NodeType.PEON)).andReturn(peonNodeDiscovery).once();
+ EasyMock.expect(druidNodeDiscoveryProvider.getForNodeRole(NodeRole.PEON)).andReturn(peonNodeDiscovery).once();
EasyMock.expect(coordinatorNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(coordinator)).once();
EasyMock.expect(overlordNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of(overlord)).once();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 00610e2..38cc983 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -44,7 +44,7 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.NodeType;
+import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Pair;
@@ -780,7 +780,7 @@ public class CalciteTests
final DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
EasyMock.createMock(HttpClient.class),
EasyMock.createMock(DruidNodeDiscoveryProvider.class),
- NodeType.COORDINATOR,
+ NodeRole.COORDINATOR,
"/simple/leader",
new ServerDiscoverySelector(EasyMock.createMock(ServiceProvider.class), "test")
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org