You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/11/03 19:49:34 UTC

[lucene-solr] 03/03: SOLR-14749: Refactoring. Allow to set new ClusterEventProducer plugin impl dynamically. Implement "waitFor" in the CollectionsRepairEventListener.

This is an automated email from the ASF dual-hosted git repository.

ab pushed a commit to branch jira/solr-14749-api
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 801425d76e37e30ba93cf3c0f778b1eee3c6f5cc
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Nov 3 20:47:54 2020 +0100

    SOLR-14749: Refactoring. Allow to set new ClusterEventProducer plugin impl
    dynamically. Implement "waitFor" in the CollectionsRepairEventListener.
---
 .../cluster/events/ClusterEventProducerBase.java   |  85 ++++++++++++++
 .../apache/solr/cluster/events/NoOpProducer.java   |  35 +++---
 .../events/impl}/ClusterEventProducerFactory.java  |  92 +++++++--------
 .../impl/CollectionsRepairEventListener.java       |  41 ++++++-
 .../events/impl/DefaultClusterEventProducer.java   |  82 ++++---------
 .../impl/DelegatingClusterEventProducer.java       | 130 +++++++++++++++++++++
 .../java/org/apache/solr/core/CoreContainer.java   |   1 +
 .../cluster/events/ClusterEventProducerTest.java   |  13 +++
 .../impl/CollectionsRepairEventListenerTest.java   |  17 +++
 9 files changed, 365 insertions(+), 131 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
