You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/09/14 15:46:53 UTC

[lucene-solr] 02/02: SOLR-14749: Add more types of cluster events. Improve example implementation.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 6087e0218fb6b2566028fb71e1c89b89ceb8b4f1
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Sep 14 17:45:35 2020 +0200

    SOLR-14749: Add more types of cluster events. Improve example implementation.
---
 .../src/java/org/apache/solr/cloud/Overseer.java   |   4 +-
 ...wnEvent.java => ClusterConfigChangedEvent.java} |  14 +-
 .../apache/solr/cluster/events/ClusterEvent.java   |   9 +-
 ...deDownEvent.java => CollectionsAddedEvent.java} |   8 +-
 ...DownEvent.java => CollectionsRemovedEvent.java} |   8 +-
 .../{NodeUpEvent.java => NodesDownEvent.java}      |   8 +-
 .../{NodeDownEvent.java => NodesUpEvent.java}      |   8 +-
 .../{NodeDownEvent.java => ReplicasDownEvent.java} |  10 +-
 .../events/impl/ClusterEventProducerImpl.java      | 155 ++++++++++++++++-----
 ...er.java => CollectionsRepairEventListener.java} |  24 ++--
 .../java/org/apache/solr/core/CoreContainer.java   |  10 +-
 11 files changed, 180 insertions(+), 78 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3b80371..733ddaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -782,7 +782,7 @@ public class Overseer implements SolrCloseable {
    * Start {@link ClusterSingleton} plugins when we become the leader.
    */
   private void startClusterSingletons() {
-    getCoreContainer().getContainerSingletons().forEach((name, singleton) -> {
+    getCoreContainer().getClusterSingletons().forEach((name, singleton) -> {
       try {
         singleton.start();
         if (singleton instanceof ClusterEventListener) {
@@ -798,7 +798,7 @@ public class Overseer implements SolrCloseable {
    * Stop {@link ClusterSingleton} plugins when we lose leadership.
    */
   private void stopClusterSingletons() {
-    getCoreContainer().getContainerSingletons().forEach((name, singleton) -> {
+    getCoreContainer().getClusterSingletons().forEach((name, singleton) -> {
       if (singleton instanceof ClusterEventListener) {
         getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
       }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
similarity index 80%
rename from solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
index bce20b4..e536468 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
@@ -16,22 +16,20 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.util.Map;
+
 /**
  *
  */
-public interface ReplicaDownEvent extends ClusterEvent {
+public interface ClusterConfigChangedEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.REPLICA_DOWN;
+    return EventType.CLUSTER_CONFIG_CHANGED;
   }
 
-  String getNodeName();
-
-  String getCollectionName();
+  Map<String, Object> getOldClusterConfig();
 
-  String getShardName();
+  Map<String, Object> getNewClusterConfig();
 
-  // so called coreNodeName
-  String getReplicaName();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
index ee8d955..2853f74 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -24,9 +24,12 @@ import java.time.Instant;
 public interface ClusterEvent {
 
   enum EventType {
-    NODE_DOWN,
-    NODE_UP,
-    REPLICA_DOWN,
+    NODES_DOWN,
+    NODES_UP,
+    COLLECTIONS_ADDED,
+    COLLECTIONS_REMOVED,
+    REPLICAS_DOWN,
+    CLUSTER_CONFIG_CHANGED,
     // other types? eg. Overseer leader change, shard leader change,
     // node overload (eg. CPU / MEM circuit breakers tripped)?
   }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
similarity index 83%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
index 47e6d84..f4ba87e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -16,15 +16,17 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.util.Collection;
+
 /**
  *
  */
-public interface NodeDownEvent extends ClusterEvent {
+public interface CollectionsAddedEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.NODE_DOWN;
+    return EventType.COLLECTIONS_ADDED;
   }
 
-  String getNodeName();
+  Collection<String> getCollectionNames();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
similarity index 83%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
index 47e6d84..e6a9175 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -16,15 +16,17 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.util.Collection;
+
 /**
  *
  */
-public interface NodeDownEvent extends ClusterEvent {
+public interface CollectionsRemovedEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.NODE_DOWN;
+    return EventType.COLLECTIONS_REMOVED;
   }
 
-  String getNodeName();
+  Collection<String> getCollectionNames();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
similarity index 85%
rename from solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
index 594473d..06141e3 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -16,15 +16,17 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.util.Collection;
+
 /**
  *
  */
-public interface NodeUpEvent extends ClusterEvent {
+public interface NodesDownEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.NODE_UP;
+    return EventType.NODES_DOWN;
   }
 
-  String getNodeName();
+  Collection<String> getNodeNames();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
similarity index 85%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
index 47e6d84..241b8bf 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -16,15 +16,17 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.util.Collection;
+
 /**
  *
  */
-public interface NodeDownEvent extends ClusterEvent {
+public interface NodesUpEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.NODE_DOWN;
+    return EventType.NODES_UP;
   }
 
-  String getNodeName();
+  Collection<String> getNodeNames();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
similarity index 81%
rename from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
index 47e6d84..e4dc106 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -16,15 +16,19 @@
  */
 package org.apache.solr.cluster.events;
 
+import org.apache.solr.common.cloud.Replica;
+
+import java.util.Collection;
+
 /**
  *
  */
-public interface NodeDownEvent extends ClusterEvent {
+public interface ReplicasDownEvent extends ClusterEvent {
 
   @Override
   default EventType getType() {
-    return EventType.NODE_DOWN;
+    return EventType.REPLICAS_DOWN;
   }
 
-  String getNodeName();
+  Collection<Replica> getReplicas();
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index f0ea7f1..3186b18 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -20,18 +20,25 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterConfigChangedEvent;
 import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
 import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cluster.events.NodeDownEvent;
-import org.apache.solr.cluster.events.NodeUpEvent;
+import org.apache.solr.cluster.events.CollectionsAddedEvent;
+import org.apache.solr.cluster.events.CollectionsRemovedEvent;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.NodesUpEvent;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.ClusterPropertiesListener;
 import org.apache.solr.common.cloud.LiveNodesListener;
 import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
@@ -50,14 +57,20 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
   private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
   private final CoreContainer cc;
   private LiveNodesListener liveNodesListener;
+  private CloudCollectionsListener cloudCollectionsListener;
+  private ClusterPropertiesListener clusterPropertiesListener;
+  private Map<String, Object> lastClusterProperties;
   private ZkController zkController;
   private boolean running;
 
   private final Set<ClusterEvent.EventType> supportedEvents =
-      new HashSet<>() {{
-        add(ClusterEvent.EventType.NODE_DOWN);
-        add(ClusterEvent.EventType.NODE_UP);
-      }};
+      new HashSet<>(Arrays.asList(
+          ClusterEvent.EventType.NODES_DOWN,
+          ClusterEvent.EventType.NODES_UP,
+          ClusterEvent.EventType.COLLECTIONS_ADDED,
+          ClusterEvent.EventType.COLLECTIONS_REMOVED,
+          ClusterEvent.EventType.CLUSTER_CONFIG_CHANGED
+      ));
 
   private volatile boolean isClosed = false;
 
@@ -71,6 +84,8 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
     this.zkController = this.cc.getZkController();
     if (zkController == null) {
       liveNodesListener = null;
+      cloudCollectionsListener = null;
+      clusterPropertiesListener = null;
       return;
     }
 
@@ -88,39 +103,103 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
       if (oldNodes.equals(newNodes)) {
         return false;
       }
-      oldNodes.forEach(oldNode -> {
-        if (!newNodes.contains(oldNode)) {
-          fireEvent(new NodeDownEvent() {
-            final Instant timestamp = Instant.now();
-            @Override
-            public Instant getTimestamp() {
-              return timestamp;
-            }
-
-            @Override
-            public String getNodeName() {
-              return oldNode;
-            }
-          });
+      final Instant now = Instant.now();
+      final Set<String> downNodes = new HashSet<>(oldNodes);
+      downNodes.removeAll(newNodes);
+      if (!downNodes.isEmpty()) {
+        fireEvent(new NodesDownEvent() {
+          @Override
+          public Collection<String> getNodeNames() {
+            return downNodes;
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      final Set<String> upNodes = new HashSet<>(newNodes);
+      upNodes.removeAll(oldNodes);
+      if (!upNodes.isEmpty()) {
+        fireEvent(new NodesUpEvent() {
+          @Override
+          public Collection<String> getNodeNames() {
+            return upNodes;
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      return false;
+    };
+
+    cloudCollectionsListener = ((oldCollections, newCollections) -> {
+      if (oldCollections.equals(newCollections)) {
+        return;
+      }
+      final Instant now = Instant.now();
+      final Set<String> removed = new HashSet<>(oldCollections);
+      removed.removeAll(newCollections);
+      if (!removed.isEmpty()) {
+        fireEvent(new CollectionsRemovedEvent() {
+          @Override
+          public Collection<String> getCollectionNames() {
+            return removed;
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+      final Set<String> added = new HashSet<>(newCollections);
+      added.removeAll(oldCollections);
+      if (!added.isEmpty()) {
+        fireEvent(new CollectionsAddedEvent() {
+          @Override
+          public Collection<String> getCollectionNames() {
+            return added;
+          }
+
+          @Override
+          public Instant getTimestamp() {
+            return now;
+          }
+        });
+      }
+    });
+    zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
+
+    lastClusterProperties = zkController.zkStateReader.getClusterProperties();
+    clusterPropertiesListener = (newProperties) -> {
+      if (newProperties.equals(lastClusterProperties)) {
+        return false;
+      }
+      fireEvent(new ClusterConfigChangedEvent() {
+        @Override
+        public Map<String, Object> getOldClusterConfig() {
+          return lastClusterProperties;
         }
-      });
-      newNodes.forEach(newNode -> {
-        if (!oldNodes.contains(newNode)) {
-          fireEvent(new NodeUpEvent() {
-            final Instant timestamp = Instant.now();
-            @Override
-            public Instant getTimestamp() {
-              return timestamp;
-            }
-            @Override
-            public String getNodeName() {
-              return newNode;
-            }
-          });
+
+        @Override
+        public Map<String, Object> getNewClusterConfig() {
+          return newProperties;
+        }
+
+        @Override
+        public Instant getTimestamp() {
+          return Instant.now();
         }
       });
+      lastClusterProperties = newProperties;
       return false;
     };
+    zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
 
     // XXX register collection state listener?
     // XXX not sure how to efficiently monitor for REPLICA_DOWN events
@@ -141,7 +220,15 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
     if (liveNodesListener != null) {
       zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
     }
+    if (cloudCollectionsListener != null) {
+      zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
+    }
+    if (clusterPropertiesListener != null) {
+      zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
+    }
     liveNodesListener = null;
+    cloudCollectionsListener = null;
+    clusterPropertiesListener = null;
   }
 
   private void ensureRunning() {
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
similarity index 80%
rename from solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java
rename to solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index 2fa6475..fc2ac25 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -25,6 +25,8 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.ReplicasDownEvent;
 import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,13 +36,13 @@ import org.slf4j.LoggerFactory;
  * This is an (incomplete) illustration how to re-implement the combination of 8x
  * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
  */
-public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEventListener {
+public class CollectionsRepairEventListener implements ClusterSingleton, ClusterEventListener {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private static final Set<ClusterEvent.EventType> EVENT_TYPES = new HashSet<>(
       Arrays.asList(
-          ClusterEvent.EventType.NODE_DOWN,
-          ClusterEvent.EventType.REPLICA_DOWN
+          ClusterEvent.EventType.NODES_DOWN,
+          ClusterEvent.EventType.REPLICAS_DOWN
       ));
 
   private final CoreContainer cc;
@@ -48,7 +50,7 @@ public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEv
 
   private boolean running = false;
 
-  public AutoAddReplicasEventListener(CoreContainer cc) {
+  public CollectionsRepairEventListener(CoreContainer cc) {
     this.cc = cc;
     this.solrClientCache = cc.getSolrClientCache();
   }
@@ -65,25 +67,25 @@ public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEv
       return;
     }
     switch (event.getType()) {
-      case NODE_DOWN:
-        handleNodeDown(event);
+      case NODES_DOWN:
+        handleNodesDown((NodesDownEvent) event);
         break;
-      case NODE_UP:
+      case NODES_UP:
         // ignore? rebalance replicas?
         break;
-      case REPLICA_DOWN:
-        handleReplicaDown(event);
+      case REPLICAS_DOWN:
+        handleReplicasDown((ReplicasDownEvent) event);
         break;
       default:
         log.warn("Unsupported event {}, ignoring...", event);
     }
   }
 
-  private void handleNodeDown(ClusterEvent event) {
+  private void handleNodesDown(NodesDownEvent event) {
     // send MOVEREPLICA admin requests for all replicas from that node
   }
 
-  private void handleReplicaDown(ClusterEvent event) {
+  private void handleReplicasDown(ReplicasDownEvent event) {
     // send ADDREPLICA admin request
   }
 
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index ba6cb6c..54c113d 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -177,7 +177,7 @@ public class CoreContainer {
   public final Supplier<SolrZkClient> zkClientSupplier = () -> getZkController().getZkClient();
 
   private final CustomContainerPlugins customContainerPlugins =  new CustomContainerPlugins(this, containerHandlers.getApiBag());
-  private final Map<String, ClusterSingleton> containerSingletons = new HashMap<>();
+  private final Map<String, ClusterSingleton> clusterSingletons = new ConcurrentHashMap<>();
 
   protected final Map<String, CoreLoadFailure> coreInitFailures = new ConcurrentHashMap<>();
 
@@ -899,11 +899,11 @@ public class CoreContainer {
       containerHandlers.keySet().forEach(handlerName -> {
         SolrRequestHandler handler = containerHandlers.get(handlerName);
         if (handler instanceof ClusterSingleton) {
-          containerSingletons.put(handlerName, (ClusterSingleton) handler);
+          clusterSingletons.put(handlerName, (ClusterSingleton) handler);
         }
       });
       // our default clusterEventProducer is also a ClusterSingleton
-      containerSingletons.put("clusterEventProducer", (ClusterSingleton) clusterEventProducer);
+      clusterSingletons.put("clusterEventProducer", (ClusterSingleton) clusterEventProducer);
       zkSys.getZkController().checkOverseerDesignate();
     }
     // This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
@@ -2107,8 +2107,8 @@ public class CoreContainer {
     return customContainerPlugins;
   }
 
-  public Map<String, ClusterSingleton> getContainerSingletons() {
-    return Collections.unmodifiableMap(containerSingletons);
+  public Map<String, ClusterSingleton> getClusterSingletons() {
+    return clusterSingletons;
   }
 
   public ClusterEventProducer getClusterEventProducer() {