You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by cw...@apache.org on 2023/04/05 09:45:49 UTC

[druid] branch master updated: add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)

This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c8a184677 add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)
1c8a184677 is described below

commit 1c8a18467744445a6a2c05bb43ff935618baecc6
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Apr 5 02:45:39 2023 -0700

    add null safety checks for DiscoveryDruidNode services for more resilient http server and task views (#13930)
    
    * add null safety checks for DiscoveryDruidNode services for more resilient http server and task views
---
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 22 +++++++++--
 .../druid/client/HttpServerInventoryView.java      | 32 +++++++++++----
 .../apache/druid/discovery/DiscoveryDruidNode.java | 14 +++++++
 .../druid/server/coordination/ServerType.java      | 12 ++++++
 .../server/lookup/cache/LookupNodeDiscovery.java   | 45 ++++++++++------------
 .../druid/client/HttpServerInventoryViewTest.java  | 27 +++++++++++++
 .../druid/sql/calcite/schema/SystemSchema.java     | 25 ++++++------
 7 files changed, 127 insertions(+), 50 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index 65f2477080..8674d5a111 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -68,6 +68,7 @@ import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
 import org.apache.druid.indexing.worker.Worker;
+import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
@@ -565,13 +566,26 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
 
   private Worker toWorker(DiscoveryDruidNode node)
   {
+    final WorkerNodeService workerNodeService = node.getService(WorkerNodeService.DISCOVERY_SERVICE_KEY, WorkerNodeService.class);
+    if (workerNodeService == null) {
+      // this shouldn't typically happen, but just in case it does, make a dummy worker to allow the callbacks to
+      // continue since addWorker/removeWorker only need worker.getHost()
+      return new Worker(
+          node.getDruidNode().getServiceScheme(),
+          node.getDruidNode().getHostAndPortToUse(),
+          null,
+          0,
+          "",
+          WorkerConfig.DEFAULT_CATEGORY
+      );
+    }
     return new Worker(
         node.getDruidNode().getServiceScheme(),
         node.getDruidNode().getHostAndPortToUse(),
-        ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getIp(),
-        ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCapacity(),
-        ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getVersion(),
-        ((WorkerNodeService) node.getServices().get(WorkerNodeService.DISCOVERY_SERVICE_KEY)).getCategory()
+        workerNodeService.getIp(),
+        workerNodeService.getCapacity(),
+        workerNodeService.getVersion(),
+        workerNodeService.getCategory()
     );
   }
 
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 893d455dc7..2f86c46064 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -44,12 +44,14 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
 import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
 import org.apache.druid.server.coordination.DataSegmentChangeRequest;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
 import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 
@@ -174,15 +176,29 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
 
               private DruidServer toDruidServer(DiscoveryDruidNode node)
               {
-
+                final DruidNode druidNode = node.getDruidNode();
+                final DataNodeService dataNodeService = node.getService(DataNodeService.DISCOVERY_SERVICE_KEY, DataNodeService.class);
+                if (dataNodeService == null) {
+                  // this shouldn't typically happen, but just in case it does, make a dummy server to allow the
+                  // callbacks to continue since serverAdded/serverRemoved only need node.getName()
+                  return new DruidServer(
+                      druidNode.getHostAndPortToUse(),
+                      druidNode.getHostAndPort(),
+                      druidNode.getHostAndTlsPort(),
+                      0L,
+                      ServerType.fromNodeRole(node.getNodeRole()),
+                      DruidServer.DEFAULT_TIER,
+                      DruidServer.DEFAULT_PRIORITY
+                  );
+                }
                 return new DruidServer(
-                    node.getDruidNode().getHostAndPortToUse(),
-                    node.getDruidNode().getHostAndPort(),
-                    node.getDruidNode().getHostAndTlsPort(),
-                    ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getMaxSize(),
-                    ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getServerType(),
-                    ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getTier(),
-                    ((DataNodeService) node.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY)).getPriority()
+                    druidNode.getHostAndPortToUse(),
+                    druidNode.getHostAndPort(),
+                    druidNode.getHostAndTlsPort(),
+                    dataNodeService.getMaxSize(),
+                    dataNodeService.getServerType(),
+                    dataNodeService.getTier(),
+                    dataNodeService.getPriority()
                 );
               }
             }
