You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this version.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/11/04 14:55:19 UTC

[lucene-solr] branch jira/solr-14749-api updated: SOLR-14749: Fix synchronization issues. Add Closeable to be sure that resources are closed properly. Add unit test to verify plugin reloading & event publishing.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14749-api
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14749-api by this push:
     new 9d6de70  SOLR-14749: Fix synchronization issues. Add Closeable to be sure that resources are closed properly. Add unit test to verify plugin reloading & event publishing.
9d6de70 is described below

commit 9d6de705809f4bad8f14ab1dbb3cc28831d67662
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Nov 4 15:54:00 2020 +0100

    SOLR-14749: Fix synchronization issues. Add Closeable to be sure that resources
    are closed properly. Add unit test to verify plugin reloading & event publishing.
---
 .../apache/solr/api/ContainerPluginsRegistry.java  |  12 ++-
 .../solr/cluster/events/ClusterEventListener.java  |   4 +-
 .../solr/cluster/events/ClusterEventProducer.java  |   4 +-
 .../cluster/events/ClusterEventProducerBase.java   |  32 +++---
 .../events/impl/ClusterEventProducerFactory.java   |  26 +++--
 .../impl/CollectionsRepairEventListener.java       |  70 +++++++++++--
 .../events/impl/DefaultClusterEventProducer.java   |  13 +--
 .../impl/DelegatingClusterEventProducer.java       |  15 ++-
 .../java/org/apache/solr/core/CoreContainer.java   |   3 +
 .../solr/cluster/events/AllEventsListener.java     |   5 +
 .../cluster/events/ClusterEventProducerTest.java   | 113 +++++++++++++++++----
 .../impl/CollectionsRepairEventListenerTest.java   |  20 +++-
 .../apache/solr/handler/TestContainerPlugin.java   |   7 --
 13 files changed, 251 insertions(+), 73 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
index 9154ae4..3b2e4f8 100644
--- a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
+++ b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.MapWriter;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.annotation.JsonProperty;
 import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.PathTrie;
 import org.apache.solr.common.util.ReflectMapWriter;
 import org.apache.solr.common.util.StrUtils;
@@ -65,7 +66,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
  * for additional functionality by {@link PluginRegistryListener}-s registered with
  * this class.</p>
  */
-public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter {
+public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter, Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
@@ -100,6 +101,15 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
     currentPlugins.forEach(ew.getBiConsumer());
   }
 
+  @Override
+  public void close() throws IOException {
+    currentPlugins.values().forEach(apiInfo -> {
+      if (apiInfo.instance instanceof Closeable) {
+        IOUtils.closeQuietly((Closeable) apiInfo.instance);
+      }
+    });
+  }
+
   public synchronized ApiInfo getPlugin(String name) {
     return currentPlugins.get(name);
   }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
index 6f84457..3443b37 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -16,13 +16,15 @@
  */
 package org.apache.solr.cluster.events;
 
+import java.io.Closeable;
+
 /**
  * Components that want to be notified of cluster-wide events should use this.
  *
  * XXX should this work only for ClusterSingleton-s? some types of events may be
  * XXX difficult (or pointless) to propagate to every node.
  */
