You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/04/05 09:45:49 UTC
[druid] branch master updated: add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)
This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 1c8a184677 add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)
1c8a184677 is described below
commit 1c8a18467744445a6a2c05bb43ff935618baecc6
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Apr 5 02:45:39 2023 -0700
add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)
* add null safety checks for DiscoveryDruidNode services for more resilient http server and task views
---
.../overlord/hrtr/HttpRemoteTaskRunner.java | 22 +++++++++--
.../druid/client/HttpServerInventoryView.java | 32 +++++++++++----
.../apache/druid/discovery/DiscoveryDruidNode.java | 14 +++++++
.../druid/server/coordination/ServerType.java | 12 ++++++
.../server/lookup/cache/LookupNodeDiscovery.java | 45 ++++++++++------------
.../druid/client/HttpServerInventoryViewTest.java | 27 +++++++++++++
.../druid/sql/calcite/schema/SystemSchema.java | 25 ++++++------
7 files changed, 127 insertions(+), 50 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 65f2477080..8674d5a111 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -68,6 +68,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
@@ -565,13 +566,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
private Worker toWorker(DiscoveryDruidNode node)
{
+ final WorkerNodeService workerNodeService = node.getService(WorkerNodeService.DISCOVERY_SERVICE_KEY, WorkerNodeService.class);
+ if (workerNodeService == null) {
+ // this shouldn't typically happen, but just in case it does, make a dummy worker to allow the callbacks to
+ // continue since addWorker/removeWorker only need worker.getHost()
+ return new Worker(
+ node.getDruidNode().getServiceScheme(),
+ node.getDruidNode().getHostAndPortToUse(),
+ null,
+ 0,
+ "",
+ WorkerConfig.DEFAULT_CATEGORY
+ );
+ }
return new Worker(
node.getDruidNode().getServiceScheme(),
node.getDruidNode().getHostAndPortToUse(),
- ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(),
- ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(),
- ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(),
- ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCategory()
+ workerNodeService.getIp(),
+ workerNodeService.getCapacity(),
+ workerNodeService.getVersion(),
+ workerNodeService.getCategory()
);
}
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 893d455dc7..2f86c46064 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -44,12 +44,14 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -174,15 +176,29 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private DruidServer toDruidServer(DiscoveryDruidNode node)
{
-
+ final DruidNode druidNode = node.getDruidNode();
+ final DataNodeService dataNodeService = node.getService(DataNodeService.DISCOVERY_SERVICE_KEY, DataNodeService.class);
+ if (dataNodeService == null) {
+ // this shouldn't typically happen, but just in case it does, make a dummy server to allow the
+ // callbacks to continue since serverAdded/serverRemoved only need node.getName()
+ return new DruidServer(
+ druidNode.getHostAndPortToUse(),
+ druidNode.getHostAndPort(),
+ druidNode.getHostAndTlsPort(),
+ 0L,
+ ServerType.fromNodeRole(node.getNodeRole()),
+ DruidServer.DEFAULT_TIER,
+ DruidServer.DEFAULT_PRIORITY
+ );
+ }
return new DruidServer(
- node.getDruidNode().getHostAndPortToUse(),
- node.getDruidNode().getHostAndPort(),
- node.getDruidNode().getHostAndTlsPort(),
- ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
- ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getServerType(),
- ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
- ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
+ druidNode.getHostAndPortToUse(),
+ druidNode.getHostAndPort(),
+ druidNode.getHostAndTlsPort(),
+ dataNodeService.getMaxSize(),
+ dataNodeService.getServerType(),
+ dataNodeService.getTier(),
+ dataNodeService.getPriority()
);
}
}
diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
index 17be440c83..5ecdf5e043 100644
--- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
+++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
@@ -21,6 +21,7 @@ package org.apache.druid.discovery;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
+import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -164,6 +166,18 @@ public class DiscoveryDruidNode
return druidNode;
}
+ @Nullable
+ @JsonIgnore
+ public <T extends DruidService> T getService(String key, Class<T> clazz)
+ {
+ final DruidService o = services.get(key);
+ if (o != null && clazz.isAssignableFrom(o.getClass())) {
+ //noinspection unchecked
+ return (T) o;
+ }
+ return null;
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
index b9bf2f81f3..59d6f6bcc6 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
@@ -149,6 +149,18 @@ public enum ServerType
return ServerType.valueOf(StringUtils.toUpperCase(type).replace('-', '_'));
}
+ public static ServerType fromNodeRole(NodeRole nodeRole)
+ {
+ // this doesn't actually check that the NodeRole is a typical data node
+ if (nodeRole.equals(NodeRole.HISTORICAL)) {
+ return ServerType.HISTORICAL;
+ } else if (nodeRole.equals(NodeRole.BROKER)) {
+ return ServerType.BROKER;
+ } else {
+ return ServerType.INDEXER_EXECUTOR;
+ }
+ }
+
@Override
@JsonValue
public String toString()
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
index 40100fbae3..c809794724 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
@@ -19,17 +19,13 @@
package org.apache.druid.server.lookup.cache;
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableSet;
-import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.server.http.HostAndPortWithScheme;
-import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Set;
@@ -50,27 +46,21 @@ public class LookupNodeDiscovery
return Collections2.transform(
Collections2.filter(
druidNodeDiscovery.getAllNodes(),
- new Predicate<DiscoveryDruidNode>()
- {
- @Override
- public boolean apply(@Nullable DiscoveryDruidNode node)
- {
- return tier.equals(((LookupNodeService) node.getServices()
- .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier());
+ node -> {
+ if (node == null) {
+ return false;
}
+ final LookupNodeService lookupNodeService = node.getService(
+ LookupNodeService.DISCOVERY_SERVICE_KEY,
+ LookupNodeService.class
+ );
+ return lookupNodeService != null && tier.equals(lookupNodeService.getLookupTier());
}
),
- new Function<DiscoveryDruidNode, HostAndPortWithScheme>()
- {
- @Override
- public HostAndPortWithScheme apply(@Nullable DiscoveryDruidNode input)
- {
- return HostAndPortWithScheme.fromString(
- input.getDruidNode().getServiceScheme(),
- input.getDruidNode().getHostAndPortToUse()
- );
- }
- }
+ input -> HostAndPortWithScheme.fromString(
+ input.getDruidNode().getServiceScheme(),
+ input.getDruidNode().getHostAndPortToUse()
+ )
);
}
@@ -79,8 +69,15 @@ public class LookupNodeDiscovery
ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
druidNodeDiscovery.getAllNodes().forEach(
- node -> builder.add(((LookupNodeService) node.getServices()
- .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
+ node -> {
+ final LookupNodeService lookupService = node.getService(
+ LookupNodeService.DISCOVERY_SERVICE_KEY,
+ LookupNodeService.class
+ );
+ if (lookupService != null) {
+ builder.add(lookupService.getLookupTier());
+ }
+ }
);
return builder.build();
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index b08db90c2c..54f12c8081 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
@@ -58,6 +59,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
@@ -265,6 +267,31 @@ public class HttpServerInventoryViewTest
druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
+ // test removal event with empty services
+ druidNodeDiscovery.listener.nodesRemoved(
+ ImmutableList.of(
+ new DiscoveryDruidNode(
+ new DruidNode("service", "host", false, 8080, null, true, false),
+ NodeRole.INDEXER,
+ Collections.emptyMap()
+ )
+ )
+ );
+
+ // test removal rogue node (announced a service as a DataNodeService but wasn't a DataNodeService at the key)
+ druidNodeDiscovery.listener.nodesRemoved(
+ ImmutableList.of(
+ new DiscoveryDruidNode(
+ new DruidNode("service", "host", false, 8080, null, true, false),
+ NodeRole.INDEXER,
+ ImmutableMap.of(
+ DataNodeService.DISCOVERY_SERVICE_KEY,
+ new LookupNodeService("lookyloo")
+ )
+ )
+ )
+ );
+
serverRemovedCalled.await();
Assert.assertNull(httpServerInventoryView.getInventoryValue("host:8080"));
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 4af0b934ef..10090e7258 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -56,7 +56,6 @@ import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.DruidService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
@@ -545,7 +544,9 @@ public class SystemSchema extends AbstractSchema
.from(() -> druidServers)
.transform((DiscoveryDruidNode discoveryDruidNode) -> {
//noinspection ConstantConditions
- final boolean isDiscoverableDataServer = isDiscoverableDataServer(discoveryDruidNode);
+ final boolean isDiscoverableDataServer = isDiscoverableDataServer(
+ discoveryDruidNode.getService(DataNodeService.DISCOVERY_SERVICE_KEY, DataNodeService.class)
+ );
final NodeRole serverRole = discoveryDruidNode.getNodeRole();
if (isDiscoverableDataServer) {
@@ -651,23 +652,19 @@ public class SystemSchema extends AbstractSchema
};
}
- private static boolean isDiscoverableDataServer(DiscoveryDruidNode druidNode)
+ private static boolean isDiscoverableDataServer(DataNodeService dataNodeService)
{
- final DruidService druidService = druidNode.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY);
- if (druidService == null) {
- return false;
- }
- final DataNodeService dataNodeService = (DataNodeService) druidService;
- return dataNodeService.isDiscoverable();
+ return dataNodeService != null && dataNodeService.isDiscoverable();
}
private static DruidServer toDruidServer(DiscoveryDruidNode discoveryDruidNode)
{
- if (isDiscoverableDataServer(discoveryDruidNode)) {
- final DruidNode druidNode = discoveryDruidNode.getDruidNode();
- final DataNodeService dataNodeService = (DataNodeService) discoveryDruidNode
- .getServices()
- .get(DataNodeService.DISCOVERY_SERVICE_KEY);
+ final DruidNode druidNode = discoveryDruidNode.getDruidNode();
+ final DataNodeService dataNodeService = discoveryDruidNode.getService(
+ DataNodeService.DISCOVERY_SERVICE_KEY,
+ DataNodeService.class
+ );
+ if (isDiscoverableDataServer(dataNodeService)) {
return new DruidServer(
druidNode.getHostAndPortToUse(),
druidNode.getHostAndPort(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org