diff --git a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
index 17be440c83..5ecdf5e043 100644
--- a/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
+++ b/server/src/main/java/org/apache/druid/discovery/DiscoveryDruidNode.java
@@ -21,6 +21,7 @@ package org.apache.druid.discovery;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.DruidNode;
 
+import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -164,6 +166,18 @@ public class DiscoveryDruidNode
     return druidNode;
   }
 
+  @Nullable
+  @JsonIgnore
+  public <T extends DruidService> T getService(String key, Class<T> clazz)
+  {
+    final DruidService o = services.get(key);
+    if (o != null && clazz.isAssignableFrom(o.getClass())) {
+      //noinspection unchecked
+      return (T) o;
+    }
+    return null;
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
index b9bf2f81f3..59d6f6bcc6 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/ServerType.java
@@ -149,6 +149,18 @@ public enum ServerType
     return ServerType.valueOf(StringUtils.toUpperCase(type).replace('-', '_'));
   }
 
+  public static ServerType fromNodeRole(NodeRole nodeRole)
+  {
+    // this doesn't actually check that the NodeRole is a typical data node
+    if (nodeRole.equals(NodeRole.HISTORICAL)) {
+      return ServerType.HISTORICAL;
+    } else if (nodeRole.equals(NodeRole.BROKER)) {
+      return ServerType.BROKER;
+    } else {
+      return ServerType.INDEXER_EXECUTOR;
+    }
+  }
+
   @Override
   @JsonValue
   public String toString()
diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
index 40100fbae3..c809794724 100644
--- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
+++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupNodeDiscovery.java
@@ -19,17 +19,13 @@
 
 package org.apache.druid.server.lookup.cache;
 
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableSet;
-import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.server.http.HostAndPortWithScheme;
 
-import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.Set;
 