-public interface ClusterEventListener {
+public interface ClusterEventListener extends Closeable {
 
   /**
    * Handle the event. Implementations should be non-blocking - if any long
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
index f87ce7c..d3b0ee7 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -18,10 +18,12 @@ package org.apache.solr.cluster.events;
 
 import org.apache.solr.cloud.ClusterSingleton;
 
+import java.io.Closeable;
+
 /**
  * Component that produces {@link ClusterEvent} instances.
  */
-public interface ClusterEventProducer extends ClusterSingleton {
+public interface ClusterEventProducer extends ClusterSingleton, Closeable {
 
   /** Unique name for the registration of a plugin-based implementation. */
   String PLUGIN_NAME = "cluster-event-producer";
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
index ecb5825..9f9d0d2 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
@@ -1,9 +1,11 @@
 package org.apache.solr.cluster.events;
 
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.Map;
@@ -11,7 +13,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- *
+ * Base class for implementing {@link ClusterEventProducer}.
  */
 public abstract class ClusterEventProducerBase implements ClusterEventProducer {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -34,7 +36,7 @@ public abstract class ClusterEventProducerBase implements ClusterEventProducer {
         log.warn("event type {} not supported yet.", type);
         continue;
       }
-      // to avoid removing no-longer empty set in unregister
+      // to avoid removing no-longer empty set on race in unregister
       synchronized (listeners) {
         listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
             .add(listener);
@@ -65,21 +67,25 @@ public abstract class ClusterEventProducerBase implements ClusterEventProducer {
     return state;
   }
 
-  public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
-    return listeners;
-  }
-
-  public CoreContainer getCoreContainer() {
-    return cc;
+  @Override
+  public void close() throws IOException {
+    synchronized (listeners) {
+      listeners.values().forEach(listenerSet ->
+          listenerSet.forEach(listener -> IOUtils.closeQuietly(listener)));
+    }
   }
 
   public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
 
   protected void fireEvent(ClusterEvent event) {
-    listeners.getOrDefault(event.getType(), Collections.emptySet())
-        .forEach(listener -> {
-          log.debug("--- firing event {} to {}", event, listener);
-          listener.onEvent(event);
-        });
+    synchronized (listeners) {
+      listeners.getOrDefault(event.getType(), Collections.emptySet())
+          .forEach(listener -> {
+            if (log.isDebugEnabled()) {
+              log.debug("--- firing event {} to {}", event, listener);
+            }
+            listener.onEvent(event);
+          });
+    }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
index 0e2bd9a..6d14817 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
@@ -26,6 +26,8 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
 
   public ClusterEventProducerFactory(CoreContainer cc) {
     super(cc);
+    // this initial listener is used only for capturing plugin registrations
+    // done by other nodes while this CoreContainer is still loading
     initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
       @Override
       public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -85,18 +87,20 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
       throw new RuntimeException("this factory can be called only once!");
     }
     final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
-    // since this is a ClusterSingleton, register it as such
+    // since this is a ClusterSingleton, register it as such, under unique name
     cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
     ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
     if (clusterEventProducerInfo != null) {
-      // the listener in ClusterSingletons already registered it
+      // the listener in ClusterSingletons already registered this instance
       clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
     } else {
       // use the default NoOp impl
     }
     // transfer those listeners that were already registered to the initial impl
     transferListeners(clusterEventProducer, plugins);
-    // install plugin registry listener
+
+    // install plugin registry listener that maintains plugin-based listeners in
+    // the event producer impl
     ContainerPluginsRegistry.PluginRegistryListener pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
       @Override
       public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -152,13 +156,15 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
   }
 
   private void transferListeners(ClusterEventProducer target, ContainerPluginsRegistry plugins) {
-    // stop capturing listener plugins
-    plugins.unregisterListener(initialPluginListener);
-    // transfer listeners that are already registered
-    listeners.forEach((type, listenersSet) -> {
-      listenersSet.forEach(listener -> target.registerListener(listener, type));
-    });
-    listeners.clear();
+    synchronized (listeners) {
+      // stop capturing listener plugins
+      plugins.unregisterListener(initialPluginListener);
+      // transfer listeners that are already registered
+      listeners.forEach((type, listenersSet) -> {
+        listenersSet.forEach(listener -> target.registerListener(listener, type));
+      });
+      listeners.clear();
+    }
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index de69fb9..48400f8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events.impl;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
@@ -26,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -41,18 +44,22 @@ import org.apache.solr.cluster.events.NodesDownEvent;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
 import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is an illustration how to re-implement the combination of 8x
- * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
- * <p>NOTE: there's no support for 'waitFor' yet.</p>
- * <p>NOTE 2: this functionality would be probably more reliable when executed also as a
+ * This is an illustration how to re-implement the combination of Solr 8x
+ * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replicas when
+ * nodes are lost.
+ * <p>The notion of <code>waitFor</code> delay between detection and repair action is
+ * implemented as a scheduled execution of the repair method, which is called every 1 sec
+ * to check whether there are any lost nodes that exceeded their <code>waitFor</code> period.</p>
+ * <p>NOTE: this functionality would be probably more reliable when executed also as a
  * periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
  */
-public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton {
+public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton, Closeable {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String PLUGIN_NAME = "collectionsRepairListener";
@@ -68,6 +75,8 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
 
   private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
 
+  private ScheduledThreadPoolExecutor waitForExecutor;
+
   public CollectionsRepairEventListener(CoreContainer cc) {
     this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
     this.solrCloudManager = cc.getZkController().getSolrCloudManager();
@@ -75,6 +84,9 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
 
   @VisibleForTesting
   public void setWaitForSecond(int waitForSecond) {
+    if (log.isDebugEnabled()) {
+      log.debug("-- setting waitFor={}", waitForSecond);
+    }
     this.waitForSecond = waitForSecond;
   }
 
@@ -108,10 +120,20 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
     // if so, remove them from the tracking map
     Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
     trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
+    // add any new lost nodes (old lost nodes are skipped)
     event.getNodeNames().forEachRemaining(lostNode -> {
       nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
     });
+  }
 
+  private void runRepair() {
+    if (nodeNameVsTimeRemoved.isEmpty()) {
+      // nothing to do
+      return;
+    }
+    if (log.isDebugEnabled()) {
+      log.debug("-- runRepair for {} lost nodes", nodeNameVsTimeRemoved.size());
+    }
     Set<String> reallyLostNodes = new HashSet<>();
     nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
       long now = solrCloudManager.getTimeSource().getTimeNs();
@@ -120,7 +142,16 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
         reallyLostNodes.add(lostNode);
       }
     });
-
+    if (reallyLostNodes.isEmpty()) {
+      if (log.isDebugEnabled()) {
+        log.debug("--- skipping repair, {} nodes are still in waitFor period", nodeNameVsTimeRemoved.size());
+      }
+      return;
+    } else {
+      if (log.isDebugEnabled()) {
+        log.debug("--- running repair for nodes that are still lost after waitFor: {}", reallyLostNodes);
+      }
+    }
     // collect all lost replicas
     // collection / positions
     Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
@@ -169,7 +200,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
       return;
     }
 
-    // remove from the tracking set
+    // remove all nodes with expired waitFor from the tracking set
     nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
 
     // send ADDREPLICA admin requests for each lost replica
@@ -191,12 +222,16 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
         log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
       }
     });
-
-    // ... and DELETERPLICA for lost ones?
   }
 
   @Override
   public void start() throws Exception {
+    state = State.STARTING;
+    waitForExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
+        new SolrNamedThreadFactory("collectionsRepair_waitFor"));
+    waitForExecutor.setRemoveOnCancelPolicy(true);
+    waitForExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    waitForExecutor.scheduleAtFixedRate(() -> runRepair(), 0, waitForSecond, TimeUnit.SECONDS);
     state = State.RUNNING;
   }
 
@@ -207,6 +242,23 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
 
   @Override
   public void stop() {
+    state = State.STOPPING;
+    waitForExecutor.shutdownNow();
+    try {
+      waitForExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      log.warn("Failed to shut down the waitFor executor - interrupted...");
+      Thread.currentThread().interrupt();
+    }
+    waitForExecutor = null;
     state = State.STOPPED;
   }
+
+  @Override
+  public void close() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("-- close() called");
+    }
+    stop();
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
index 188bfa5..e24fb40 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cluster.events.impl;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.util.Arrays;
@@ -71,9 +72,6 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
   // ClusterSingleton lifecycle methods
   @Override
   public synchronized void start() {
-    if (log.isDebugEnabled()) {
-      log.debug("-- starting DCEP", new Exception(Integer.toHexString(hashCode())));
-    }
     if (cc == null) {
       liveNodesListener = null;
       cloudCollectionsListener = null;
@@ -205,9 +203,6 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
 
   @Override
   public synchronized void stop() {
-    if (log.isDebugEnabled()) {
-      log.debug("-- stopping DCEP {}", Integer.toHexString(hashCode()));
-    }
     state = State.STOPPING;
     doStop();
     state = State.STOPPED;
@@ -227,4 +222,10 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
     cloudCollectionsListener = null;
     clusterPropertiesListener = null;
   }
+
+  @Override
+  public void close() throws IOException {
+    stop();
+    super.close();
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
index 0f31de3..8d8a972 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -21,16 +21,18 @@ import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
 import org.apache.solr.cluster.events.NoOpProducer;
 import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.core.CoreContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Set;
 
 /**
- * This implementation allows Solr to dynamically change the underlying implementation in
- * response to the changed plugin configuration.
+ * This implementation allows Solr to dynamically change the underlying implementation
+ * of {@link ClusterEventProducer} in response to the changed plugin configuration.
  */
 public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -42,6 +44,15 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
     delegate = new NoOpProducer(cc);
   }
 
+  @Override
+  public void close() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("--closing delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), delegate);
+    }
+    IOUtils.closeQuietly(delegate);
+    super.close();
+  }
+
   public void setDelegate(ClusterEventProducer newDelegate) {
     if (log.isDebugEnabled()) {
       log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 2f2652b..831f81e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1130,6 +1130,9 @@ public class CoreContainer {
       if (solrClientCache != null) {
         solrClientCache.close();
       }
+      if (containerPluginsRegistry != null) {
+        IOUtils.closeQuietly(containerPluginsRegistry);
+      }
 
     } finally {
       try {
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
index f3ebf41..873a0d5 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -19,6 +19,7 @@ package org.apache.solr.cluster.events;
 
 import org.junit.Assert;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -53,4 +54,8 @@ public class AllEventsListener implements ClusterEventListener {
       Assert.fail("Timed out waiting for expected event " + expectedType);
     }
   }
+
+  public void close() throws IOException {
+
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index 031076f..3c71823 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.util.Collections;
@@ -61,15 +62,6 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     configureCluster(3)
         .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
-    PluginMeta plugin = new PluginMeta();
-    plugin.klass = DefaultClusterEventProducer.class.getName();
-    plugin.name = ClusterEventProducer.PLUGIN_NAME;
-    V2Request req = new V2Request.Builder("/cluster/plugin")
-        .withMethod(POST)
-        .withPayload(Collections.singletonMap("add", plugin))
-        .build();
-    V2Response rsp = req.process(cluster.getSolrClient());
-    assertNotNull(rsp);
   }
 
   @Before
@@ -82,15 +74,37 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
   }
 
   @After
-  public void teardown() {
+  public void teardown() throws Exception {
     System.clearProperty("enable.packages");
     if (eventsListener != null) {
       cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+      eventsListener.events.clear();
+    }
+    V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build();
+    V2Response rsp = readPluginState.process(cluster.getSolrClient());
+    if (rsp._getStr("/plugin/" + ClusterEventProducer.PLUGIN_NAME + "/class", null) != null) {
+      V2Request req = new V2Request.Builder("/cluster/plugin")
+          .withMethod(POST)
+          .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+          .build();
+      req.process(cluster.getSolrClient());
     }
   }
 
   @Test
   public void testEvents() throws Exception {
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertEquals(0, rsp.getStatus());
 
     // NODES_DOWN
 
@@ -108,7 +122,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     String nodeName = nonOverseerJetty.getNodeName();
     cluster.stopJettySolrRunner(nonOverseerJetty);
     cluster.waitForJettyToStop(nonOverseerJetty);
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
     List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
     assertEquals("should be one NODES_DOWN event", 1, events.size());
@@ -121,7 +135,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
     JettySolrRunner newNode = cluster.startJettySolrRunner();
     cluster.waitForNode(newNode, 60);
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
     events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
     assertEquals("should be one NODES_UP event", 1, events.size());
@@ -136,7 +150,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
     cluster.getSolrClient().request(create);
     cluster.waitForActiveCollection(collection, 1, 1);
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
     events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
     assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
@@ -149,7 +163,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
     CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
     cluster.getSolrClient().request(delete);
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
     events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
     assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
@@ -159,11 +173,12 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
 
     // CLUSTER_CONFIG_CHANGED
+    eventsListener.events.clear();
     eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
     ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
     Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
     clusterProperties.setClusterProperty("ext.foo", "bar");
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
     events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
     assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
@@ -175,13 +190,14 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
         "bar", newProps.get("ext.foo"));
 
     // unset the property
+    eventsListener.events.clear();
     eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
     clusterProperties.setClusterProperty("ext.foo", null);
-    eventsListener.waitForExpectedEvent(10);
+    eventsListener.waitForExpectedEvent(30);
     assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
     events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
-    assertEquals("should be two CLUSTER_CONFIG_CHANGED events", 2, events.size());
-    event = events.get(1);
+    assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
+    event = events.get(0);
     assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
     propertiesChanged = (ClusterPropertiesChangedEvent) event;
     assertEquals("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
@@ -242,19 +258,36 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
       }
       state = State.STOPPED;
     }
+
+    @Override
+    public void close() throws IOException {
+      if (log.isDebugEnabled()) {
+        log.debug("closing {}", Integer.toHexString(hashCode()));
+      }
+    }
   }
 
   @Test
   public void testListenerPlugins() throws Exception {
     PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertEquals(0, rsp.getStatus());
+
+    plugin = new PluginMeta();
     plugin.name = "testplugin";
     plugin.klass = DummyEventListener.class.getName();
-    V2Request req = new V2Request.Builder("/cluster/plugin")
+    req = new V2Request.Builder("/cluster/plugin")
         .forceV2(true)
         .withMethod(POST)
         .withPayload(singletonMap("add", plugin))
         .build();
-    V2Response rsp = req.process(cluster.getSolrClient());
+    rsp = req.process(cluster.getSolrClient());
     //just check if the plugin is indeed registered
     V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
         .forceV2(true)
@@ -293,5 +326,45 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     now = Instant.now();
     assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
     assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
+
+    // test changing the ClusterEventProducer plugin dynamically
+
+    // remove the plugin (a NoOpProducer will be used instead)
+    req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+        .build();
+    req.process(cluster.getSolrClient());
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+    // should not receive any events now
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    await = dummyEventLatch.await(5, TimeUnit.SECONDS);
+    if (await) {
+      fail("should not receive any events but got " + lastEvent);
+    }
+    // reinstall the plugin
+    plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    rsp = req.process(cluster.getSolrClient());
+    assertEquals(0, rsp.getStatus());
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    cluster.getSolrClient().request(delete);
+    await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
+    }
+    assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
+    assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
   }
 }
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
index d672f6d..b34067e 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -29,10 +29,12 @@ import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
 import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.LogLevel;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
 /**
  *
  */
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
 public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
 
   public static class CollectionsRepairWrapperListener implements ClusterEventListener, ClusterSingleton {
@@ -49,9 +52,9 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
 
     CountDownLatch completed = new CountDownLatch(1);
 
-    CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
+    CollectionsRepairWrapperListener(CoreContainer cc, int waitFor) throws Exception {
       delegate = new CollectionsRepairEventListener(cc);
-      delegate.setWaitForSecond(0);
+      delegate.setWaitForSecond(waitFor);
     }
 
     @Override
@@ -79,12 +82,18 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
     public void stop() {
       delegate.stop();
     }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
   }
 
   private static AllEventsListener eventsListener = new AllEventsListener();
   private static CollectionsRepairWrapperListener repairListener;
 
   private static int NUM_NODES = 3;
