You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2023/03/30 10:49:22 UTC

[druid] branch master updated: Handle null values in BrokerServerView.serverAddedSegment (#13980)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 47face9ca9 Handle null values in BrokerServerView.serverAddedSegment (#13980)
47face9ca9 is described below

commit 47face9ca92a50290e9d98c7a84e5648c5417894
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Thu Mar 30 16:19:05 2023 +0530

    Handle null values in BrokerServerView.serverAddedSegment (#13980)
    
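    Due to race conditions, the BrokerServerView may sometimes try to add a segment to a server that has already been removed from the inventory. This results in a NullPointerException (NPE), which prevents the BrokerServerView from processing all change requests. With this change, when the server cannot be found, a warning is logged and the segment addition (or removal) is skipped instead.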
---
 .../org/apache/druid/client/BrokerServerView.java  | 40 ++++++++++++++--------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 05dad05bd9..a10dd28539 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -71,9 +71,9 @@ public class BrokerServerView implements TimelineServerView
 
   private final Object lock = new Object();
 
-  private final ConcurrentMap<String, QueryableDruidServer> clients;
-  private final Map<SegmentId, ServerSelector> selectors;
-  private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines;
+  private final ConcurrentMap<String, QueryableDruidServer> clients = new ConcurrentHashMap<>();
+  private final Map<SegmentId, ServerSelector> selectors = new HashMap<>();
+  private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
   private final ConcurrentMap<TimelineCallback, Executor> timelineCallbacks = new ConcurrentHashMap<>();
 
   private final QueryToolChestWarehouse warehouse;
@@ -107,9 +107,6 @@ public class BrokerServerView implements TimelineServerView
     this.baseView = baseView;
     this.tierSelectorStrategy = tierSelectorStrategy;
     this.emitter = emitter;
-    this.clients = new ConcurrentHashMap<>();
-    this.selectors = new HashMap<>();
-    this.timelines = new HashMap<>();
 
     // Validate and set the segment watcher config
     validateSegmentWatcherConfig(segmentWatcherConfig);
@@ -183,10 +180,10 @@ public class BrokerServerView implements TimelineServerView
   {
     if (segmentWatcherConfig.isAwaitInitializationOnStart()) {
       final long startMillis = System.currentTimeMillis();
-      log.info("%s waiting for initialization.", getClass().getSimpleName());
+      log.info("BrokerServerView waiting for initialization.");
       awaitInitialization();
       final long endMillis = System.currentTimeMillis();
-      log.info("%s initialized in [%,d] ms.", getClass().getSimpleName(), endMillis - startMillis);
+      log.info("BrokerServerView initialized in [%,d] ms.", endMillis - startMillis);
       emitter.emit(ServiceMetricEvent.builder().build(
           "init/serverview/time",
           endMillis - startMillis
@@ -267,7 +264,7 @@ public class BrokerServerView implements TimelineServerView
 
   private void serverAddedSegment(final DruidServerMetadata server, final DataSegment segment)
   {
-    SegmentId segmentId = segment.getId();
+    final SegmentId segmentId = segment.getId();
     synchronized (lock) {
       // in theory we could probably just filter this to ensure we don't put ourselves in here, to make broker tree
       // query topologies, but for now just skip all brokers, so we don't create some sort of wild infinite query
@@ -291,7 +288,17 @@ public class BrokerServerView implements TimelineServerView
 
         QueryableDruidServer queryableDruidServer = clients.get(server.getName());
         if (queryableDruidServer == null) {
-          queryableDruidServer = addServer(baseView.getInventoryValue(server.getName()));
+          DruidServer inventoryValue = baseView.getInventoryValue(server.getName());
+          if (inventoryValue == null) {
+            log.warn(
+                "Could not find server[%s] in inventory. Skipping addition of segment[%s].",
+                server.getName(),
+                segmentId
+            );
+            return;
+          } else {
+            queryableDruidServer = addServer(inventoryValue);
+          }
         }
         selector.addServerAndUpdateSegment(queryableDruidServer, segment);
       }
@@ -302,8 +309,7 @@ public class BrokerServerView implements TimelineServerView
 
   private void serverRemovedSegment(DruidServerMetadata server, DataSegment segment)
   {
-
-    SegmentId segmentId = segment.getId();
+    final SegmentId segmentId = segment.getId();
     final ServerSelector selector;
 
     synchronized (lock) {
@@ -323,7 +329,13 @@ public class BrokerServerView implements TimelineServerView
       }
 
       QueryableDruidServer queryableDruidServer = clients.get(server.getName());
-      if (!selector.removeServer(queryableDruidServer)) {
+      if (queryableDruidServer == null) {
+        log.warn(
+            "Could not find server[%s] in inventory. Skipping removal of segment[%s].",
+            server.getName(),
+            segmentId
+        );
+      } else if (!selector.removeServer(queryableDruidServer)) {
         log.warn(
             "Asked to disassociate non-existant association between server[%s] and segment[%s]",
             server,
@@ -378,7 +390,7 @@ public class BrokerServerView implements TimelineServerView
     synchronized (lock) {
       QueryableDruidServer queryableDruidServer = clients.get(server.getName());
       if (queryableDruidServer == null) {
-        log.error("No QueryableDruidServer found for %s", server.getName());
+        log.error("No QueryRunner found for server name[%s].", server.getName());
         return null;
       }
       return queryableDruidServer.getQueryRunner();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org