You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/10/01 06:06:12 UTC

[lucene-solr] 02/02: SOLR-14749: Fix the race between plugin loading/registration and Overseer leader election. Make cluster events implement MapWriter. Add more unit tests.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit a8658cef6cf14bb2c7ee667801b5241e4e875467
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Oct 1 08:04:08 2020 +0200

    SOLR-14749: Fix the race between plugin loading/registration and
    Overseer leader election. Make cluster events implement MapWriter.
    Add more unit tests.
---
 .../src/java/org/apache/solr/api/AnnotatedApi.java |  15 ++-
 .../apache/solr/api/CustomContainerPlugins.java    |  11 +-
 .../src/java/org/apache/solr/cloud/Overseer.java   |  55 ++++++---
 .../apache/solr/cluster/events/ClusterEvent.java   |  10 +-
 .../events/ClusterPropertiesChangedEvent.java      |   6 +
 .../solr/cluster/events/CollectionsAddedEvent.java |   7 ++
 .../cluster/events/CollectionsRemovedEvent.java    |   7 ++
 .../apache/solr/cluster/events/NodesDownEvent.java |   7 ++
 .../apache/solr/cluster/events/NodesUpEvent.java   |   7 ++
 .../solr/cluster/events/ReplicasDownEvent.java     |   7 ++
 .../events/impl/ClusterEventProducerImpl.java      |  21 +---
 .../impl/CollectionsRepairEventListener.java       |   3 +-
 .../java/org/apache/solr/core/CoreContainer.java   |  95 ++++++++++++---
 .../solr/handler/admin/ContainerPluginsApi.java    |   6 +-
 .../test/org/apache/solr/cloud/OverseerTest.java   |  24 ++--
 .../solr/cluster/events/AllEventsListener.java     |   3 -
 .../cluster/events/ClusterEventProducerTest.java   | 127 ++++++++++++++++++++-
 .../apache/solr/handler/TestContainerPlugin.java   |   4 +-
 18 files changed, 335 insertions(+), 80 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
index fd77413..1558f1c 100644
--- a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
+++ b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
@@ -85,9 +85,18 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
   }
 
   public static List<Api> getApis(Object obj) {
-    return getApis(obj.getClass(), obj);
+    return getApis(obj.getClass(), obj, true);
   }
-  public static List<Api> getApis(Class<? extends Object> theClass , Object obj)  {
+
+  /**
+   * Get a list of Api-s supported by this class.
+   * @param theClass class
+   * @param obj object of this class (may be null)
+   * @param required if true then an exception is thrown if no Api-s can be retrieved, if false
+   *                then absence of Api-s is silently ignored.
+   * @return list of discovered Api-s
+   */
+  public static List<Api> getApis(Class<? extends Object> theClass , Object obj, boolean required)  {
     Class<?> klas = null;
     try {
       klas = MethodHandles.publicLookup().accessClass(theClass);
@@ -122,7 +131,7 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
         SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
         apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd), null));
       }