+  private static int waitFor;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
@@ -100,10 +109,13 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
         .build();
     V2Response rsp = req.process(cluster.getSolrClient());
     assertNotNull(rsp);
+
+    waitFor = 1 + random().nextInt(9);
+
     CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
     cc.getClusterEventProducer()
         .registerListener(eventsListener, ClusterEvent.EventType.values());
-    repairListener = new CollectionsRepairWrapperListener(cc);
+    repairListener = new CollectionsRepairWrapperListener(cc, waitFor);
     cc.getClusterEventProducer()
         .registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
     repairListener.start();
@@ -140,6 +152,8 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
     eventsListener.waitForExpectedEvent(10);
     cluster.waitForActiveCollection(collection, 1, 2);
 
+    Thread.sleep(TimeUnit.MILLISECONDS.convert(waitFor, TimeUnit.SECONDS));
+
     // wait for completed processing in the repair listener
     boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
     if (!await) {
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 6f7a18a..3465a5b 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -93,13 +93,6 @@ public class TestContainerPlugin extends SolrCloudTestCase {
           .build();
       expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
 
-      //test with an invalid class
-      // XXX (ab) in order to support ClusterSingleton we allow adding
-      // plugins without Api EndPoints
-
-//      plugin.klass = C1.class.getName();
-//      expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
-
       //test with a valid class. This should succeed now
       plugin.klass = C3.class.getName();
       req.process(cluster.getSolrClient());