You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/11/03 19:49:34 UTC
[lucene-solr] 03/03: SOLR-14749: Refactoring. Allow to set new
ClusterEventProducer plugin impl dynamically. Implement "waitFor" in the
CollectionsRepairEventListener.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749-api
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 801425d76e37e30ba93cf3c0f778b1eee3c6f5cc
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Tue Nov 3 20:47:54 2020 +0100
SOLR-14749: Refactoring. Allow to set new ClusterEventProducer plugin impl
dynamically. Implement "waitFor" in the CollectionsRepairEventListener.
---
.../cluster/events/ClusterEventProducerBase.java | 85 ++++++++++++++
.../apache/solr/cluster/events/NoOpProducer.java | 35 +++---
.../events/impl}/ClusterEventProducerFactory.java | 92 +++++++--------
.../impl/CollectionsRepairEventListener.java | 41 ++++++-
.../events/impl/DefaultClusterEventProducer.java | 82 ++++---------
.../impl/DelegatingClusterEventProducer.java | 130 +++++++++++++++++++++
.../java/org/apache/solr/core/CoreContainer.java | 1 +
.../cluster/events/ClusterEventProducerTest.java | 13 +++
.../impl/CollectionsRepairEventListenerTest.java | 17 +++
9 files changed, 365 insertions(+), 131 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
new file mode 100644
index 0000000..ecb5825
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
@@ -0,0 +1,85 @@
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
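+ * Base class for {@link ClusterEventProducer} implementations: keeps the per-event-type listener sets and dispatches events to them.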
+ */
+public abstract class ClusterEventProducerBase implements ClusterEventProducer {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new ConcurrentHashMap<>();
+ protected volatile State state = State.STOPPED;
+ protected final CoreContainer cc;
+
+ protected ClusterEventProducerBase(CoreContainer cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ for (ClusterEvent.EventType type : eventTypes) {
+ if (!getSupportedEventTypes().contains(type)) {
+ log.warn("event type {} not supported yet.", type);
+ continue;
+ }
+ // same lock as unregisterListener, so it cannot remove a per-type set that has just become non-empty
+ synchronized (listeners) {
+ listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
+ .add(listener);
+ }
+ }
+ }
+
+ @Override
+ public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ synchronized (listeners) {
+ for (ClusterEvent.EventType type : eventTypes) {
+ Set<ClusterEventListener> perType = listeners.get(type);
+ if (perType != null) {
+ perType.remove(listener);
+ if (perType.isEmpty()) {
+ listeners.remove(type);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+ return listeners;
+ }
+
+ public CoreContainer getCoreContainer() {
+ return cc;
+ }
+
+ public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
+
+ protected void fireEvent(ClusterEvent event) {
+ listeners.getOrDefault(event.getType(), Collections.emptySet())
+ .forEach(listener -> {
+ log.debug("--- firing event {} to {}", event, listener);
+ listener.onEvent(event);
+ });
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
index 8632895..16db727 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
@@ -16,39 +16,36 @@
*/
package org.apache.solr.cluster.events;
+import org.apache.solr.core.CoreContainer;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
/**
- * No-op implementation of {@link ClusterEventProducer}. This implementation is always in
- * RUNNING state.
+ * No-op implementation of {@link ClusterEventProducer}. This implementation doesn't
+ * generate any events.
*/
-public final class NoOpProducer implements ClusterEventProducer {
+public final class NoOpProducer extends ClusterEventProducerBase {
- @Override
- public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- // no-op
- }
+ public static final Set<ClusterEvent.EventType> ALL_EVENT_TYPES = new HashSet<>(Arrays.asList(ClusterEvent.EventType.values()));
- @Override
- public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- // no-op
+ public NoOpProducer(CoreContainer cc) {
+ super(cc);
}
@Override
- public String getName() {
- return ClusterEventProducer.PLUGIN_NAME;
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return ALL_EVENT_TYPES;
}
@Override
public void start() throws Exception {
- // no-op
- }
-
- @Override
- public State getState() {
- return State.RUNNING;
+ state = State.RUNNING;
}
@Override
public void stop() {
- // no-op
+ state = State.STOPPED;
}
}
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
similarity index 65%
rename from solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java
rename to solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
index 7142cba..0e2bd9a 100644
--- a/solr/core/src/java/org/apache/solr/core/ClusterEventProducerFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
@@ -1,14 +1,16 @@
-package org.apache.solr.core;
+package org.apache.solr.cluster.events.impl;
import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
+import java.lang.invoke.MethodHandles;
import java.util.Set;
/**
@@ -16,14 +18,14 @@ import java.util.Set;
* when both the final {@link ClusterEventProducer} implementation and listeners
* are configured using plugins.
*/
-public class ClusterEventProducerFactory implements ClusterEventProducer {
- private Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+public class ClusterEventProducerFactory extends ClusterEventProducerBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private ContainerPluginsRegistry.PluginRegistryListener initialPluginListener;
- private final CoreContainer cc;
private boolean created = false;
public ClusterEventProducerFactory(CoreContainer cc) {
- this.cc = cc;
+ super(cc);
initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -55,6 +57,11 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
};
}
+ @Override
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return NoOpProducer.ALL_EVENT_TYPES;
+ }
+
/**
* This method returns an initial plugin registry listener that helps to capture the
* freshly loaded listener plugins before the final cluster event producer is created.
@@ -73,20 +80,19 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
* @param plugins current plugin configurations
* @return configured instance of cluster event producer (with side-effects, see above)
*/
- public ClusterEventProducer create(ContainerPluginsRegistry plugins) {
+ public DelegatingClusterEventProducer create(ContainerPluginsRegistry plugins) {
if (created) {
throw new RuntimeException("this factory can be called only once!");
}
- final ClusterEventProducer clusterEventProducer;
+ final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
+ // since this is a ClusterSingleton, register it as such
+ cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
if (clusterEventProducerInfo != null) {
// the listener in ClusterSingletons already registered it
- clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
+ clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
} else {
- // create the default impl
- clusterEventProducer = new DefaultClusterEventProducer(cc);
- // since this is a ClusterSingleton, register it as such
- cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+ // use the default NoOp impl
}
// transfer those listeners that were already registered to the initial impl
transferListeners(clusterEventProducer, plugins);
@@ -101,6 +107,15 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
if (instance instanceof ClusterEventListener) {
ClusterEventListener listener = (ClusterEventListener) instance;
clusterEventProducer.registerListener(listener);
+ } else if (instance instanceof ClusterEventProducer) {
+ // replace the existing impl
+ if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+ ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+ .setDelegate((ClusterEventProducer) instance);
+ } else {
+ log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+ " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+ }
}
}
@@ -113,6 +128,15 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
if (instance instanceof ClusterEventListener) {
ClusterEventListener listener = (ClusterEventListener) instance;
clusterEventProducer.unregisterListener(listener);
+ } else if (instance instanceof ClusterEventProducer) {
+ // replace the existing impl with NoOp
+ if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+ ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+ .setDelegate(new NoOpProducer(cc));
+ } else {
+ log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+ " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+ }
}
}
@@ -131,45 +155,19 @@ public class ClusterEventProducerFactory implements ClusterEventProducer {
// stop capturing listener plugins
plugins.unregisterListener(initialPluginListener);
// transfer listeners that are already registered
- initialListeners.forEach((type, listeners) -> {
- listeners.forEach(listener -> target.registerListener(listener, type));
+ listeners.forEach((type, listenersSet) -> {
+ listenersSet.forEach(listener -> target.registerListener(listener, type));
});
- initialListeners.clear();
- initialListeners = null;
- }
-
- // ClusterEventProducer API, parts needed to register initial listeners.
-
- @Override
- public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- if (eventTypes == null || eventTypes.length == 0) {
- eventTypes = ClusterEvent.EventType.values();
- }
- for (ClusterEvent.EventType type : eventTypes) {
- initialListeners.computeIfAbsent(type, t -> new HashSet<>())
- .add(listener);
- }
- }
-
- @Override
- public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- throw new UnsupportedOperationException("unregister listener not implemented");
+ listeners.clear();
}
@Override
public void start() throws Exception {
- throw new UnsupportedOperationException("start not implemented");
- }
-
- @Override
- public State getState() {
- return State.STOPPED;
+ state = State.RUNNING;
}
@Override
public void stop() {
- throw new UnsupportedOperationException("stop not implemented");
+ state = State.STOPPED;
}
-
-
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index a5fa14f..de69fb9 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -25,8 +25,11 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -53,6 +56,8 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String PLUGIN_NAME = "collectionsRepairListener";
+ public static final int DEFAULT_WAIT_FOR_SEC = 30;
+
private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
private static final AtomicInteger counter = new AtomicInteger();
@@ -61,11 +66,18 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
private State state = State.STOPPED;
+ private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
+
public CollectionsRepairEventListener(CoreContainer cc) {
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
this.solrCloudManager = cc.getZkController().getSolrCloudManager();
}
+ @VisibleForTesting
+ public void setWaitForSecond(int waitForSecond) {
+ this.waitForSecond = waitForSecond;
+ }
+
@Override
public String getName() {
return PLUGIN_NAME;
@@ -86,19 +98,39 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
}
}
+ private Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+
private void handleNodesDown(NodesDownEvent event) {
+
+ // tracking for the purpose of "waitFor" delay
+
+ // have any nodes that we were tracking been added to the cluster?
+ // if so, remove them from the tracking map
+ Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
+ trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
+ event.getNodeNames().forEachRemaining(lostNode -> {
+ nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
+ });
+
+ Set<String> reallyLostNodes = new HashSet<>();
+ nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
+ long now = solrCloudManager.getTimeSource().getTimeNs();
+ long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
+ if (te >= waitForSecond) {
+ reallyLostNodes.add(lostNode);
+ }
+ });
+
// collect all lost replicas
// collection / positions
Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
try {
ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
- Set<String> lostNodeNames = new HashSet<>();
- event.getNodeNames().forEachRemaining(lostNodeNames::add);
clusterState.forEachCollection(coll -> {
// shard / type / count
Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
coll.forEachReplica((shard, replica) -> {
- if (lostNodeNames.contains(replica.getNodeName())) {
+ if (reallyLostNodes.contains(replica.getNodeName())) {
lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
.computeIfAbsent(replica.type, t -> new AtomicInteger())
.incrementAndGet();
@@ -137,6 +169,9 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
return;
}
+ // remove from the tracking set
+ nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
+
// send ADDREPLICA admin requests for each lost replica
// XXX should we use 'async' for that, to avoid blocking here?
List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
index f47d576..188bfa5 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
@@ -19,20 +19,16 @@ package org.apache.solr.cluster.events.impl;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
import org.apache.solr.cluster.events.ClusterEvent;
-import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
-import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cluster.events.CollectionsAddedEvent;
import org.apache.solr.cluster.events.CollectionsRemovedEvent;
import org.apache.solr.cluster.events.NodesDownEvent;
@@ -51,16 +47,13 @@ import org.slf4j.LoggerFactory;
* (not in parallel) and in arbitrary order. This means that if any listener blocks the
* processing other listeners may be invoked much later or not at all.</p>
*/
-public class DefaultClusterEventProducer implements ClusterEventProducer, ClusterSingleton {
+public class DefaultClusterEventProducer extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
- private CoreContainer coreContainer;
private LiveNodesListener liveNodesListener;
private CloudCollectionsListener cloudCollectionsListener;
private ClusterPropertiesListener clusterPropertiesListener;
private ZkController zkController;
- private State state = State.STOPPED;
private final Set<ClusterEvent.EventType> supportedEvents =
new HashSet<>(Arrays.asList(
@@ -71,22 +64,29 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
));
- public DefaultClusterEventProducer(CoreContainer coreContainer) {
- this.coreContainer = coreContainer;
+ public DefaultClusterEventProducer(CoreContainer cc) {
+ super(cc);
}
// ClusterSingleton lifecycle methods
@Override
- public void start() {
- if (coreContainer == null) {
+ public synchronized void start() {
+ if (log.isDebugEnabled()) {
+ log.debug("-- starting DCEP", new Exception(Integer.toHexString(hashCode())));
+ }
+ if (cc == null) {
liveNodesListener = null;
cloudCollectionsListener = null;
clusterPropertiesListener = null;
state = State.STOPPED;
return;
}
+ if (state == State.RUNNING) {
+ log.warn("Double start() invoked on {}, ignoring", this);
+ return;
+ }
state = State.STARTING;
- this.zkController = this.coreContainer.getZkController();
+ this.zkController = this.cc.getZkController();
// clean up any previous instances
doStop();
@@ -198,18 +198,16 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
state = State.RUNNING;
}
- private void fireEvent(ClusterEvent event) {
- listeners.getOrDefault(event.getType(), Collections.emptySet())
- .forEach(listener -> listener.onEvent(event));
- }
-
@Override
- public State getState() {
- return state;
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return supportedEvents;
}
@Override
- public void stop() {
+ public synchronized void stop() {
+ if (log.isDebugEnabled()) {
+ log.debug("-- stopping DCEP {}", Integer.toHexString(hashCode()));
+ }
state = State.STOPPING;
doStop();
state = State.STOPPED;
@@ -229,44 +227,4 @@ public class DefaultClusterEventProducer implements ClusterEventProducer, Cluste
cloudCollectionsListener = null;
clusterPropertiesListener = null;
}
-
- @Override
- public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- if (eventTypes == null || eventTypes.length == 0) {
- eventTypes = ClusterEvent.EventType.values();
- }
- for (ClusterEvent.EventType type : eventTypes) {
- if (!supportedEvents.contains(type)) {
- log.warn("event type {} not supported yet.", type);
- continue;
- }
- // to avoid removing no-longer empty set in unregister
- synchronized (listeners) {
- listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
- .add(listener);
- }
- }
- }
-
- @Override
- public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
- if (eventTypes == null || eventTypes.length == 0) {
- eventTypes = ClusterEvent.EventType.values();
- }
- synchronized (listeners) {
- for (ClusterEvent.EventType type : eventTypes) {
- Set<ClusterEventListener> perType = listeners.get(type);
- if (perType != null) {
- perType.remove(listener);
- if (perType.isEmpty()) {
- listeners.remove(type);
- }
- }
- }
- }
- }
-
- public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
- return listeners;
- }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
new file mode 100644
index 0000000..0f31de3
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+/**
+ * This implementation allows Solr to dynamically change the underlying implementation in
+ * response to the changed plugin configuration.
+ */
+public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private ClusterEventProducer delegate;
+
+ public DelegatingClusterEventProducer(CoreContainer cc) {
+ super(cc);
+ delegate = new NoOpProducer(cc);
+ }
+
+ public void setDelegate(ClusterEventProducer newDelegate) {
+ if (log.isDebugEnabled()) {
+ log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
+ }
+ this.delegate = newDelegate;
+ // transfer all listeners to the new delegate
+ listeners.forEach((type, listenerSet) -> {
+ listenerSet.forEach(listener -> {
+ try {
+ delegate.registerListener(listener, type);
+ } catch (Exception e) {
+ log.warn("Exception registering listener with the new event producer", e);
+ // make sure it's not registered
+ delegate.unregisterListener(listener, type);
+ // unregister it here, too
+ super.unregisterListener(listener, type);
+ }
+ });
+ });
+ if ((state == State.RUNNING || state == State.STARTING) &&
+ !(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
+ try {
+ delegate.start();
+ if (log.isDebugEnabled()) {
+ log.debug("--- started delegate {}", delegate);
+ }
+ } catch (Exception e) {
+ log.warn("Unable to start the new delegate {}: {}", delegate.getClass().getName(), e);
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+ }
+ }
+ }
+
+ @Override
+ public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ super.registerListener(listener, eventTypes);
+ delegate.registerListener(listener, eventTypes);
+ }
+
+ @Override
+ public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ super.unregisterListener(listener, eventTypes);
+ delegate.unregisterListener(listener, eventTypes);
+ }
+
+ @Override
+ public synchronized void start() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("-- starting CC-{}, Delegating {}, delegate {}",
+ Integer.toHexString(cc.hashCode()), Integer.toHexString(hashCode()), delegate);
+ }
+ state = State.STARTING;
+ if (!(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
+ try {
+ delegate.start();
+ if (log.isDebugEnabled()) {
+ log.debug("--- started delegate {}", delegate);
+ }
+ } finally {
+ state = delegate.getState();
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+ }
+ }
+ }
+
+ @Override
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return NoOpProducer.ALL_EVENT_TYPES;
+ }
+
+ @Override
+ public synchronized void stop() {
+ if (log.isDebugEnabled()) {
+ log.debug("-- stopping Delegating {}, delegate {}", Integer.toHexString(hashCode()), delegate);
+ }
+ state = State.STOPPING;
+ delegate.stop();
+ state = delegate.getState();
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 7390258..2f2652b 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -74,6 +74,7 @@ import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index e5b8661..031076f 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -24,8 +24,10 @@ import org.apache.solr.client.solrj.request.beans.PluginMeta;
import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
import org.apache.solr.common.cloud.ClusterProperties;
import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,6 +51,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
public class ClusterEventProducerTest extends SolrCloudTestCase {
private AllEventsListener eventsListener;
@@ -57,6 +61,15 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
configureCluster(3)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
+ PluginMeta plugin = new PluginMeta();
+ plugin.klass = DefaultClusterEventProducer.class.getName();
+ plugin.name = ClusterEventProducer.PLUGIN_NAME;
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("add", plugin))
+ .build();
+ V2Response rsp = req.process(cluster.getSolrClient());
+ assertNotNull(rsp);
}
@Before
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
index 9e8adb1..d672f6d 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -19,19 +19,26 @@ package org.apache.solr.cluster.events.impl;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cluster.events.AllEventsListener;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.core.CoreContainer;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
/**
*
*/
@@ -44,6 +51,7 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
delegate = new CollectionsRepairEventListener(cc);
+ delegate.setWaitForSecond(0);
}
@Override
@@ -83,6 +91,15 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
configureCluster(NUM_NODES)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
+ PluginMeta plugin = new PluginMeta();
+ plugin.klass = DefaultClusterEventProducer.class.getName();
+ plugin.name = ClusterEventProducer.PLUGIN_NAME;
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("add", plugin))
+ .build();
+ V2Response rsp = req.process(cluster.getSolrClient());
+ assertNotNull(rsp);
CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
cc.getClusterEventProducer()
.registerListener(eventsListener, ClusterEvent.EventType.values());