-      if (apis.isEmpty()) {
+      if (required && apis.isEmpty()) {
         throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
       }
 
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index c6bfca6..ec3e4e3 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -180,9 +180,10 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
       Object instance = newApiInfo.getInstance();
       if (instance instanceof ClusterSingleton) {
         ClusterSingleton singleton = (ClusterSingleton) instance;
-        coreContainer.getClusterSingletons().put(singleton.getName(), singleton);
+        coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
         // easy check to see if we should immediately start this singleton
-        if (coreContainer.getClusterEventProducer().isRunning()) {
+        if (coreContainer.getClusterEventProducer() != null &&
+            coreContainer.getClusterEventProducer().isRunning()) {
           try {
             singleton.start();
           } catch (Exception exc) {
@@ -206,7 +207,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
       if (instance instanceof ClusterSingleton) {
         ClusterSingleton singleton = (ClusterSingleton) instance;
         singleton.stop();
-        coreContainer.getClusterSingletons().remove(singleton.getName());
+        coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName());
       }
       if (instance instanceof ClusterEventListener) {
         coreContainer.getClusterEventProducer().unregisterListener((ClusterEventListener) instance);
@@ -322,7 +323,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
       }
 
       try {
-        List<Api> apis = AnnotatedApi.getApis(klas, null);
+        List<Api> apis = AnnotatedApi.getApis(klas, null, false);
         for (Object api : apis) {
           EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
           if (endPoint.path().length > 1 || endPoint.method().length > 1) {
@@ -374,7 +375,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
         }
       }
       this.holders = new ArrayList<>();
-      for (Api api : AnnotatedApi.getApis(instance)) {
+      for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
         holders.add(new ApiHolder((AnnotatedApi) api));
       }
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index b8403f4..2465f8a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 
 import org.apache.lucene.util.Version;
@@ -782,34 +784,48 @@ public class Overseer implements SolrCloseable {
    * Start {@link ClusterSingleton} plugins when we become the leader.
    */
   public void startClusterSingletons() {
-    Map<String, ClusterSingleton> singletons = getCoreContainer().getClusterSingletons();
-    if (singletons == null) {
-      return;
-    }
-    if (isClosed()) {
-      return;
-    }
-    singletons.forEach((name, singleton) -> {
+    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+    final Runnable initializer = () -> {
+      if (isClosed()) {
+        return;
+      }
       try {
-        singleton.start();
-        if (singleton instanceof ClusterEventListener) {
-          getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
-        }
-      } catch (Exception e) {
-        log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+        singletons.waitUntilReady(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Interrupted initialization of ClusterSingleton-s");
+        return;
+      } catch (TimeoutException te) {
+        log.warn("Timed out during initialization of ClusterSingleton-s");
+        return;
       }
-    });
+      singletons.getSingletons().forEach((name, singleton) -> {
+        try {
+          singleton.start();
+          if (singleton instanceof ClusterEventListener) {
+            getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
+          }
+        } catch (Exception e) {
+          log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+        }
+      });
+    };
+    if (singletons.isReady()) {
+      // not ready yet (isReady() is true while the latch is pending): wait asynchronously for the first startup
+      getCoreContainer().runAsync(initializer);
+    } else {
+      initializer.run();
+    }
   }
 
   /**
    * Stop {@link ClusterSingleton} plugins when we lose leadership.
    */
   private void stopClusterSingletons() {
-    Map<String, ClusterSingleton> singletons = getCoreContainer().getClusterSingletons();
+    CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
     if (singletons == null) {
       return;
     }
-    singletons.forEach((name, singleton) -> {
+    singletons.getSingletons().forEach((name, singleton) -> {
       if (singleton instanceof ClusterEventListener) {
         getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
       }
@@ -856,10 +872,13 @@ public class Overseer implements SolrCloseable {
     if (this.id != null) {
       log.info("Overseer (id={}) closing", id);
     }
+    // stop singletons only on the leader
+    if (!this.closed) {
+      stopClusterSingletons();
+    }
     this.closed = true;
     doClose();
 
-    stopClusterSingletons();
 
     assert ObjectReleaseTracker.release(this);
   }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
index 1929f0c..2dc7a32 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -16,12 +16,15 @@
  */
 package org.apache.solr.cluster.events;
 
+import org.apache.solr.common.MapWriter;
+
+import java.io.IOException;
 import java.time.Instant;
 
 /**
  * Cluster-level event.
  */
-public interface ClusterEvent {
+public interface ClusterEvent extends MapWriter {
 
   enum EventType {
     /** One or more nodes went down. */
@@ -46,4 +49,9 @@ public interface ClusterEvent {
   /** Get event timestamp. This is the instant when the event was generated (not necessarily when
    * the underlying condition first occurred). */
   Instant getTimestamp();
+
+  default void writeMap(EntryWriter ew) throws IOException {
+    ew.put("type", getType());
+    ew.put("timestamp", getTimestamp().toEpochMilli());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
index ee513d8..ad9c0b8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -30,4 +31,9 @@ public interface ClusterPropertiesChangedEvent extends ClusterEvent {
 
   Map<String, Object> getNewClusterProperties();
 
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("newClusterProperties", getNewClusterProperties());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
index 0b1c46b..78046f8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -29,4 +30,10 @@ public interface CollectionsAddedEvent extends ClusterEvent {
   }
 
   Iterator<String> getCollectionNames();
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("collectionNames", getCollectionNames());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
index e6fc64e..a93be4c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -29,4 +30,10 @@ public interface CollectionsRemovedEvent extends ClusterEvent {
   }
 
   Iterator<String> getCollectionNames();
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("collectionNames", getCollectionNames());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
index a8e7a2e..5001ccb 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -29,4 +30,10 @@ public interface NodesDownEvent extends ClusterEvent {
   }
 
   Iterator<String> getNodeNames();
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("nodeNames", getNodeNames());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
index f83bf91..fa08f85 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -29,4 +30,10 @@ public interface NodesUpEvent extends ClusterEvent {
   }
 
   Iterator<String> getNodeNames();
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("nodeNames", getNodeNames());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
index 69ec48c..1d3ce9b 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -18,6 +18,7 @@ package org.apache.solr.cluster.events;
 
 import org.apache.solr.common.cloud.Replica;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 /**
@@ -31,4 +32,10 @@ public interface ReplicasDownEvent extends ClusterEvent {
   }
 
   Iterator<Replica> getReplicas();
+
+  @Override
+  default void writeMap(EntryWriter ew) throws IOException {
+    ClusterEvent.super.writeMap(ew);
+    ew.put("replicas", getReplicas());
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index 5886d67..034fa8a 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -16,16 +16,12 @@
  */
 package org.apache.solr.cluster.events.impl;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory;
  * (not in parallel) and in arbitrary order. This means that if any listener blocks the
  * processing other listeners may be invoked much later or not at all.</p>
  */
-public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton, Closeable {
+public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
@@ -62,7 +58,7 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
   private CloudCollectionsListener cloudCollectionsListener;
   private ClusterPropertiesListener clusterPropertiesListener;
   private ZkController zkController;
-  private boolean running;
+  private volatile boolean running;
 
   private final Set<ClusterEvent.EventType> supportedEvents =
       new HashSet<>(Arrays.asList(
@@ -73,8 +69,6 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
           ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
       ));
 
-  private volatile boolean isClosed = false;
-
   public ClusterEventProducerImpl(CoreContainer coreContainer) {
     this.coreContainer = coreContainer;
   }
@@ -96,7 +90,7 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
     // register liveNodesListener
     liveNodesListener = (oldNodes, newNodes) -> {
       // already closed but still registered
-      if (isClosed) {
+      if (!running) {
         // remove the listener
         return true;
       }
@@ -196,6 +190,8 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
 
     // XXX register collection state listener?
     // XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+    running = true;
   }
 
   @Override
@@ -239,13 +235,6 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
   }
 
   @Override
-  public void close() throws IOException {
-    stop();
-    isClosed = true;
-    listeners.clear();
-  }
-
-  @Override
   public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
     return listeners;
   }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index 88df58f..42dcde3 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -19,7 +19,6 @@ package org.apache.solr.cluster.events.impl;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -60,7 +59,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener {
   private final SolrClient solrClient;
   private final SolrCloudManager solrCloudManager;
 
-  private boolean running = false;
+  private volatile boolean running = false;
 
   public CollectionsRepairEventListener(CoreContainer cc) {
     this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f8ec18a..1427dad 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,9 +39,12 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -69,9 +72,10 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
 import org.apache.solr.cloud.CloudDescriptor;
 import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cloud.Overseer;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
 import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
 import org.apache.solr.common.AlreadyClosedException;
@@ -172,6 +176,72 @@ public class CoreContainer {
     }
   }
 
+  public static class ClusterSingletons {
+    private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+    // we use this latch to delay the initial startup of singletons, due to
+    // the leader election occurring in parallel with the rest of the load() method.
+    private CountDownLatch readyLatch = new CountDownLatch(1);
+
+    public Map<String, ClusterSingleton> getSingletons() {
+      return singletonMap;
+    }
+
+    public boolean isReady() {
+      return readyLatch.getCount() > 0;
+    }
+
+    public void setReady() {
+      readyLatch.countDown();
+    }
+
+    public void waitUntilReady(long timeout, TimeUnit timeUnit)
+        throws InterruptedException, TimeoutException {
+      boolean await = readyLatch.await(timeout, timeUnit);
+      if (!await) {
+        throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+      }
+    }
+  }
+
+  /**
+   * This class helps in handling the initial registration of plugin-based listeners,
+   * when both the final {@link ClusterEventProducer} implementation and listeners
+   * are configured using plugins.
+   */
+  public static class InitialClusterEventProducer implements ClusterEventProducer {
+    Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+
+    @Override
+    public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+      return initialListeners;
+    }
+
+    public void transferListeners(ClusterEventProducer target) {
+      initialListeners.forEach((type, listeners) -> {
+        listeners.forEach(listener -> {
+          try {
+            target.registerListener(listener, type);
+          } catch (Exception e) {
+            log.warn("Unable to register event listener for type {}: {}", type, e);
+          }
+        });
+      });
+    }
+
+    @Override
+    public void start() throws Exception {
+    }
+
+    @Override
+    public boolean isRunning() {
+      return false;
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
+
   private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
 
   /**
@@ -247,12 +317,11 @@ public class CoreContainer {
 
   private volatile SolrClientCache solrClientCache;
 
-  private volatile ClusterEventProducer clusterEventProducer;
+  private volatile ClusterEventProducer clusterEventProducer = new InitialClusterEventProducer();
 
   private final ObjectCache objectCache = new ObjectCache();
 
-  private final Map<String, ClusterSingleton> clusterSingletons = new ConcurrentHashMap<>();
-
+  private final ClusterSingletons clusterSingletons = new ClusterSingletons();
   private PackageStoreAPI packageStoreAPI;
   private PackageLoader packageLoader;
 
@@ -903,30 +972,28 @@ public class CoreContainer {
 
       // init ClusterSingleton-s
 
-      // register handlers that are also ClusterSingleton
+      // register the handlers that are also ClusterSingleton
       containerHandlers.keySet().forEach(handlerName -> {
         SolrRequestHandler handler = containerHandlers.get(handlerName);
         if (handler instanceof ClusterSingleton) {
-          clusterSingletons.put(handlerName, (ClusterSingleton) handler);
+          clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
         }
       });
       // create the ClusterEventProducer
+      InitialClusterEventProducer initialClusterEventProducer = (InitialClusterEventProducer) clusterEventProducer;
       CustomContainerPlugins.ApiInfo clusterEventProducerInfo = customContainerPlugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
       if (clusterEventProducerInfo != null) {
         clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
       } else {
         clusterEventProducer = new ClusterEventProducerImpl(this);
-        clusterSingletons.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+        clusterSingletons.singletonMap.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
       }
+      // transfer those listeners that were already registered to the initial impl
+      initialClusterEventProducer.transferListeners(clusterEventProducer);
 
+      clusterSingletons.setReady();
       zkSys.getZkController().checkOverseerDesignate();
 
-      // XXX note that ClusterSingleton components are registered too late -
-      // XXX the Overseer leader may be already started
-      Overseer overseer = zkSys.getZkController().getOverseer();
-      if (!overseer.isClosed()) { // we are the leader
-        overseer.startClusterSingletons();
-      }
     }
     // This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
     status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2124,7 +2191,7 @@ public class CoreContainer {
     return customContainerPlugins;
   }
 
-  public Map<String, ClusterSingleton> getClusterSingletons() {
+  public ClusterSingletons getClusterSingletons() {
     return clusterSingletons;
   }
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 9f2f459..ad95423 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -51,7 +51,7 @@ import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
 public class ContainerPluginsApi {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  public static final String PLUGINS = "plugins";
+  public static final String PLUGINS = "plugin";
   private final Supplier<SolrZkClient> zkClientSupplier;
   private final CoreContainer coreContainer;
   public final Read readAPI = new Read();
@@ -64,7 +64,7 @@ public class ContainerPluginsApi {
 
   public class Read {
     @EndPoint(method = METHOD.GET,
-        path = "/cluster/plugins",
+        path = "/cluster/plugin",
         permission = PermissionNameProvider.Name.COLL_READ_PERM)
     public void list(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
       rsp.add(PLUGINS, plugins(zkClientSupplier));
@@ -72,7 +72,7 @@ public class ContainerPluginsApi {
   }
 
   @EndPoint(method = METHOD.POST,
-      path = "/cluster/plugins",
+      path = "/cluster/plugin",
       permission = PermissionNameProvider.Name.COLL_EDIT_PERM)
   public class Edit {
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index cf30ca9..f3e73e2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -17,13 +17,7 @@
 package org.apache.solr.cloud;
 
 import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
@@ -118,7 +112,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
 
   private static SolrZkClient zkClient;
 
-
   private volatile boolean testDone = false;
 
   private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
@@ -128,7 +121,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
   private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
   private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
   private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());
-
   private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
 
   public static class MockZKController{
@@ -307,6 +299,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
   @Before
   public void setUp() throws Exception {
     testDone = false;
+
     super.setUp();
   }
 
@@ -323,6 +316,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
     }
 
     server = null;
+
   }
 
   @After
@@ -1429,7 +1423,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
         Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
     when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone);  // Allow retry on session expiry
     when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
-    FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("containerSingletons"), Collections.emptyMap());
+    CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+    // don't wait for all singletons
+    singletons.setReady();
+    FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);
     ClusterEventProducerImpl clusterEventProducer = new ClusterEventProducerImpl(mockAlwaysUpCoreContainer);
     when(mockAlwaysUpCoreContainer.getClusterEventProducer()).thenReturn(clusterEventProducer);
     FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
@@ -1437,6 +1434,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
     when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
     when(zkController.getZkClient()).thenReturn(zkClient);
     when(zkController.getZkStateReader()).thenReturn(reader);
+    // primitive support for CC.runAsync
+    doAnswer(invocable -> {
+      Runnable r = invocable.getArgument(0);
+      Thread t = new Thread(r);
+      t.start();
+      return null;
+    }).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
 
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
     when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
index 9d03c8d..8da6bea 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -20,12 +20,9 @@ package org.apache.solr.cluster.events;
 import org.junit.Assert;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index 12079b2..ea6c5f5 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -19,39 +19,64 @@ package org.apache.solr.cluster.events;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.util.Utils;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
 
 /**
  *
  */
 public class ClusterEventProducerTest extends SolrCloudTestCase {
 
-  private static AllEventsListener eventsListener = new AllEventsListener();
+  private AllEventsListener eventsListener;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(3)
         .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
-    cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
   }
 
   @Before
   public void setUp() throws Exception  {
+    System.setProperty("enable.packages", "true"); // cleared again in teardown()
     super.setUp();
     cluster.deleteAllCollections();
+    eventsListener = new AllEventsListener(); // fresh listener instance for every test method
+    cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener); // unregistered in teardown()
+  }
+
+  @After
+  public void teardown() {
+    System.clearProperty("enable.packages"); // undo the property set in setUp()
+    if (eventsListener != null) { // still null when setUp() failed before creating the listener
+      cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+    }
   }
 
   @Test
-  public void testNodesEvent() throws Exception {
+  public void testEvents() throws Exception {
 
     // NODES_DOWN
 
@@ -149,4 +174,100 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
         null, propertiesChanged.getNewClusterProperties().get("ext.foo"));
 
   }
+
+  private static volatile CountDownLatch dummyEventLatch = new CountDownLatch(1); // volatile: re-armed by the test, counted down by the listener
+  private static volatile ClusterEvent lastEvent = null; // volatile: written by the listener, read and reset by the test
+
+  /** Test listener that, while running, records COLLECTIONS_ADDED / COLLECTIONS_REMOVED events in the enclosing test's static state. */
+  public static class DummyEventListener implements ClusterEventListener {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    volatile boolean running = false; // volatile: start()/stop() and onEvent() may be called from different threads
+    @Override
+    public void onEvent(ClusterEvent event) {
+      if (!running) {
+        log.debug("skipped event, not running: {}", event);
+        return;
+      }
+      if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED || event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
+        log.debug("recorded event {}", Utils.toJSONString(event));
+        lastEvent = event; // store the event before releasing the waiting test thread
+        dummyEventLatch.countDown();
+      } else {
+        log.debug("skipped event, wrong type: {}", event.getType());
+      }
+    }
+
+    @Override
+    public String getName() {
+      return "dummy";
+    }
+
+    @Override
+    public void start() throws Exception {
+      log.debug("starting {}", Integer.toHexString(hashCode()));
+      running = true;
+    }
+
+    @Override
+    public boolean isRunning() {
+      return running;
+    }
+
+    @Override
+    public void stop() {
+      log.debug("stopping {}", Integer.toHexString(hashCode()));
+      running = false;
+    }
+  }
+
+  /**
+   * Registers {@link DummyEventListener} through the /cluster/plugin API, then creates and deletes a
+   * collection and verifies that the listener saw the COLLECTIONS_ADDED and COLLECTIONS_REMOVED events.
+   */
+  @Test
+  public void testListenerPlugins() throws Exception {
+    dummyEventLatch = new CountDownLatch(1); // static state: re-arm it so a previous run cannot satisfy this one
+    lastEvent = null;
+    PluginMeta plugin = new PluginMeta();
+    plugin.name = "testplugin";
+    plugin.klass = DummyEventListener.class.getName();
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(POST)
+        .withPayload(singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    // read the plugin configuration back to check that the plugin is indeed registered
+    V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build();
+    rsp = readPluginState.process(cluster.getSolrClient());
+    assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
+
+    String collection = "testListenerPlugins_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    assertTrue("Timed out waiting for COLLECTIONS_ADDED event, " + collection, dummyEventLatch.await(30, TimeUnit.SECONDS));
+    assertNotNull("lastEvent should be COLLECTIONS_ADDED", lastEvent);
+    assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, lastEvent.getType());
+    // verify timestamp: it must not lie after "now"; an equal timestamp is fine on a coarse clock
+    Instant now = Instant.now();
+    assertTrue("timestamp of the event is in the future", !lastEvent.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsAddedEvent)lastEvent).getCollectionNames().next());
+
+    dummyEventLatch = new CountDownLatch(1); // re-arm before triggering the next event
+    lastEvent = null;
+
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    assertTrue("Timed out waiting for COLLECTIONS_REMOVED event, " + collection, dummyEventLatch.await(30, TimeUnit.SECONDS));
+    assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
+    assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
+    // verify timestamp
+    now = Instant.now();
+    assertTrue("timestamp of the event is in the future", !lastEvent.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 4c37c17..bd9bf7c 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -92,8 +92,8 @@ public class TestContainerPlugin extends SolrCloudTestCase {
       expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
 
       //test with an invalid class
-      plugin.klass = C1.class.getName();
-      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
+//      plugin.klass = C1.class.getName();
+//      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
 
       //test with a valid class. This should succeed now
       plugin.klass = C3.class.getName();