new file mode 100644
index 0000000..ecb5825
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
@@ -0,0 +1,85 @@
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base {@link ClusterEventProducer}: keeps listeners per event type plus a lifecycle state, and dispatches events to the listeners of the event's type. Subclasses declare the accepted types via {@link #getSupportedEventTypes()}.
+ */
+public abstract class ClusterEventProducerBase implements ClusterEventProducer {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new ConcurrentHashMap<>(); // event type -> listeners registered for it
+  protected volatile State state = State.STOPPED; // returned by getState(); this class itself never changes it after initialization
+  protected final CoreContainer cc;
+
+  protected ClusterEventProducerBase(CoreContainer cc) {
+    this.cc = cc;
+  }
+
+  @Override
+  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    if (eventTypes == null || eventTypes.length == 0) { // no types given: register for every event type
+      eventTypes = ClusterEvent.EventType.values();
+    }
+    for (ClusterEvent.EventType type : eventTypes) {
+      if (!getSupportedEventTypes().contains(type)) { // unsupported types are logged and skipped, not rejected
+        log.warn("event type {} not supported yet.", type);
+        continue;
+      }
+      // same lock as unregisterListener: keeps it from removing a per-type set that has just stopped being empty
+      synchronized (listeners) {
+        listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
+            .add(listener);
+      }
+    }
+  }
+
+  @Override
+  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    if (eventTypes == null || eventTypes.length == 0) { // no types given: unregister from every event type
+      eventTypes = ClusterEvent.EventType.values();
+    }
+    synchronized (listeners) {
+      for (ClusterEvent.EventType type : eventTypes) {
+        Set<ClusterEventListener> perType = listeners.get(type);
+        if (perType != null) {
+          perType.remove(listener);
+          if (perType.isEmpty()) { // last listener for this type is gone: drop the map entry
+            listeners.remove(type);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+    return listeners; // the internal map itself, not a copy
+  }
+
+  public CoreContainer getCoreContainer() {
+    return cc;
+  }
+
+  public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
+
+  protected void fireEvent(ClusterEvent event) {
+    listeners.getOrDefault(event.getType(), Collections.emptySet()) // no listeners for this type: nothing is invoked
+        .forEach(listener -> {
+          log.debug("--- firing event {} to {}", event, listener);
+          listener.onEvent(event); // not wrapped in try/catch: an exception thrown here propagates out of fireEvent
+        });
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
index 8632895..16db727 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
@@ -16,39 +16,36 @@
  */
 package org.apache.solr.cluster.events;
 
+import org.apache.solr.core.CoreContainer;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
- * No-op implementation of {@link ClusterEventProducer}. This implementation is always in
- * RUNNING state.
+ * No-op implementation of {@link ClusterEventProducer}. This implementation doesn't
+ * generate any events.
  */
-public final class NoOpProducer implements ClusterEventProducer {
+public final class NoOpProducer extends ClusterEventProducerBase {
 
-  @Override
-  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    // no-op
-  }
+  public static final Set<ClusterEvent.EventType> ALL_EVENT_TYPES = new HashSet<>(Arrays.asList(ClusterEvent.EventType.values()));
 
-  @Override
-  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    // no-op
+  public NoOpProducer(CoreContainer cc) {
+    super(cc);
   }
 
   @Override
-  public String getName() {
-    return ClusterEventProducer.PLUGIN_NAME;
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return ALL_EVENT_TYPES;
   }
 
   @Override
   public void start() throws Exception {
-    // no-op
-  }
-
-  @Override
-  public State getState() {
-    return State.RUNNING;
+    state = State.RUNNING;
   }
 
   @Override
   public void stop() {
-    // no-op
+    state = State.STOPPED;
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
similarity index 65%
rename from solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java
rename to solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
index 7142cba..0e2bd9a 100644
--- a/solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
@@ -1,14 +1,16 @@
-package org.apache.solr.core;
+package org.apache.solr.cluster.events.impl;
 
 import org.apache.solr.api.ContainerPluginsRegistry;
 import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
 import java.util.Set;
 
 /**
@@ -16,14 +18,14 @@ import java.util.Set;
  * when both the final {@link ClusterEventProducer} implementation and listeners
  * are configured using plugins.
  */
-public class ClusterEventProducerFactory implements ClusterEventProducer {
-  private Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+public class ClusterEventProducerFactory extends ClusterEventProducerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private ContainerPluginsRegistry.PluginRegistryListener initialPluginListener;
-  private final CoreContainer cc;
   private boolean created = false;
 
   public ClusterEventProducerFactory(CoreContainer cc) {
-    this.cc = cc;
+    super(cc);
     initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
       @Override
       public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -55,6 +57,11 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
     };
   }
 
+  @Override
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return NoOpProducer.ALL_EVENT_TYPES;
+  }
+
   /**
    * This method returns an initial plugin registry listener that helps to capture the
    * freshly loaded listener plugins before the final cluster event producer is created.
@@ -73,20 +80,19 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
    * @param plugins current plugin configurations
    * @return configured instance of cluster event producer (with side-effects, see above)
    */
-  public ClusterEventProducer create(ContainerPluginsRegistry plugins) {
+  public DelegatingClusterEventProducer create(ContainerPluginsRegistry plugins) {
     if (created) {
       throw new RuntimeException("this factory can be called only once!");
     }
-    final ClusterEventProducer clusterEventProducer;
+    final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
+    // since this is a ClusterSingleton, register it as such
+    cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
     ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
     if (clusterEventProducerInfo != null) {
       // the listener in ClusterSingletons already registered it
-      clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
+      clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
     } else {
-      // create the default impl
-      clusterEventProducer = new DefaultClusterEventProducer(cc);
-      // since this is a ClusterSingleton, register it as such
-      cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+      // use the default NoOp impl
     }
     // transfer those listeners that were already registered to the initial impl
     transferListeners(clusterEventProducer, plugins);
@@ -101,6 +107,15 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
         if (instance instanceof ClusterEventListener) {
           ClusterEventListener listener = (ClusterEventListener) instance;
           clusterEventProducer.registerListener(listener);
+        } else if (instance instanceof ClusterEventProducer) {
+          // replace the existing impl
+          if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+            ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+                .setDelegate((ClusterEventProducer) instance);
+          } else {
+            log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+                " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+          }
         }
       }
 
@@ -113,6 +128,15 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
         if (instance instanceof ClusterEventListener) {
           ClusterEventListener listener = (ClusterEventListener) instance;
           clusterEventProducer.unregisterListener(listener);
+        } else if (instance instanceof ClusterEventProducer) {
+          // replace the existing impl with NoOp
+          if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+            ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+                .setDelegate(new NoOpProducer(cc));
+          } else {
+            log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+                " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+          }
         }
       }
 
@@ -131,45 +155,19 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
     // stop capturing listener plugins
     plugins.unregisterListener(initialPluginListener);
     // transfer listeners that are already registered
-    initialListeners.forEach((type, listeners) -> {
-      listeners.forEach(listener -> target.registerListener(listener, type));
+    listeners.forEach((type, listenersSet) -> {
+      listenersSet.forEach(listener -> target.registerListener(listener, type));
     });
-    initialListeners.clear();
-    initialListeners = null;
-  }
-
-  // ClusterEventProducer API, parts needed to register initial listeners.
-
-  @Override
-  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    if (eventTypes == null || eventTypes.length == 0) {
-      eventTypes = ClusterEvent.EventType.values();
-    }
-    for (ClusterEvent.EventType type : eventTypes) {
-      initialListeners.computeIfAbsent(type, t -> new HashSet<>())
-          .add(listener);
-    }
-  }
-
-  @Override
-  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    throw new UnsupportedOperationException("unregister listener not implemented");
+    listeners.clear();
   }
 
   @Override
   public void start() throws Exception {
-    throw new UnsupportedOperationException("start not implemented");
-  }
-
-  @Override
-  public State getState() {
-    return State.STOPPED;
+    state = State.RUNNING;
   }
 
   @Override
   public void stop() {
-    throw new UnsupportedOperationException("stop not implemented");
+    state = State.STOPPED;
   }
-
-
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index a5fa14f..de69fb9 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -25,8 +25,11 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -53,6 +56,8 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   public static final String PLUGIN_NAME = "collectionsRepairListener";
+  public static final int DEFAULT_WAIT_FOR_SEC = 30;
+
   private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
   private static final AtomicInteger counter = new AtomicInteger();
 
@@ -61,11 +66,18 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
 
   private State state = State.STOPPED;
 
+  private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
+
   public CollectionsRepairEventListener(CoreContainer cc) {
     this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
     this.solrCloudManager = cc.getZkController().getSolrCloudManager();
   }
 
+  @VisibleForTesting
+  public void setWaitForSecond(int waitForSecond) {
+    this.waitForSecond = waitForSecond;
+  }
+
   @Override
   public String getName() {
     return PLUGIN_NAME;
@@ -86,19 +98,39 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
     }
   }
 
+  private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+
   private void handleNodesDown(NodesDownEvent event) {
+
+    // tracking for the purpose of "waitFor" delay
+
+    // have any nodes that we were tracking been added to the cluster?
+    // if so, remove them from the tracking map
+    Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
+    trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
+    event.getNodeNames().forEachRemaining(lostNode -> {
+      nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
+    });
+
+    Set<String> reallyLostNodes = new HashSet<>();
+    nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
+      long now = solrCloudManager.getTimeSource().getTimeNs();
+      long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
+      if (te >= waitForSecond) {
+        reallyLostNodes.add(lostNode);
+      }
+    });
+
     // collect all lost replicas
     // collection / positions
     Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
     try {
       ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
-      Set<String> lostNodeNames = new HashSet<>();
-      event.getNodeNames().forEachRemaining(lostNodeNames::add);
       clusterState.forEachCollection(coll -> {
         // shard / type / count
         Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
         coll.forEachReplica((shard, replica) -> {
-          if (lostNodeNames.contains(replica.getNodeName())) {
+          if (reallyLostNodes.contains(replica.getNodeName())) {
             lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
                 .computeIfAbsent(replica.type, t -> new AtomicInteger())
                 .incrementAndGet();
@@ -137,6 +169,9 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
       return;
     }
 
+    // remove from the tracking set
+    nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
+
     // send ADDREPLICA admin requests for each lost replica
     // XXX should we use 'async' for that, to avoid blocking here?
     List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
index f47d576..188bfa5 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
@@ -19,20 +19,16 @@ package org.apache.solr.cluster.events.impl;
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
 import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
 import org.apache.solr.cluster.events.ClusterEvent;
-import org.apache.solr.cluster.events.ClusterEventListener;
 import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cluster.events.CollectionsAddedEvent;
 import org.apache.solr.cluster.events.CollectionsRemovedEvent;
 import org.apache.solr.cluster.events.NodesDownEvent;
@@ -51,16 +47,13 @@ import org.slf4j.LoggerFactory;
  * (not in parallel) and in arbitrary order. This means that if any listener blocks the
  * processing other listeners may be invoked much later or not at all.</p>
  */
-public class DefaultClusterEventProducer implements ClusterEventProducer, ClusterSingleton {
+public class DefaultClusterEventProducer extends ClusterEventProducerBase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
-  private CoreContainer coreContainer;
   private LiveNodesListener liveNodesListener;
   private CloudCollectionsListener cloudCollectionsListener;
   private ClusterPropertiesListener clusterPropertiesListener;
   private ZkController zkController;
-  private State state = State.STOPPED;
 
   private final Set<ClusterEvent.EventType> supportedEvents =
       new HashSet<>(Arrays.asList(
@@ -71,22 +64,29 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
           ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
       ));
 
-  public DefaultClusterEventProducer(CoreContainer coreContainer) {
-    this.coreContainer = coreContainer;
+  public DefaultClusterEventProducer(CoreContainer cc) {
+    super(cc);
   }
 
   // ClusterSingleton lifecycle methods
   @Override
-  public void start() {
-    if (coreContainer == null) {
+  public synchronized void start() {
+    if (log.isDebugEnabled()) {
+      log.debug("-- starting DCEP", new Exception(Integer.toHexString(hashCode())));
+    }
+    if (cc == null) {
       liveNodesListener = null;
       cloudCollectionsListener = null;
       clusterPropertiesListener = null;
       state = State.STOPPED;
       return;
     }
+    if (state == State.RUNNING) {
+      log.warn("Double start() invoked on {}, ignoring", this);
+      return;
+    }
     state = State.STARTING;
-    this.zkController = this.coreContainer.getZkController();
+    this.zkController = this.cc.getZkController();
 
     // clean up any previous instances
     doStop();
@@ -198,18 +198,16 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
     state = State.RUNNING;
   }
 
-  private void fireEvent(ClusterEvent event) {
-    listeners.getOrDefault(event.getType(), Collections.emptySet())
-        .forEach(listener -> listener.onEvent(event));
-  }
-
   @Override
-  public State getState() {
-    return state;
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return supportedEvents;
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
+    if (log.isDebugEnabled()) {
+      log.debug("-- stopping DCEP {}", Integer.toHexString(hashCode()));
+    }
     state = State.STOPPING;
     doStop();
     state = State.STOPPED;
@@ -229,44 +227,4 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
     cloudCollectionsListener = null;
     clusterPropertiesListener = null;
   }
-
-  @Override
-  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    if (eventTypes == null || eventTypes.length == 0) {
-      eventTypes = ClusterEvent.EventType.values();
-    }
-    for (ClusterEvent.EventType type : eventTypes) {
-      if (!supportedEvents.contains(type)) {
-        log.warn("event type {} not supported yet.", type);
-        continue;
-      }
-      // to avoid removing no-longer empty set in unregister
-      synchronized (listeners) {
-        listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
-            .add(listener);
-      }
-    }
-  }
-
-  @Override
-  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
-    if (eventTypes == null || eventTypes.length == 0) {
-      eventTypes = ClusterEvent.EventType.values();
-    }
-    synchronized (listeners) {
-      for (ClusterEvent.EventType type : eventTypes) {
-        Set<ClusterEventListener> perType = listeners.get(type);
-        if (perType != null) {
-          perType.remove(listener);
-          if (perType.isEmpty()) {
-            listeners.remove(type);
-          }
-        }
-      }
-    }
-  }
-
-  public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
-    return listeners;
-  }
 }
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
new file mode 100644
index 0000000..0f31de3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+/**
+ * This implementation allows Solr to dynamically change the underlying implementation in
+ * response to the changed plugin configuration. Calls are forwarded to the current delegate (initially a {@link NoOpProducer}); listeners are also recorded locally so that {@link #setDelegate} can re-register them.
+ */
+public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private ClusterEventProducer delegate; // NOTE(review): plain field written by the unsynchronized setDelegate() and read by the synchronized start()/stop() - confirm visibility is adequate
+
+  public DelegatingClusterEventProducer(CoreContainer cc) {
+    super(cc);
+    delegate = new NoOpProducer(cc); // no-op delegate until setDelegate() supplies another one
+  }
+
+  public void setDelegate(ClusterEventProducer newDelegate) {
+    if (log.isDebugEnabled()) {
+      log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
+    }
+    this.delegate = newDelegate; // NOTE(review): the previous delegate is neither stopped nor cleared of listeners in this method - confirm its lifecycle is handled elsewhere
+    // transfer all listeners to the new delegate
+    listeners.forEach((type, listenerSet) -> {
+      listenerSet.forEach(listener -> {
+        try {
+          delegate.registerListener(listener, type);
+        } catch (Exception e) {
+          log.warn("Exception registering listener with the new event producer", e);
+          // make sure it's not registered
+          delegate.unregisterListener(listener, type);
+          // unregister it here, too
+          super.unregisterListener(listener, type);
+        }
+      });
+    });
+    if ((state == State.RUNNING || state == State.STARTING) &&
+        !(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) { // this producer is active but the new delegate is not: start it
+      try {
+        delegate.start();
+        if (log.isDebugEnabled()) {
+          log.debug("--- started delegate {}", delegate);
+        }
+      } catch (Exception e) { // a failed start is only logged; the new delegate stays installed
+        log.warn("Unable to start the new delegate {}: {}", delegate.getClass().getName(), e);
+      }
+    } else {
+      if (log.isDebugEnabled()) {
+        log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+      }
+    }
+  }
+
+  @Override
+  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    super.registerListener(listener, eventTypes); // recorded locally so setDelegate() can re-register it with a later delegate
+    delegate.registerListener(listener, eventTypes);
+  }
+
+  @Override
+  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    super.unregisterListener(listener, eventTypes); // forget it locally, then remove it from the current delegate
+    delegate.unregisterListener(listener, eventTypes);
+  }
+
+  @Override
+  public synchronized void start() throws Exception {
+    if (log.isDebugEnabled()) {
+      log.debug("-- starting CC-{}, Delegating {}, delegate {}",
+          Integer.toHexString(cc.hashCode()), Integer.toHexString(hashCode()), delegate);
+    }
+    state = State.STARTING;
+    if (!(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
+      try {
+        delegate.start();
+        if (log.isDebugEnabled()) {
+          log.debug("--- started delegate {}", delegate);
+        }
+      } finally {
+        state = delegate.getState(); // mirror the delegate's state, also when delegate.start() throws
+      }
+    } else { // NOTE(review): on this path state stays STARTING (set above) and is never updated - confirm that is intended
+      if (log.isDebugEnabled()) {
+        log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+      }
+    }
+  }
+
+  @Override
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return NoOpProducer.ALL_EVENT_TYPES; // every event type is accepted here, regardless of the current delegate
+  }
+
+  @Override
+  public synchronized void stop() {
+    if (log.isDebugEnabled()) {
+      log.debug("-- stopping Delegating {}, delegate {}", Integer.toHexString(hashCode()), delegate);
+    }
+    state = State.STOPPING;
+    delegate.stop();
+    state = delegate.getState(); // mirror whatever state the delegate reports after stop()
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 7390258..2f2652b 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -74,6 +74,7 @@ import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.OverseerTaskQueue;
 import org.apache.solr.cloud.ZkController;
 import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index e5b8661..031076f 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -24,8 +24,10 @@ import org.apache.solr.client.solrj.request.beans.PluginMeta;
 import org.apache.solr.client.solrj.response.V2Response;
 import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
 import org.apache.solr.common.cloud.ClusterProperties;
 import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +51,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
 /**
  *
  */
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
 public class ClusterEventProducerTest extends SolrCloudTestCase {
 
   private AllEventsListener eventsListener;
@@ -57,6 +61,15 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
     configureCluster(3)
         .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertNotNull(rsp);
   }
 
   @Before
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
index 9e8adb1..d672f6d 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -19,19 +19,26 @@ package org.apache.solr.cluster.events.impl;
 
 import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
 import org.apache.solr.cloud.ClusterSingleton;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cluster.events.AllEventsListener;
 import org.apache.solr.cluster.events.ClusterEvent;
 import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
 import org.apache.solr.core.CoreContainer;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
 /**
  *
  */
@@ -44,6 +51,7 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
 
     CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
       delegate = new CollectionsRepairEventListener(cc);
+      delegate.setWaitForSecond(0);
     }
 
     @Override
@@ -83,6 +91,15 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
     configureCluster(NUM_NODES)
         .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
         .configure();
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertNotNull(rsp);
     CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
     cc.getClusterEventProducer()
         .registerListener(eventsListener, ClusterEvent.EventType.values());