You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/11 19:18:34 UTC

[GitHub] leventov commented on a change in pull request #4697: update internal-discovery Listener for node list and use same at router and coordinator

leventov commented on a change in pull request #4697: update internal-discovery Listener for node list and use same at router and coordinator
URL: https://github.com/apache/incubator-druid/pull/4697#discussion_r232500809
 
 

 ##########
 File path: server/src/main/java/io/druid/discovery/DruidNodeDiscoveryProvider.java
 ##########
 @@ -73,82 +76,139 @@
   /**
    * Get DruidNodeDiscovery instance to discover nodes that announce given service in its metadata.
    */
-  public synchronized DruidNodeDiscovery getForService(String serviceName)
+  public DruidNodeDiscovery getForService(String serviceName)
   {
-    ServiceListener nodeDiscovery = serviceDiscoveryMap.get(serviceName);
-
-    if (nodeDiscovery == null) {
-      Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName);
-      if (nodeTypesToWatch == null) {
-        throw new IAE("Unknown service [%s].", serviceName);
-      }
+    return serviceDiscoveryMap.compute(
+        serviceName,
+        (k, v) -> {
+          if (v != null) {
+            return v;
+          }
 
-      nodeDiscovery = new ServiceListener(serviceName);
-      for (String nodeType : nodeTypesToWatch) {
-        getForNodeType(nodeType).registerListener(nodeDiscovery);
-      }
-      serviceDiscoveryMap.put(serviceName, nodeDiscovery);
-    }
+          Set<String> nodeTypesToWatch = DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName);
+          if (nodeTypesToWatch == null) {
+            throw new IAE("Unknown service [%s].", serviceName);
+          }
 
-    return nodeDiscovery;
+          ServiceDruidNodeDiscovery serviceDiscovery = new ServiceDruidNodeDiscovery(serviceName);
+          for (String nodeType : nodeTypesToWatch) {
+            getForNodeType(nodeType).registerListener(serviceDiscovery.nodeTypeListener());
+          }
+          return serviceDiscovery;
+        }
+    );
   }
 
-  private static class ServiceListener implements DruidNodeDiscovery, DruidNodeDiscovery.Listener
+  private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery
   {
+    private static final Logger log = new Logger(ServiceDruidNodeDiscovery.class);
+
     private final String service;
     private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<>();
 
     private final List<Listener> listeners = new ArrayList<>();
 
-    ServiceListener(String service)
+    private final Object lock = new Object();
+
+    private Set<NodeTypeListener> uninitializedNodeTypeListeners = new HashSet<>();
+
+    ServiceDruidNodeDiscovery(String service)
     {
       this.service = service;
     }
 
     @Override
-    public synchronized void nodeAdded(DiscoveryDruidNode node)
+    public Collection<DiscoveryDruidNode> getAllNodes()
     {
-      if (node.getServices().containsKey(service)) {
-        DiscoveryDruidNode prev = nodes.putIfAbsent(node.getDruidNode().getHostAndPortToUse(), node);
-
-        if (prev == null) {
-          for (Listener listener : listeners) {
-            listener.nodeAdded(node);
-          }
-        } else {
-          log.warn("Node[%s] discovered but already exists [%s].", node, prev);
-        }
-      } else {
-        log.warn("Node[%s] discovered but doesn't have service[%s]. Ignored.", node, service);
-      }
+      return Collections.unmodifiableCollection(nodes.values());
     }
 
     @Override
-    public synchronized void nodeRemoved(DiscoveryDruidNode node)
+    public void registerListener(Listener listener)
     {
-      DiscoveryDruidNode prev = nodes.remove(node.getDruidNode().getHostAndPortToUse());
-      if (prev != null) {
-        for (Listener listener : listeners) {
-          listener.nodeRemoved(node);
+      synchronized (lock) {
+        if (uninitializedNodeTypeListeners.isEmpty()) {
 
 Review comment:
   @himanshug could you explain what does this mean? The logic around `uninitializedNodeTypeListeners` is super convoluted. And it doesn't seem to work for Listeners that are not `NodeTypeListener`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org