@@ -50,27 +46,21 @@ public class LookupNodeDiscovery
     return Collections2.transform(
         Collections2.filter(
             druidNodeDiscovery.getAllNodes(),
-            new Predicate<DiscoveryDruidNode>()
-            {
-              @Override
-              public boolean apply(@Nullable DiscoveryDruidNode node)
-              {
-                return tier.equals(((LookupNodeService) node.getServices()
-                                                            .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier());
+            node -> {
+              if (node == null) {
+                return false;
               }
+              final LookupNodeService lookupNodeService = node.getService(
+                  LookupNodeService.DISCOVERY_SERVICE_KEY,
+                  LookupNodeService.class
+              );
+              return lookupNodeService != null && tier.equals(lookupNodeService.getLookupTier());
             }
         ),
-        new Function<DiscoveryDruidNode, HostAndPortWithScheme>()
-        {
-          @Override
-          public HostAndPortWithScheme apply(@Nullable DiscoveryDruidNode input)
-          {
-            return HostAndPortWithScheme.fromString(
-                input.getDruidNode().getServiceScheme(),
-                input.getDruidNode().getHostAndPortToUse()
-            );
-          }
-        }
+        input -> HostAndPortWithScheme.fromString(
+            input.getDruidNode().getServiceScheme(),
+            input.getDruidNode().getHostAndPortToUse()
+        )
     );
   }
 
@@ -79,8 +69,15 @@ public class LookupNodeDiscovery
     ImmutableSet.Builder<String> builder = new ImmutableSet.Builder<>();
 
     druidNodeDiscovery.getAllNodes().forEach(
-        node -> builder.add(((LookupNodeService) node.getServices()
-                                                     .get(LookupNodeService.DISCOVERY_SERVICE_KEY)).getLookupTier())
+        node -> {
+          final LookupNodeService lookupService = node.getService(
+              LookupNodeService.DISCOVERY_SERVICE_KEY,
+              LookupNodeService.class
+          );
+          if (lookupService != null) {
+            builder.add(lookupService.getLookupTier());
+          }
+        }
     );
 
     return builder.build();
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index b08db90c2c..54f12c8081 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -29,6 +29,7 @@ import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidNodeDiscovery;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
+import org.apache.druid.discovery.LookupNodeService;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
@@ -58,6 +59,7 @@ import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
@@ -265,6 +267,31 @@ public class HttpServerInventoryViewTest
 
     druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
 
+    // test removal event with empty services
+    druidNodeDiscovery.listener.nodesRemoved(
+        ImmutableList.of(
+            new DiscoveryDruidNode(
+                new DruidNode("service", "host", false, 8080, null, true, false),
+                NodeRole.INDEXER,
+                Collections.emptyMap()
+            )
+        )
+    );
+
+    // test removal rogue node (announced a service as a DataNodeService but wasn't a DataNodeService at the key)
+    druidNodeDiscovery.listener.nodesRemoved(
+        ImmutableList.of(
+            new DiscoveryDruidNode(
+                new DruidNode("service", "host", false, 8080, null, true, false),
+                NodeRole.INDEXER,
+                ImmutableMap.of(
+                    DataNodeService.DISCOVERY_SERVICE_KEY,
+                    new LookupNodeService("lookyloo")
+                )
+            )
+        )
+    );
+
     serverRemovedCalled.await();
     Assert.assertNull(httpServerInventoryView.getInventoryValue("host:8080"));
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 4af0b934ef..10090e7258 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -56,7 +56,6 @@ import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.discovery.DruidService;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
@@ -545,7 +544,9 @@ public class SystemSchema extends AbstractSchema
           .from(() -> druidServers)
           .transform((DiscoveryDruidNode discoveryDruidNode) -> {
             //noinspection ConstantConditions
-            final boolean isDiscoverableDataServer = isDiscoverableDataServer(discoveryDruidNode);
+            final boolean isDiscoverableDataServer = isDiscoverableDataServer(
+                discoveryDruidNode.getService(DataNodeService.DISCOVERY_SERVICE_KEY, DataNodeService.class)
+            );
             final NodeRole serverRole = discoveryDruidNode.getNodeRole();
 
             if (isDiscoverableDataServer) {
@@ -651,23 +652,19 @@ public class SystemSchema extends AbstractSchema
       };
     }
 
-    private static boolean isDiscoverableDataServer(DiscoveryDruidNode druidNode)
+    private static boolean isDiscoverableDataServer(DataNodeService dataNodeService)
     {
-      final DruidService druidService = druidNode.getServices().get(DataNodeService.DISCOVERY_SERVICE_KEY);
-      if (druidService == null) {
-        return false;
-      }
-      final DataNodeService dataNodeService = (DataNodeService) druidService;
-      return dataNodeService.isDiscoverable();
+      return dataNodeService != null && dataNodeService.isDiscoverable();
     }
 
     private static DruidServer toDruidServer(DiscoveryDruidNode discoveryDruidNode)
     {
-      if (isDiscoverableDataServer(discoveryDruidNode)) {
-        final DruidNode druidNode = discoveryDruidNode.getDruidNode();
-        final DataNodeService dataNodeService = (DataNodeService) discoveryDruidNode
-            .getServices()
-            .get(DataNodeService.DISCOVERY_SERVICE_KEY);
+      final DruidNode druidNode = discoveryDruidNode.getDruidNode();
+      final DataNodeService dataNodeService = discoveryDruidNode.getService(
+          DataNodeService.DISCOVERY_SERVICE_KEY,
+          DataNodeService.class
+      );
+      if (isDiscoverableDataServer(dataNodeService)) {
         return new DruidServer(
             druidNode.getHostAndPortToUse(),
             druidNode.getHostAndPort(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org