You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/11/05 11:18:19 UTC
[lucene-solr] branch master updated: SOLR-14749: Provide a clean
API for cluster-level event processing.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new bdc6e82 SOLR-14749: Provide a clean API for cluster-level event processing.
bdc6e82 is described below
commit bdc6e8247fdb162902c794b73fdc228d526a3a6e
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Nov 5 12:16:35 2020 +0100
SOLR-14749: Provide a clean API for cluster-level event processing.
---
solr/CHANGES.txt | 3 +-
...rPlugins.java => ContainerPluginsRegistry.java} | 16 +-
.../apache/solr/cluster/events/ClusterEvent.java | 47 +++
.../solr/cluster/events/ClusterEventListener.java | 36 ++
.../solr/cluster/events/ClusterEventProducer.java | 62 ++++
.../cluster/events/ClusterEventProducerBase.java | 107 ++++++
.../events/ClusterPropertiesChangedEvent.java | 33 ++
.../solr/cluster/events/CollectionsAddedEvent.java | 33 ++
.../cluster/events/CollectionsRemovedEvent.java | 32 ++
.../apache/solr/cluster/events/NoOpProducer.java | 51 +++
.../apache/solr/cluster/events/NodesDownEvent.java | 32 ++
.../apache/solr/cluster/events/NodesUpEvent.java | 32 ++
.../events/impl/ClusterEventProducerFactory.java | 195 +++++++++++
.../impl/CollectionsRepairEventListener.java | 264 +++++++++++++++
.../events/impl/DefaultClusterEventProducer.java | 231 +++++++++++++
.../impl/DelegatingClusterEventProducer.java | 141 ++++++++
.../solr/cluster/events/impl/package-info.java | 23 ++
.../apache/solr/cluster/events/package-info.java | 23 ++
.../org/apache/solr/core/ClusterSingletons.java | 22 +-
.../java/org/apache/solr/core/CoreContainer.java | 34 +-
.../solr/handler/admin/ContainerPluginsApi.java | 4 +-
.../solr/cluster/events/AllEventsListener.java | 61 ++++
.../cluster/events/ClusterEventProducerTest.java | 370 +++++++++++++++++++++
.../impl/CollectionsRepairEventListenerTest.java | 164 +++++++++
.../apache/solr/handler/TestContainerPlugin.java | 7 -
solr/dev-docs/plugins/container-plugins.adoc | 81 +++++
26 files changed, 2073 insertions(+), 31 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 75641af..0dd9f18 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -16,7 +16,8 @@ New Features
* SOLR-13528 Rate Limiting in Solr (Atri Sharma, Mike Drob)
-* SOLR-14749: Improve support for arbitrary container-level plugins. Add ClusterSingleton
+* SOLR-14749: Provide a clean API for cluster-level event processing.
+ Improve support for arbitrary container-level plugins. Add ClusterSingleton
support for plugins that require only one active instance in the cluster. (ab, noble)
Improvements
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
similarity index 96%
rename from solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
rename to solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
index 91b0452..8d0267c 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.PathTrie;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.StrUtils;
@@ -65,7 +66,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
* for additional functionality by {@link PluginRegistryListener}-s registered with
* this class.</p>
*/
-public class CustomContainerPlugins implements ClusterPropertiesListener, MapWriter {
+public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
@@ -90,16 +91,25 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
listeners.remove(listener);
}
- public CustomContainerPlugins(CoreContainer coreContainer, ApiBag apiBag) {
+ public ContainerPluginsRegistry(CoreContainer coreContainer, ApiBag apiBag) {
this.coreContainer = coreContainer;
this.containerApiBag = apiBag;
}
@Override
- public void writeMap(EntryWriter ew) throws IOException {
+ public synchronized void writeMap(EntryWriter ew) throws IOException {
currentPlugins.forEach(ew.getBiConsumer());
}
+ @Override
+ public synchronized void close() throws IOException {
+ currentPlugins.values().forEach(apiInfo -> {
+ if (apiInfo.instance instanceof Closeable) {
+ IOUtils.closeQuietly((Closeable) apiInfo.instance);
+ }
+ });
+ }
+
public synchronized ApiInfo getPlugin(String name) {
return currentPlugins.get(name);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
new file mode 100644
index 0000000..a9f09a1
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.time.Instant;
+
+/**
+ * Cluster-level event.
+ */
+public interface ClusterEvent {
+
+ enum EventType {
+ /** One or more nodes went down. */
+ NODES_DOWN,
+ /** One or more nodes went up. */
+ NODES_UP,
+ /** One or more collections have been added. */
+ COLLECTIONS_ADDED,
+ /** One or more collections have been removed. */
+ COLLECTIONS_REMOVED,
+ /** Cluster properties have changed. */
+ CLUSTER_PROPERTIES_CHANGED
+ // other types? eg. Overseer leader change, shard leader change,
+ // node overload (eg. CPU / MEM circuit breakers tripped)?
+ }
+
+ /** Get event type. */
+ EventType getType();
+
+ /** Get event timestamp. This is the instant when the event was generated (not necessarily when
+ * the underlying condition first occurred). */
+ Instant getTimestamp();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
new file mode 100644
index 0000000..3443b37
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.io.Closeable;
+
+/**
+ * Components that want to be notified of cluster-wide events should implement this interface.
+ *
+ * XXX should this work only for ClusterSingleton-s? some types of events may be
+ * XXX difficult (or pointless) to propagate to every node.
+ */
+public interface ClusterEventListener extends Closeable {
+
+ /**
+ * Handle the event. Implementations should be non-blocking - if any long
+ * processing is needed it should be performed asynchronously.
+ * @param event cluster event
+ */
+ void onEvent(ClusterEvent event);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
new file mode 100644
index 0000000..d3b0ee7
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.cloud.ClusterSingleton;
+
+import java.io.Closeable;
+
+/**
+ * Component that produces {@link ClusterEvent} instances.
+ */
+public interface ClusterEventProducer extends ClusterSingleton, Closeable {
+
+ /** Unique name for the registration of a plugin-based implementation. */
+ String PLUGIN_NAME = "cluster-event-producer";
+
+ @Override
+ default String getName() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * Register an event listener for processing the specified event types.
+ * @param listener non-null listener. If the same instance of the listener is
+ * already registered for some event types then it will also be registered
+ * for the additional event types specified in this call.
+ * @param eventTypes event types that this listener is being registered for.
+ * If this is null or empty then all types will be used.
+ */
+ void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes);
+
+ /**
+ * Unregister an event listener for all event types.
+ * @param listener non-null listener.
+ */
+ default void unregisterListener(ClusterEventListener listener) {
+ unregisterListener(listener, ClusterEvent.EventType.values());
+ }
+
+ /**
+ * Unregister an event listener for specified event types.
+ * @param listener non-null listener.
+ * @param eventTypes event types from which the listener will be unregistered. If this
+ * is null or empty then all event types will be used.
+ */
+ void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes);
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
new file mode 100644
index 0000000..4bd400d
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Base class for implementing {@link ClusterEventProducer}.
+ */
+public abstract class ClusterEventProducerBase implements ClusterEventProducer {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ protected final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new ConcurrentHashMap<>();
+ protected volatile State state = State.STOPPED;
+ protected final CoreContainer cc;
+
+ protected ClusterEventProducerBase(CoreContainer cc) {
+ this.cc = cc;
+ }
+
+ @Override
+ public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ for (ClusterEvent.EventType type : eventTypes) {
+ if (!getSupportedEventTypes().contains(type)) {
+ log.warn("event type {} not supported yet.", type);
+ continue;
+ }
+ // synchronized so that a concurrent unregister cannot remove a set that is no longer empty
+ synchronized (listeners) {
+ listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
+ .add(listener);
+ }
+ }
+ }
+
+ @Override
+ public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+ if (eventTypes == null || eventTypes.length == 0) {
+ eventTypes = ClusterEvent.EventType.values();
+ }
+ synchronized (listeners) {
+ for (ClusterEvent.EventType type : eventTypes) {
+ Set<ClusterEventListener> perType = listeners.get(type);
+ if (perType != null) {
+ perType.remove(listener);
+ if (perType.isEmpty()) {
+ listeners.remove(type);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public void close() throws IOException {
+ synchronized (listeners) {
+ listeners.values().forEach(listenerSet ->
+ listenerSet.forEach(listener -> IOUtils.closeQuietly(listener)));
+ }
+ }
+
+ public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
+
+ protected void fireEvent(ClusterEvent event) {
+ synchronized (listeners) {
+ listeners.getOrDefault(event.getType(), Collections.emptySet())
+ .forEach(listener -> {
+ if (log.isDebugEnabled()) {
+ log.debug("--- firing event {} to {}", event, listener);
+ }
+ listener.onEvent(event);
+ });
+ }
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
new file mode 100644
index 0000000..ee513d8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.util.Map;
+
+/**
+ * Event generated when {@link org.apache.solr.common.cloud.ZkStateReader#CLUSTER_PROPS} is modified.
+ */
+public interface ClusterPropertiesChangedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.CLUSTER_PROPERTIES_CHANGED;
+ }
+
+ Map<String, Object> getNewClusterProperties();
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
new file mode 100644
index 0000000..8614769
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been added.
+ */
+public interface CollectionsAddedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.COLLECTIONS_ADDED;
+ }
+
+ Iterator<String> getCollectionNames();
+
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
new file mode 100644
index 0000000..e6fc64e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.util.Iterator;
+
+/**
+ * Event generated when some collections have been removed.
+ */
+public interface CollectionsRemovedEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.COLLECTIONS_REMOVED;
+ }
+
+ Iterator<String> getCollectionNames();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
new file mode 100644
index 0000000..16db727
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NoOpProducer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.core.CoreContainer;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * No-op implementation of {@link ClusterEventProducer}. This implementation doesn't
+ * generate any events.
+ */
+public final class NoOpProducer extends ClusterEventProducerBase {
+
+ public static final Set<ClusterEvent.EventType> ALL_EVENT_TYPES = new HashSet<>(Arrays.asList(ClusterEvent.EventType.values()));
+
+ public NoOpProducer(CoreContainer cc) {
+ super(cc);
+ }
+
+ @Override
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return ALL_EVENT_TYPES;
+ }
+
+ @Override
+ public void start() throws Exception {
+ state = State.RUNNING;
+ }
+
+ @Override
+ public void stop() {
+ state = State.STOPPED;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
new file mode 100644
index 0000000..a8e7a2e
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes went down.
+ */
+public interface NodesDownEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.NODES_DOWN;
+ }
+
+ Iterator<String> getNodeNames();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
new file mode 100644
index 0000000..f83bf91
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events;
+
+import java.util.Iterator;
+
+/**
+ * Event generated when some nodes went up.
+ */
+public interface NodesUpEvent extends ClusterEvent {
+
+ @Override
+ default EventType getType() {
+ return EventType.NODES_UP;
+ }
+
+ Iterator<String> getNodeNames();
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
new file mode 100644
index 0000000..17f769b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.api.ContainerPluginsRegistry;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+/**
+ * This class helps in handling the initial registration of plugin-based listeners,
+ * when both the final {@link ClusterEventProducer} implementation and listeners
+ * are configured using plugins.
+ */
+public class ClusterEventProducerFactory extends ClusterEventProducerBase {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private ContainerPluginsRegistry.PluginRegistryListener initialPluginListener;
+ private boolean created = false;
+
+ public ClusterEventProducerFactory(CoreContainer cc) {
+ super(cc);
+ // this initial listener is used only for capturing plugin registrations
+ // done by other nodes while this CoreContainer is still loading
+ initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
+ @Override
+ public void added(ContainerPluginsRegistry.ApiInfo plugin) {
+ if (plugin == null || plugin.getInstance() == null) {
+ return;
+ }
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterEventListener) {
+ registerListener((ClusterEventListener) instance);
+ }
+ }
+
+ @Override
+ public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
+ if (plugin == null || plugin.getInstance() == null) {
+ return;
+ }
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterEventListener) {
+ unregisterListener((ClusterEventListener) instance);
+ }
+ }
+
+ @Override
+ public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
+ added(replacement);
+ deleted(old);
+ }
+ };
+ }
+
+ @Override
+ public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+ return NoOpProducer.ALL_EVENT_TYPES;
+ }
+
+ /**
+ * This method returns an initial plugin registry listener that helps to capture the
+ * freshly loaded listener plugins before the final cluster event producer is created.
+ * @return initial listener
+ */
+ public ContainerPluginsRegistry.PluginRegistryListener getPluginRegistryListener() {
+ return initialPluginListener;
+ }
+
+ /**
+ * Create a {@link ClusterEventProducer} based on the current plugin configurations.
+ * <p>NOTE: this method can only be called once because it has side-effects, such as
+ * transferring the initially collected listeners to the resulting producer's instance, and
+ * installing a {@link org.apache.solr.api.ContainerPluginsRegistry.PluginRegistryListener}.
+ * Calling this method more than once will result in an exception.</p>
+ * @param plugins current plugin configurations
+ * @return configured instance of cluster event producer (with side-effects, see above)
+ */
+ public DelegatingClusterEventProducer create(ContainerPluginsRegistry plugins) {
+ if (created) {
+ throw new RuntimeException("this factory can be called only once!");
+ }
+ final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
+ // since this is a ClusterSingleton, register it as such, under a unique name
+ cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
+ ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
+ if (clusterEventProducerInfo != null) {
+ // the listener in ClusterSingletons already registered this instance
+ clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
+ } else {
+ // use the default NoOp impl
+ }
+ // transfer those listeners that were already registered to the initial impl
+ transferListeners(clusterEventProducer, plugins);
+
+ // install plugin registry listener that maintains plugin-based listeners in
+ // the event producer impl
+ ContainerPluginsRegistry.PluginRegistryListener pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
+ @Override
+ public void added(ContainerPluginsRegistry.ApiInfo plugin) {
+ if (plugin == null || plugin.getInstance() == null) {
+ return;
+ }
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterEventListener) {
+ ClusterEventListener listener = (ClusterEventListener) instance;
+ clusterEventProducer.registerListener(listener);
+ } else if (instance instanceof ClusterEventProducer) {
+ // replace the existing impl
+ if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+ ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+ .setDelegate((ClusterEventProducer) instance);
+ } else {
+ log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+ " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+ }
+ }
+ }
+
+ @Override
+ public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
+ if (plugin == null || plugin.getInstance() == null) {
+ return;
+ }
+ Object instance = plugin.getInstance();
+ if (instance instanceof ClusterEventListener) {
+ ClusterEventListener listener = (ClusterEventListener) instance;
+ clusterEventProducer.unregisterListener(listener);
+ } else if (instance instanceof ClusterEventProducer) {
+ // replace the existing impl with NoOp
+ if (cc.getClusterEventProducer() instanceof DelegatingClusterEventProducer) {
+ ((DelegatingClusterEventProducer) cc.getClusterEventProducer())
+ .setDelegate(new NoOpProducer(cc));
+ } else {
+ log.warn("Can't configure plugin-based ClusterEventProducer while CoreContainer is still loading - " +
+ " using existing implementation {}", cc.getClusterEventProducer().getClass().getName());
+ }
+ }
+ }
+
+ @Override
+ public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
+ added(replacement);
+ deleted(old);
+ }
+ };
+ plugins.registerListener(pluginListener);
+ created = true;
+ return clusterEventProducer;
+ }
+
+ private void transferListeners(ClusterEventProducer target, ContainerPluginsRegistry plugins) {
+ synchronized (listeners) {
+ // stop capturing listener plugins
+ plugins.unregisterListener(initialPluginListener);
+ // transfer listeners that are already registered
+ listeners.forEach((type, listenersSet) -> {
+ listenersSet.forEach(listener -> target.registerListener(listener, type));
+ });
+ listeners.clear();
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ state = State.RUNNING;
+ }
+
+ @Override
+ public void stop() {
+ state = State.STOPPED;
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
new file mode 100644
index 0000000..48400f8
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is an illustration how to re-implement the combination of Solr 8x
+ * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replicas when
+ * nodes are lost.
+ * <p>The notion of <code>waitFor</code> delay between detection and repair action is
+ * implemented as a scheduled execution of the repair method. The check is scheduled by
+ * {@link #start()} at a fixed rate, using the <code>waitFor</code> value in effect at that
+ * time as the period (but never less than 1 sec), and it looks for lost nodes that
+ * exceeded their <code>waitFor</code> period.</p>
+ * <p>NOTE: this functionality would be probably more reliable when executed also as a
+ * periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
+ */
+public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton, Closeable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  public static final String PLUGIN_NAME = "collectionsRepairListener";
+  public static final int DEFAULT_WAIT_FOR_SEC = 30;
+
+  private static final String ASYNC_ID_PREFIX = "_async_" + PLUGIN_NAME;
+  private static final AtomicInteger counter = new AtomicInteger();
+
+  private final SolrClient solrClient;
+  private final SolrCloudManager solrCloudManager;
+
+  // lost node name -> time (ns, from the cloud manager's time source) when it was first reported lost
+  private final Map<String, Long> nodeNameVsTimeRemoved = new ConcurrentHashMap<>();
+
+  // volatile: written by start()/stop() and the setter, read by onEvent() and by the scheduled repair task
+  private volatile State state = State.STOPPED;
+
+  private volatile int waitForSecond = DEFAULT_WAIT_FOR_SEC;
+
+  // non-null only between start() and stop()
+  private ScheduledThreadPoolExecutor waitForExecutor;
+
+  public CollectionsRepairEventListener(CoreContainer cc) {
+    this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
+    this.solrCloudManager = cc.getZkController().getSolrCloudManager();
+  }
+
+  /**
+   * Sets the delay (in seconds) that a node must stay lost before its replicas are re-created.
+   * A value set after {@link #start()} changes the delay used by the repair check but not
+   * the period of the already scheduled task.
+   */
+  @VisibleForTesting
+  public void setWaitForSecond(int waitForSecond) {
+    if (log.isDebugEnabled()) {
+      log.debug("-- setting waitFor={}", waitForSecond);
+    }
+    this.waitForSecond = waitForSecond;
+  }
+
+  @Override
+  public String getName() {
+    return PLUGIN_NAME;
+  }
+
+  /**
+   * Processes NODES_DOWN events; every other event type is logged and ignored.
+   * Events are dropped entirely unless this listener is in the RUNNING state.
+   */
+  @Override
+  public void onEvent(ClusterEvent event) {
+    if (state != State.RUNNING) {
+      // ignore the event
+      return;
+    }
+    switch (event.getType()) {
+      case NODES_DOWN:
+        handleNodesDown((NodesDownEvent) event);
+        break;
+      default:
+        log.warn("Unsupported event {}, ignoring...", event);
+    }
+  }
+
+  /** Updates the map of lost nodes that is later evaluated by the scheduled repair check. */
+  private void handleNodesDown(NodesDownEvent event) {
+
+    // tracking for the purpose of "waitFor" delay
+
+    // have any nodes that we were tracking been added to the cluster?
+    // if so, remove them from the tracking map
+    Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
+    trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
+    // add any new lost nodes (old lost nodes are skipped)
+    event.getNodeNames().forEachRemaining(lostNode -> {
+      nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
+    });
+  }
+
+  /**
+   * Entry point of the scheduled task. An exception escaping a task scheduled with
+   * scheduleAtFixedRate suppresses all subsequent executions, so any failure is logged
+   * here instead of being propagated to the executor.
+   */
+  private void runRepairSafely() {
+    try {
+      runRepair();
+    } catch (Exception e) {
+      log.warn("Exception running collections repair, will retry at the next scheduled check", e);
+    }
+  }
+
+  /**
+   * Finds tracked nodes that have been lost for at least <code>waitFor</code> seconds,
+   * computes new positions for the replicas that were located on them and sends one
+   * ADDREPLICA request per new position.
+   */
+  private void runRepair() {
+    if (nodeNameVsTimeRemoved.isEmpty()) {
+      // nothing to do
+      return;
+    }
+    if (log.isDebugEnabled()) {
+      log.debug("-- runRepair for {} lost nodes", nodeNameVsTimeRemoved.size());
+    }
+    Set<String> reallyLostNodes = new HashSet<>();
+    nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
+      long now = solrCloudManager.getTimeSource().getTimeNs();
+      long te = TimeUnit.SECONDS.convert(now - timeRemoved, TimeUnit.NANOSECONDS);
+      if (te >= waitForSecond) {
+        reallyLostNodes.add(lostNode);
+      }
+    });
+    if (reallyLostNodes.isEmpty()) {
+      if (log.isDebugEnabled()) {
+        log.debug("--- skipping repair, {} nodes are still in waitFor period", nodeNameVsTimeRemoved.size());
+      }
+      return;
+    } else {
+      if (log.isDebugEnabled()) {
+        log.debug("--- running repair for nodes that are still lost after waitFor: {}", reallyLostNodes);
+      }
+    }
+    // collect all lost replicas
+    // collection / positions
+    Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
+    try {
+      ClusterState clusterState = solrCloudManager.getClusterStateProvider().getClusterState();
+      clusterState.forEachCollection(coll -> {
+        // shard / type / count
+        Map<String, Map<Replica.Type, AtomicInteger>> lostReplicas = new HashMap<>();
+        coll.forEachReplica((shard, replica) -> {
+          if (reallyLostNodes.contains(replica.getNodeName())) {
+            lostReplicas.computeIfAbsent(shard, s -> new HashMap<>())
+                .computeIfAbsent(replica.type, t -> new AtomicInteger())
+                .incrementAndGet();
+          }
+        });
+        Assign.AssignStrategy assignStrategy = Assign.createAssignStrategy(solrCloudManager, clusterState, coll);
+        lostReplicas.forEach((shard, types) -> {
+          Assign.AssignRequestBuilder assignRequestBuilder = new Assign.AssignRequestBuilder()
+              .forCollection(coll.getName())
+              .forShard(Collections.singletonList(shard));
+          types.forEach((type, count) -> {
+            switch (type) {
+              case NRT:
+                assignRequestBuilder.assignNrtReplicas(count.get());
+                break;
+              case PULL:
+                assignRequestBuilder.assignPullReplicas(count.get());
+                break;
+              case TLOG:
+                assignRequestBuilder.assignTlogReplicas(count.get());
+                break;
+            }
+          });
+          Assign.AssignRequest assignRequest = assignRequestBuilder.build();
+          try {
+            List<ReplicaPosition> positions = assignStrategy.assign(solrCloudManager, assignRequest);
+            // accumulate per collection: this lambda runs once per shard, a plain put()
+            // would drop the positions computed for the previously processed shards
+            newPositions.computeIfAbsent(coll.getName(), c -> new ArrayList<>()).addAll(positions);
+          } catch (Exception e) {
+            log.warn("Exception computing positions for {}/{}: {}", coll.getName(), shard, e);
+          }
+        });
+      });
+    } catch (IOException e) {
+      log.warn("Exception getting cluster state", e);
+      return;
+    }
+
+    // remove all nodes with expired waitFor from the tracking set
+    nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
+
+    // send ADDREPLICA admin requests for each lost replica
+    // XXX should we use 'async' for that, to avoid blocking here?
+    List<CollectionAdminRequest.AddReplica> addReplicas = new ArrayList<>();
+    newPositions.forEach((collection, positions) -> {
+      positions.forEach(position -> {
+        CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest
+            .addReplicaToShard(collection, position.shard, position.type);
+        addReplica.setNode(position.node);
+        addReplica.setAsyncId(ASYNC_ID_PREFIX + counter.incrementAndGet());
+        addReplicas.add(addReplica);
+      });
+    });
+    addReplicas.forEach(addReplica -> {
+      try {
+        solrClient.request(addReplica);
+      } catch (Exception e) {
+        log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
+      }
+    });
+  }
+
+  /**
+   * Creates the single-threaded scheduler and schedules the periodic repair check.
+   * A repeated call without an intervening {@link #stop()} is ignored so that the
+   * existing executor is not leaked.
+   */
+  @Override
+  public void start() throws Exception {
+    if (waitForExecutor != null) {
+      log.warn("Double start() invoked on {}, ignoring", this);
+      return;
+    }
+    state = State.STARTING;
+    waitForExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
+        new SolrNamedThreadFactory("collectionsRepair_waitFor"));
+    waitForExecutor.setRemoveOnCancelPolicy(true);
+    waitForExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    // scheduleAtFixedRate rejects a non-positive period, so a waitFor of 0 is checked every second
+    final int periodSec = Math.max(1, waitForSecond);
+    waitForExecutor.scheduleAtFixedRate(this::runRepairSafely, 0, periodSec, TimeUnit.SECONDS);
+    state = State.RUNNING;
+  }
+
+  @Override
+  public State getState() {
+    return state;
+  }
+
+  /**
+   * Shuts down the scheduler (waiting up to 60 sec for it to terminate) and marks this
+   * listener as STOPPED. Safe to call when the listener was never started.
+   */
+  @Override
+  public void stop() {
+    state = State.STOPPING;
+    final ScheduledThreadPoolExecutor executor = waitForExecutor;
+    waitForExecutor = null;
+    // the executor only exists after start(); close() may be called on a never-started instance
+    if (executor != null) {
+      executor.shutdownNow();
+      try {
+        executor.awaitTermination(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        log.warn("Failed to shut down the waitFor executor - interrupted...");
+        Thread.currentThread().interrupt();
+      }
+    }
+    state = State.STOPPED;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("-- close() called");
+    }
+    stop();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
new file mode 100644
index 0000000..e24fb40
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.cluster.events.ClusterPropertiesChangedEvent;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.CollectionsAddedEvent;
+import org.apache.solr.cluster.events.CollectionsRemovedEvent;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.NodesUpEvent;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.cloud.LiveNodesListener;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of {@link ClusterEventProducer}.
+ * <h3>Implementation notes</h3>
+ * <p>Listeners relevant for a cluster event are always invoked one after another
+ * (never in parallel) and in arbitrary order. Consequently a listener that blocks
+ * delays the remaining listeners, which may be invoked much later or not at all.</p>
+ */
+public class DefaultClusterEventProducer extends ClusterEventProducerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // listeners registered with the ZkStateReader while this producer is started
+  private LiveNodesListener liveNodesListener;
+  private CloudCollectionsListener cloudCollectionsListener;
+  private ClusterPropertiesListener clusterPropertiesListener;
+  private ZkController zkController;
+
+  private final Set<ClusterEvent.EventType> supportedEvents =
+      new HashSet<>(Arrays.asList(
+          ClusterEvent.EventType.NODES_DOWN,
+          ClusterEvent.EventType.NODES_UP,
+          ClusterEvent.EventType.COLLECTIONS_ADDED,
+          ClusterEvent.EventType.COLLECTIONS_REMOVED,
+          ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
+      ));
+
+  public DefaultClusterEventProducer(CoreContainer cc) {
+    super(cc);
+  }
+
+  // ClusterSingleton lifecycle methods
+
+  /**
+   * Registers the live nodes, collections and cluster properties listeners with the
+   * ZkStateReader, in that order, and switches to the RUNNING state.
+   */
+  @Override
+  public synchronized void start() {
+    if (cc == null) {
+      liveNodesListener = null;
+      cloudCollectionsListener = null;
+      clusterPropertiesListener = null;
+      state = State.STOPPED;
+      return;
+    }
+    if (state == State.RUNNING) {
+      log.warn("Double start() invoked on {}, ignoring", this);
+      return;
+    }
+    state = State.STARTING;
+    this.zkController = this.cc.getZkController();
+
+    // clean up any previous instances
+    doStop();
+
+    // live nodes: translate set differences into NODES_DOWN / NODES_UP events
+    liveNodesListener = (oldNodes, newNodes) -> {
+      if (state == State.STOPPING || state == State.STOPPED) {
+        // already closed but still registered: ask to be removed
+        return true;
+      }
+      if (!oldNodes.equals(newNodes)) {
+        final Instant now = Instant.now();
+        final Set<String> lost = new HashSet<>(oldNodes);
+        lost.removeAll(newNodes);
+        if (!lost.isEmpty()) {
+          fireEvent(nodesDownEvent(lost, now));
+        }
+        final Set<String> joined = new HashSet<>(newNodes);
+        joined.removeAll(oldNodes);
+        if (!joined.isEmpty()) {
+          fireEvent(nodesUpEvent(joined, now));
+        }
+      }
+      // also covers the spurious (no change) notification: keep listening
+      return false;
+    };
+    zkController.zkStateReader.registerLiveNodesListener(liveNodesListener);
+
+    // collections: translate set differences into COLLECTIONS_REMOVED / COLLECTIONS_ADDED events
+    cloudCollectionsListener = (oldCollections, newCollections) -> {
+      if (oldCollections.equals(newCollections)) {
+        return;
+      }
+      final Instant now = Instant.now();
+      final Set<String> gone = new HashSet<>(oldCollections);
+      gone.removeAll(newCollections);
+      if (!gone.isEmpty()) {
+        fireEvent(collectionsRemovedEvent(gone, now));
+      }
+      final Set<String> created = new HashSet<>(newCollections);
+      created.removeAll(oldCollections);
+      if (!created.isEmpty()) {
+        fireEvent(collectionsAddedEvent(created, now));
+      }
+    };
+    zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
+
+    // cluster properties: every notification is forwarded as an event
+    clusterPropertiesListener = (newProperties) -> {
+      fireEvent(clusterPropertiesChangedEvent(newProperties));
+      return false;
+    };
+    zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
+
+    // XXX register collection state listener?
+    // XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+    state = State.RUNNING;
+  }
+
+  /** Creates a NODES_DOWN event backed by the given set of node names. */
+  private NodesDownEvent nodesDownEvent(final Set<String> nodeNames, final Instant timestamp) {
+    return new NodesDownEvent() {
+      @Override
+      public Iterator<String> getNodeNames() {
+        return nodeNames.iterator();
+      }
+
+      @Override
+      public Instant getTimestamp() {
+        return timestamp;
+      }
+    };
+  }
+
+  /** Creates a NODES_UP event backed by the given set of node names. */
+  private NodesUpEvent nodesUpEvent(final Set<String> nodeNames, final Instant timestamp) {
+    return new NodesUpEvent() {
+      @Override
+      public Iterator<String> getNodeNames() {
+        return nodeNames.iterator();
+      }
+
+      @Override
+      public Instant getTimestamp() {
+        return timestamp;
+      }
+    };
+  }
+
+  /** Creates a COLLECTIONS_REMOVED event backed by the given set of collection names. */
+  private CollectionsRemovedEvent collectionsRemovedEvent(final Set<String> names, final Instant timestamp) {
+    return new CollectionsRemovedEvent() {
+      @Override
+      public Iterator<String> getCollectionNames() {
+        return names.iterator();
+      }
+
+      @Override
+      public Instant getTimestamp() {
+        return timestamp;
+      }
+    };
+  }
+
+  /** Creates a COLLECTIONS_ADDED event backed by the given set of collection names. */
+  private CollectionsAddedEvent collectionsAddedEvent(final Set<String> names, final Instant timestamp) {
+    return new CollectionsAddedEvent() {
+      @Override
+      public Iterator<String> getCollectionNames() {
+        return names.iterator();
+      }
+
+      @Override
+      public Instant getTimestamp() {
+        return timestamp;
+      }
+    };
+  }
+
+  /** Creates a CLUSTER_PROPERTIES_CHANGED event that is time-stamped at creation. */
+  private ClusterPropertiesChangedEvent clusterPropertiesChangedEvent(final Map<String, Object> properties) {
+    return new ClusterPropertiesChangedEvent() {
+      final Instant now = Instant.now();
+      @Override
+      public Map<String, Object> getNewClusterProperties() {
+        return properties;
+      }
+
+      @Override
+      public Instant getTimestamp() {
+        return now;
+      }
+    };
+  }
+
+  @Override
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return supportedEvents;
+  }
+
+  /** Unregisters all ZkStateReader listeners and switches to the STOPPED state. */
+  @Override
+  public synchronized void stop() {
+    state = State.STOPPING;
+    doStop();
+    state = State.STOPPED;
+  }
+
+  /** Removes whichever listeners are currently registered, then forgets all of them. */
+  private void doStop() {
+    if (liveNodesListener != null) {
+      zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
+    }
+    if (cloudCollectionsListener != null) {
+      zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
+    }
+    if (clusterPropertiesListener != null) {
+      zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
+    }
+    // references are cleared only after all removals were attempted
+    liveNodesListener = null;
+    cloudCollectionsListener = null;
+    clusterPropertiesListener = null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    stop();
+    super.close();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
new file mode 100644
index 0000000..8d8a972
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.NoOpProducer;
+import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.core.CoreContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Set;
+
+/**
+ * This implementation allows Solr to dynamically change the underlying implementation
+ * of {@link ClusterEventProducer} in response to the changed plugin configuration.
+ * <p>Listeners are registered both with this instance and with the current delegate, so
+ * that they can be handed over to a new delegate in {@link #setDelegate(ClusterEventProducer)}.</p>
+ */
+public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // never null: starts out as a NoOpProducer
+  private ClusterEventProducer delegate;
+
+  public DelegatingClusterEventProducer(CoreContainer cc) {
+    super(cc);
+    delegate = new NoOpProducer(cc);
+  }
+
+  /** Quietly closes the current delegate and then this instance. */
+  @Override
+  public void close() throws IOException {
+    if (log.isDebugEnabled()) {
+      log.debug("--closing delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), delegate);
+    }
+    IOUtils.closeQuietly(delegate);
+    super.close();
+  }
+
+  /**
+   * Replaces the delegate, registers all currently known listeners with it and starts it
+   * when this instance is RUNNING or STARTING and the new delegate is not.
+   * The previous delegate is neither stopped nor closed by this method.
+   *
+   * @param newDelegate the producer that takes over from now on
+   */
+  public void setDelegate(ClusterEventProducer newDelegate) {
+    if (log.isDebugEnabled()) {
+      log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
+    }
+    this.delegate = newDelegate;
+    // transfer all listeners to the new delegate
+    listeners.forEach((type, listenerSet) -> {
+      listenerSet.forEach(listener -> {
+        try {
+          delegate.registerListener(listener, type);
+        } catch (Exception e) {
+          log.warn("Exception registering listener with the new event producer", e);
+          // make sure it's not registered
+          delegate.unregisterListener(listener, type);
+          // unregister it here, too
+          super.unregisterListener(listener, type);
+        }
+      });
+    });
+    if ((state == State.RUNNING || state == State.STARTING) &&
+        !(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
+      try {
+        delegate.start();
+        if (log.isDebugEnabled()) {
+          log.debug("--- started delegate {}", delegate);
+        }
+      } catch (Exception e) {
+        log.warn("Unable to start the new delegate {}: {}", delegate.getClass().getName(), e);
+      }
+    } else {
+      if (log.isDebugEnabled()) {
+        log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+      }
+    }
+  }
+
+  /** Registers the listener with this instance first and then with the current delegate. */
+  @Override
+  public void registerListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    super.registerListener(listener, eventTypes);
+    delegate.registerListener(listener, eventTypes);
+  }
+
+  /** Unregisters the listener from this instance first and then from the current delegate. */
+  @Override
+  public void unregisterListener(ClusterEventListener listener, ClusterEvent.EventType... eventTypes) {
+    super.unregisterListener(listener, eventTypes);
+    delegate.unregisterListener(listener, eventTypes);
+  }
+
+  /**
+   * Starts the delegate unless it is already RUNNING or STARTING. In every case the state
+   * of this instance ends up mirroring the state reported by the delegate.
+   *
+   * @throws Exception when the delegate fails to start
+   */
+  @Override
+  public synchronized void start() throws Exception {
+    if (log.isDebugEnabled()) {
+      log.debug("-- starting CC-{}, Delegating {}, delegate {}",
+          Integer.toHexString(cc.hashCode()), Integer.toHexString(hashCode()), delegate);
+    }
+    state = State.STARTING;
+    try {
+      if (!(delegate.getState() == State.RUNNING || delegate.getState() == State.STARTING)) {
+        delegate.start();
+        if (log.isDebugEnabled()) {
+          log.debug("--- started delegate {}", delegate);
+        }
+      } else {
+        if (log.isDebugEnabled()) {
+          log.debug("--- delegate {} already in state {}", delegate, delegate.getState());
+        }
+      }
+    } finally {
+      // mirror the delegate in both branches; otherwise an already started delegate
+      // would leave this instance in the STARTING state
+      state = delegate.getState();
+    }
+  }
+
+  @Override
+  public Set<ClusterEvent.EventType> getSupportedEventTypes() {
+    return NoOpProducer.ALL_EVENT_TYPES;
+  }
+
+  /** Stops the delegate and takes over the state that it reports afterwards. */
+  @Override
+  public synchronized void stop() {
+    if (log.isDebugEnabled()) {
+      log.debug("-- stopping Delegating {}, delegate {}", Integer.toHexString(hashCode()), delegate);
+    }
+    state = State.STOPPING;
+    delegate.stop();
+    state = delegate.getState();
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
new file mode 100644
index 0000000..fc00246
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Default implementation of {@link org.apache.solr.cluster.events.ClusterEventProducer}.
+ */
+package org.apache.solr.cluster.events.impl;
+
+
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/package-info.java b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
new file mode 100644
index 0000000..a334a00
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cluster/events/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Interfaces defining support for cluster-level event generation and processing.
+ */
+package org.apache.solr.cluster.events;
+
+
diff --git a/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
index 84acb4f..82564e6 100644
--- a/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
+++ b/solr/core/src/java/org/apache/solr/core/ClusterSingletons.java
@@ -17,7 +17,7 @@
package org.apache.solr.core;
-import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.cloud.ClusterSingleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +33,7 @@ import java.util.function.Supplier;
/**
* Helper class to manage the initial registration of {@link ClusterSingleton} plugins and
- * to track the changes in loaded plugins in {@link org.apache.solr.api.CustomContainerPlugins}.
+ * to track the changes in loaded plugins in {@link ContainerPluginsRegistry}.
*/
public class ClusterSingletons {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -41,7 +41,7 @@ public class ClusterSingletons {
private final Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
private final Supplier<Boolean> runSingletons;
private final Consumer<Runnable> asyncRunner;
- private final CustomContainerPlugins.PluginRegistryListener pluginListener;
+ private final ContainerPluginsRegistry.PluginRegistryListener pluginListener;
public static final int DEFAULT_WAIT_TIMEOUT_SEC = 60;
@@ -60,9 +60,9 @@ public class ClusterSingletons {
this.runSingletons = runSingletons;
this.asyncRunner = asyncRunner;
// create plugin registry listener
- pluginListener = new CustomContainerPlugins.PluginRegistryListener() {
+ pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
- public void added(CustomContainerPlugins.ApiInfo plugin) {
+ public void added(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
@@ -83,7 +83,7 @@ public class ClusterSingletons {
}
@Override
- public void deleted(CustomContainerPlugins.ApiInfo plugin) {
+ public void deleted(ContainerPluginsRegistry.ApiInfo plugin) {
if (plugin == null || plugin.getInstance() == null) {
return;
}
@@ -96,14 +96,14 @@ public class ClusterSingletons {
}
@Override
- public void modified(CustomContainerPlugins.ApiInfo old, CustomContainerPlugins.ApiInfo replacement) {
+ public void modified(ContainerPluginsRegistry.ApiInfo old, ContainerPluginsRegistry.ApiInfo replacement) {
added(replacement);
deleted(old);
}
};
}
- public CustomContainerPlugins.PluginRegistryListener getPluginRegistryListener() {
+ public ContainerPluginsRegistry.PluginRegistryListener getPluginRegistryListener() {
return pluginListener;
}
@@ -151,9 +151,6 @@ public class ClusterSingletons {
*/
public void startClusterSingletons() {
final Runnable initializer = () -> {
- if (!runSingletons.get()) {
- return;
- }
try {
waitUntilReady(DEFAULT_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -163,6 +160,9 @@ public class ClusterSingletons {
log.warn("Timed out during initialization of ClusterSingleton-s (waited {} sec)", DEFAULT_WAIT_TIMEOUT_SEC);
return;
}
+ if (!runSingletons.get()) {
+ return;
+ }
singletonMap.forEach((name, singleton) -> {
if (!runSingletons.get()) {
return;
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 2e86b58..831f81e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -57,7 +57,7 @@ import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
-import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
@@ -73,6 +73,8 @@ import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ClusterSingleton;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.cluster.events.impl.ClusterEventProducerFactory;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -178,7 +180,7 @@ public class CoreContainer {
*/
public final Supplier<SolrZkClient> zkClientSupplier = () -> getZkController().getZkClient();
- private final CustomContainerPlugins customContainerPlugins = new CustomContainerPlugins(this, containerHandlers.getApiBag());
+ private final ContainerPluginsRegistry containerPluginsRegistry = new ContainerPluginsRegistry(this, containerHandlers.getApiBag());
protected final Map<String, CoreLoadFailure> coreInitFailures = new ConcurrentHashMap<>();
@@ -253,6 +255,10 @@ public class CoreContainer {
getZkController().getOverseer() != null &&
!getZkController().getOverseer().isClosed(),
(r) -> this.runAsync(r));
+
+ // initially these are the same to collect the plugin-based listeners during init
+ private ClusterEventProducer clusterEventProducer;
+
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -671,7 +677,11 @@ public class CoreContainer {
loader.reloadLuceneSPI();
}
- customContainerPlugins.registerListener(clusterSingletons.getPluginRegistryListener());
+ ClusterEventProducerFactory clusterEventProducerFactory = new ClusterEventProducerFactory(this);
+ clusterEventProducer = clusterEventProducerFactory;
+
+ containerPluginsRegistry.registerListener(clusterSingletons.getPluginRegistryListener());
+ containerPluginsRegistry.registerListener(clusterEventProducerFactory.getPluginRegistryListener());
packageStoreAPI = new PackageStoreAPI(this);
containerHandlers.getApiBag().registerObject(packageStoreAPI.readAPI);
@@ -881,12 +891,15 @@ public class CoreContainer {
}
if (isZooKeeperAware()) {
- customContainerPlugins.refresh();
- getZkController().zkStateReader.registerClusterPropertiesListener(customContainerPlugins);
+ containerPluginsRegistry.refresh();
+ getZkController().zkStateReader.registerClusterPropertiesListener(containerPluginsRegistry);
ContainerPluginsApi containerPluginsApi = new ContainerPluginsApi(this);
containerHandlers.getApiBag().registerObject(containerPluginsApi.readAPI);
containerHandlers.getApiBag().registerObject(containerPluginsApi.editAPI);
+ // create target ClusterEventProducer (possibly from plugins)
+ clusterEventProducer = clusterEventProducerFactory.create(containerPluginsRegistry);
+
// init ClusterSingleton-s
// register the handlers that are also ClusterSingleton
@@ -1117,6 +1130,9 @@ public class CoreContainer {
if (solrClientCache != null) {
solrClientCache.close();
}
+ if (containerPluginsRegistry != null) {
+ IOUtils.closeQuietly(containerPluginsRegistry);
+ }
} finally {
try {
@@ -2137,14 +2153,18 @@ public class CoreContainer {
return tragicException != null;
}
- public CustomContainerPlugins getCustomContainerPlugins(){
- return customContainerPlugins;
+ public ContainerPluginsRegistry getContainerPluginsRegistry() {
+ return containerPluginsRegistry;
}
public ClusterSingletons getClusterSingletons() {
return clusterSingletons;
}
+ public ClusterEventProducer getClusterEventProducer() {
+ return clusterEventProducer;
+ }
+
static {
ExecutorUtil.addThreadLocalProvider(SolrRequestInfo.getInheritableThreadLocalProvider());
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index f6af915..aff5484 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -28,7 +28,7 @@ import java.util.function.Supplier;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Command;
-import org.apache.solr.api.CustomContainerPlugins;
+import org.apache.solr.api.ContainerPluginsRegistry;
import org.apache.solr.api.EndPoint;
import org.apache.solr.api.PayloadObj;
import org.apache.solr.client.solrj.SolrRequest.METHOD;
@@ -137,7 +137,7 @@ public class ContainerPluginsApi {
}
}
List<String> errs = new ArrayList<>();
- CustomContainerPlugins.ApiInfo apiInfo = coreContainer.getCustomContainerPlugins().createInfo(info, errs);
+ ContainerPluginsRegistry.ApiInfo apiInfo = coreContainer.getContainerPluginsRegistry().createInfo(info, errs);
if (!errs.isEmpty()) {
for (String err : errs) payload.addError(err);
return;
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
new file mode 100644
index 0000000..873a0d5
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test listener that records every received {@link ClusterEvent}, grouped by event type,
+ * and lets a test block until an event of a chosen type has arrived.
+ *
+ * <p>{@link #onEvent(ClusterEvent)} is expected to be invoked from a thread other than the
+ * test thread, so the expected type and its latch are always read and replaced together
+ * under this object's monitor. NOTE(review): tests still read and clear {@link #events}
+ * directly without synchronization.
+ */
+public class AllEventsListener implements ClusterEventListener {
+  CountDownLatch eventLatch = new CountDownLatch(1);
+  ClusterEvent.EventType expectedType;
+  Map<ClusterEvent.EventType, List<ClusterEvent>> events = new HashMap<>();
+
+  /**
+   * Records the event and releases the current latch when the event is of the expected type.
+   */
+  @Override
+  public synchronized void onEvent(ClusterEvent event) {
+    events.computeIfAbsent(event.getType(), type -> new ArrayList<>()).add(event);
+    if (event.getType() == expectedType) {
+      eventLatch.countDown();
+    }
+  }
+
+  /**
+   * Sets the event type to wait for and arms a fresh latch. Both are replaced atomically
+   * with respect to {@link #onEvent(ClusterEvent)}, so an event can neither count down a
+   * stale latch nor trip the new latch on behalf of the previous type.
+   */
+  public synchronized void setExpectedType(ClusterEvent.EventType expectedType) {
+    this.expectedType = expectedType;
+    eventLatch = new CountDownLatch(1);
+  }
+
+  /**
+   * Blocks until an event of the expected type has been received, and fails the test if
+   * that does not happen within the timeout.
+   *
+   * @param timeoutSeconds maximum time to wait, in seconds
+   * @throws InterruptedException if the waiting thread is interrupted
+   */
+  public void waitForExpectedEvent(int timeoutSeconds) throws InterruptedException {
+    final CountDownLatch latch;
+    final ClusterEvent.EventType type;
+    // take a consistent snapshot, but never wait while holding the monitor:
+    // onEvent() needs it to deliver the event we are waiting for
+    synchronized (this) {
+      latch = eventLatch;
+      type = expectedType;
+    }
+    if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
+      Assert.fail("Timed out waiting for expected event " + type);
+    }
+  }
+
+  /** Nothing to release. */
+  public void close() throws IOException {
+
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
new file mode 100644
index 0000000..3c71823
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.impl.DefaultClusterEventProducer;
+import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.util.LogLevel;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
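+/**
+ * Tests that cluster-level events are produced and delivered to registered listeners,
+ * including listeners that are deployed as container plugins.
+ */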
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
+public class ClusterEventProducerTest extends SolrCloudTestCase {
+
+ private AllEventsListener eventsListener;
+
+ /**
+ * Configures a three-node test cluster and adds the "cloud-minimal" test configset
+ * under the name "conf".
+ */
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ configureCluster(3)
+ .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+ .configure();
+ }
+
+ /**
+ * Sets the "enable.packages" system property, deletes all collections and registers a
+ * fresh {@link AllEventsListener} with the event producer of the current Overseer node.
+ */
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty("enable.packages", "true");
+ super.setUp();
+ cluster.deleteAllCollections();
+ eventsListener = new AllEventsListener();
+ cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
+ }
+
+  /**
+   * Clears the "enable.packages" system property, unregisters the per-test listener and
+   * removes the event producer plugin if one is still configured.
+   */
+  @After
+  public void teardown() throws Exception {
+    System.clearProperty("enable.packages");
+    if (eventsListener != null) {
+      cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer()
+          .unregisterListener(eventsListener);
+      eventsListener.events.clear();
+    }
+    // read the current plugin configuration
+    V2Response pluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build()
+        .process(cluster.getSolrClient());
+    String producerClassPath = "/plugin/" + ClusterEventProducer.PLUGIN_NAME + "/class";
+    if (pluginState._getStr(producerClassPath, null) == null) {
+      // no event producer plugin is configured, nothing to remove
+      return;
+    }
+    new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+        .build()
+        .process(cluster.getSolrClient());
+  }
+
+  /**
+   * Installs {@link DefaultClusterEventProducer} as the event producer plugin and verifies
+   * that exactly one event of the matching type is delivered for each of: a node going
+   * down, a node coming up, a collection being added, a collection being removed, and a
+   * cluster property being set and then unset.
+   */
+  @Test
+  public void testEvents() throws Exception {
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertEquals(0, rsp.getStatus());
+
+    // NODES_DOWN
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+    // don't kill Overseer
+    String overseerNodeName = cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName();
+    JettySolrRunner nonOverseerJetty = null;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (!overseerNodeName.equals(jetty.getNodeName())) {
+        nonOverseerJetty = jetty;
+        break;
+      }
+    }
+    // fail with a clear message instead of a NullPointerException below
+    assertNotNull("no non-Overseer node found in the cluster", nonOverseerJetty);
+    String nodeName = nonOverseerJetty.getNodeName();
+    cluster.stopJettySolrRunner(nonOverseerJetty);
+    cluster.waitForJettyToStop(nonOverseerJetty);
+    eventsListener.waitForExpectedEvent(30);
+    NodesDownEvent nodesDown =
+        (NodesDownEvent) assertSingleEvent(ClusterEvent.EventType.NODES_DOWN);
+    assertEquals("should be node " + nodeName, nodeName, nodesDown.getNodeNames().next());
+
+    // NODES_UP
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
+    JettySolrRunner newNode = cluster.startJettySolrRunner();
+    cluster.waitForNode(newNode, 60);
+    eventsListener.waitForExpectedEvent(30);
+    NodesUpEvent nodesUp =
+        (NodesUpEvent) assertSingleEvent(ClusterEvent.EventType.NODES_UP);
+    assertEquals("should be node " + newNode.getNodeName(), newNode.getNodeName(), nodesUp.getNodeNames().next());
+
+    // COLLECTIONS_ADDED
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    String collection = "testNodesEvent_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    eventsListener.waitForExpectedEvent(30);
+    CollectionsAddedEvent collectionsAdded =
+        (CollectionsAddedEvent) assertSingleEvent(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    assertEquals("should be collection " + collection, collection, collectionsAdded.getCollectionNames().next());
+
+    // COLLECTIONS_REMOVED
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    eventsListener.waitForExpectedEvent(30);
+    CollectionsRemovedEvent collectionsRemoved =
+        (CollectionsRemovedEvent) assertSingleEvent(ClusterEvent.EventType.COLLECTIONS_REMOVED);
+    assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
+
+    // CLUSTER_PROPERTIES_CHANGED: set a property
+    eventsListener.events.clear();
+    eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
+    Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
+    clusterProperties.setClusterProperty("ext.foo", "bar");
+    eventsListener.waitForExpectedEvent(30);
+    ClusterPropertiesChangedEvent propertiesChanged =
+        (ClusterPropertiesChangedEvent) assertSingleEvent(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    Map<String, Object> newProps = propertiesChanged.getNewClusterProperties();
+    assertEquals("new properties wrong value of the 'ext.foo' property: " + newProps,
+        "bar", newProps.get("ext.foo"));
+
+    // CLUSTER_PROPERTIES_CHANGED: unset the property
+    eventsListener.events.clear();
+    eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    clusterProperties.setClusterProperty("ext.foo", null);
+    eventsListener.waitForExpectedEvent(30);
+    propertiesChanged =
+        (ClusterPropertiesChangedEvent) assertSingleEvent(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
+    assertNull("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
+        propertiesChanged.getNewClusterProperties().get("ext.foo"));
+
+  }
+
+  /**
+   * Asserts that the per-test listener has recorded exactly one event of the given type
+   * and returns that event.
+   */
+  private ClusterEvent assertSingleEvent(ClusterEvent.EventType type) {
+    List<ClusterEvent> recorded = eventsListener.events.get(type);
+    assertNotNull("should be " + type + " events", recorded);
+    assertEquals("should be one " + type + " event", 1, recorded.size());
+    ClusterEvent event = recorded.get(0);
+    assertEquals("should be " + type + " event type", type, event.getType());
+    return event;
+  }
+
+  // Shared between the test thread (which re-arms the latch and resets the event) and
+  // DummyEventListener.onEvent(), which the test awaits and so presumably runs on another
+  // thread; volatile makes each re-assignment visible to the other side.
+  private static volatile CountDownLatch dummyEventLatch = new CountDownLatch(1);
+  private static volatile ClusterEvent lastEvent = null;
+
+  /**
+   * Listener plugin used by this test. While in the RUNNING state it stores
+   * COLLECTIONS_ADDED and COLLECTIONS_REMOVED events in {@code lastEvent} and counts down
+   * {@code dummyEventLatch}; every other event is only reported at debug level.
+   */
+  public static class DummyEventListener implements ClusterEventListener, ClusterSingleton {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    State state = State.STOPPED;
+
+    @Override
+    public void onEvent(ClusterEvent event) {
+      if (state != State.RUNNING) {
+        if (log.isDebugEnabled()) {
+          log.debug("skipped event, not running: {}", event);
+        }
+        return;
+      }
+      final ClusterEvent.EventType type = event.getType();
+      final boolean collectionEvent = type == ClusterEvent.EventType.COLLECTIONS_ADDED
+          || type == ClusterEvent.EventType.COLLECTIONS_REMOVED;
+      if (!collectionEvent) {
+        if (log.isDebugEnabled()) {
+          log.debug("skipped event, wrong type: {}", type);
+        }
+        return;
+      }
+      if (log.isDebugEnabled()) {
+        log.debug("recorded event {}", Utils.toJSONString(event));
+      }
+      // publish the event before releasing the waiting test thread
+      lastEvent = event;
+      dummyEventLatch.countDown();
+    }
+
+    @Override
+    public String getName() {
+      return "dummy";
+    }
+
+    @Override
+    public void start() throws Exception {
+      if (log.isDebugEnabled()) {
+        log.debug("starting {}", instanceId());
+      }
+      state = State.RUNNING;
+    }
+
+    @Override
+    public State getState() {
+      return state;
+    }
+
+    @Override
+    public void stop() {
+      if (log.isDebugEnabled()) {
+        log.debug("stopping {}", instanceId());
+      }
+      state = State.STOPPED;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (log.isDebugEnabled()) {
+        log.debug("closing {}", instanceId());
+      }
+    }
+
+    /** Hex hash code of this instance, used to tell instances apart in the debug log. */
+    private String instanceId() {
+      return Integer.toHexString(hashCode());
+    }
+  }
+
+  /**
+   * Deploys {@link DummyEventListener} as a container plugin and verifies that it receives
+   * collection events while {@link DefaultClusterEventProducer} is installed, receives
+   * nothing after the producer plugin is removed, and receives events again once the
+   * producer plugin is re-installed.
+   */
+  @Test
+  public void testListenerPlugins() throws Exception {
+    installDefaultEventProducer();
+
+    PluginMeta plugin = new PluginMeta();
+    plugin.name = "testplugin";
+    plugin.klass = DummyEventListener.class.getName();
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(POST)
+        .withPayload(singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    //just check if the plugin is indeed registered
+    V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build();
+    rsp = readPluginState.process(cluster.getSolrClient());
+    assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
+
+    String collection = "testListenerPlugins_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    ClusterEvent event = awaitDummyEvent(ClusterEvent.EventType.COLLECTIONS_ADDED, collection);
+    assertTimestampNotInFuture(event);
+    assertEquals(collection, ((CollectionsAddedEvent) event).getCollectionNames().next());
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    event = awaitDummyEvent(ClusterEvent.EventType.COLLECTIONS_REMOVED, collection);
+    assertTimestampNotInFuture(event);
+    assertEquals(collection, ((CollectionsRemovedEvent) event).getCollectionNames().next());
+
+    // test changing the ClusterEventProducer plugin dynamically
+
+    // remove the plugin (a NoOpProducer will be used instead)
+    req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+        .build();
+    req.process(cluster.getSolrClient());
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+    // should not receive any events now
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    if (dummyEventLatch.await(5, TimeUnit.SECONDS)) {
+      fail("should not receive any events but got " + lastEvent);
+    }
+
+    // reinstall the plugin
+    installDefaultEventProducer();
+
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    cluster.getSolrClient().request(delete);
+    awaitDummyEvent(ClusterEvent.EventType.COLLECTIONS_REMOVED, collection);
+  }
+
+  /** Registers {@link DefaultClusterEventProducer} as the cluster event producer plugin. */
+  private static void installDefaultEventProducer() throws Exception {
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    assertEquals(0, rsp.getStatus());
+  }
+
+  /**
+   * Waits up to 30 seconds for {@link DummyEventListener} to record an event, then asserts
+   * that the recorded event has the expected type and returns it.
+   */
+  private static ClusterEvent awaitDummyEvent(ClusterEvent.EventType expectedType, String collection)
+      throws InterruptedException {
+    if (!dummyEventLatch.await(30, TimeUnit.SECONDS)) {
+      fail("Timed out waiting for " + expectedType + " event, " + collection);
+    }
+    ClusterEvent event = lastEvent;
+    assertNotNull("lastEvent should be " + expectedType, event);
+    assertEquals("lastEvent should be " + expectedType, expectedType, event.getType());
+    return event;
+  }
+
+  /**
+   * Asserts that the event was not stamped later than the current time. A timestamp equal
+   * to "now" is accepted: it is not in the future, and coarse clocks can return the same
+   * instant twice.
+   */
+  private static void assertTimestampNotInFuture(ClusterEvent event) {
+    assertFalse("timestamp of the event is in the future", event.getTimestamp().isAfter(Instant.now()));
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
new file mode 100644
index 0000000..b34067e
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cluster.events.impl;
+
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
+import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.cluster.events.AllEventsListener;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
+import org.apache.solr.cluster.events.ClusterEventProducer;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.LogLevel;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
+
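+/**
+ * Tests that {@link CollectionsRepairEventListener} processes a NODES_DOWN event and that
+ * the affected collection returns to its full replica count afterwards.
+ */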
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
+public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
+
+ /**
+ * Test wrapper around {@link CollectionsRepairEventListener} that forwards every call to
+ * the wrapped listener and counts down {@link #completed} once the delegate has returned
+ * from {@link #onEvent(ClusterEvent)}.
+ */
+ public static class CollectionsRepairWrapperListener implements ClusterEventListener, ClusterSingleton {
+ final CollectionsRepairEventListener delegate;
+
+ // released after the first event has been passed through the delegate
+ CountDownLatch completed = new CountDownLatch(1);
+
+ // waitFor is passed to the delegate's setWaitForSecond(); judging by the name it is in seconds
+ CollectionsRepairWrapperListener(CoreContainer cc, int waitFor) throws Exception {
+ delegate = new CollectionsRepairEventListener(cc);
+ delegate.setWaitForSecond(waitFor);
+ }
+
+ @Override
+ public void onEvent(ClusterEvent event) {
+ delegate.onEvent(event);
+ // only reached when the delegate returns normally
+ completed.countDown();
+ }
+
+ @Override
+ public String getName() {
+ return "wrapperListener";
+ }
+
+ @Override
+ public void start() throws Exception {
+ delegate.start();
+ }
+
+ @Override
+ public State getState() {
+ return delegate.getState();
+ }
+
+ @Override
+ public void stop() {
+ delegate.stop();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+ }
+
+  // records all events produced on the Overseer node
+  private static AllEventsListener eventsListener = new AllEventsListener();
+  private static CollectionsRepairWrapperListener repairListener;
+
+  // number of nodes in the test cluster; a true constant, hence final
+  private static final int NUM_NODES = 3;
+  // value passed to the repair listener and slept for in the test (seconds); set in setupCluster()
+  private static int waitFor;
+
+  /**
+   * Configures the test cluster, installs {@link DefaultClusterEventProducer} as the event
+   * producer plugin and registers two listeners on the Overseer node: one that records all
+   * event types and the repair listener wrapper for NODES_DOWN events.
+   */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(NUM_NODES)
+        .addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
+        .configure();
+    PluginMeta plugin = new PluginMeta();
+    plugin.klass = DefaultClusterEventProducer.class.getName();
+    plugin.name = ClusterEventProducer.PLUGIN_NAME;
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .withMethod(POST)
+        .withPayload(Collections.singletonMap("add", plugin))
+        .build();
+    V2Response rsp = req.process(cluster.getSolrClient());
+    // a non-null response alone does not prove that the plugin was installed
+    assertNotNull(rsp);
+    assertEquals(0, rsp.getStatus());
+
+    // random wait between 1 and 9
+    waitFor = 1 + random().nextInt(9);
+
+    CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
+    cc.getClusterEventProducer()
+        .registerListener(eventsListener, ClusterEvent.EventType.values());
+    repairListener = new CollectionsRepairWrapperListener(cc, waitFor);
+    cc.getClusterEventProducer()
+        .registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
+    // started by hand because the wrapper is registered directly, not deployed as a plugin
+    repairListener.start();
+  }
+
+ /** Deletes all collections before each test. */
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ cluster.deleteAllCollections();
+ }
+
+  /**
+   * Creates a collection with three replicas, stops a non-Overseer node, and verifies that
+   * after the repair listener has processed the NODES_DOWN event the collection is back to
+   * three active replicas.
+   */
+  @Test
+  public void testCollectionRepair() throws Exception {
+    eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_ADDED);
+    String collection = "testCollectionRepair_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 3);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 3);
+    eventsListener.waitForExpectedEvent(10);
+    eventsListener.setExpectedType(ClusterEvent.EventType.NODES_DOWN);
+
+    // don't kill Overseer
+    String overseerNodeName = cluster.getOpenOverseer().getCoreContainer().getZkController().getNodeName();
+    JettySolrRunner nonOverseerJetty = null;
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (!overseerNodeName.equals(jetty.getNodeName())) {
+        nonOverseerJetty = jetty;
+        break;
+      }
+    }
+    // fail with a clear message instead of a NullPointerException below
+    assertNotNull("no non-Overseer node found in the cluster", nonOverseerJetty);
+    cluster.stopJettySolrRunner(nonOverseerJetty);
+    cluster.waitForJettyToStop(nonOverseerJetty);
+    eventsListener.waitForExpectedEvent(10);
+    cluster.waitForActiveCollection(collection, 1, 2);
+
+    // give the repair listener its configured wait time before expecting any action
+    TimeUnit.SECONDS.sleep(waitFor);
+
+    // wait for completed processing in the repair listener
+    boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timeout waiting for the processing to complete");
+    }
+    cluster.waitForActiveCollection(collection, 1, 3);
+  }
+}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 6f7a18a..3465a5b 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -93,13 +93,6 @@ public class TestContainerPlugin extends SolrCloudTestCase {
.build();
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
- //test with an invalid class
- // XXX (ab) in order to support ClusterSingleton we allow adding
- // plugins without Api EndPoints
-
-// plugin.klass = C1.class.getName();
-// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
-
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();
req.process(cluster.getSolrClient());
diff --git a/solr/dev-docs/plugins/container-plugins.adoc b/solr/dev-docs/plugins/container-plugins.adoc
new file mode 100644
index 0000000..b10ba4b
--- /dev/null
+++ b/solr/dev-docs/plugins/container-plugins.adoc
@@ -0,0 +1,81 @@
+= Container Plugins
+:toc: macro
+:toclevels: 3
+
+|===
+| Initial version released| 2020, November 2|Andrzej Białecki
+|===
+
+toc::[]
+
+== Container plugins subsystem
+Container plugins are pluggable components that are defined and instantiated at the
+`CoreContainer` (node) level. The components usually provide an admin-level API for
+additional functionality at the Solr node level.
+
+=== Plugin configurations
+Plugin configurations are maintained in ZooKeeper in the `/clusterprops.json` file, under
+the `plugin` entry. The configuration is a JSON map where keys are the plugin names, and
+values are serialized `org.apache.solr.client.solrj.request.beans.PluginMeta` beans.
+
+== Types of container plugins
+
+=== Plugin life-cycle
+Plugin instances are loaded and initialized when Solr's `CoreContainer` is first created.
+
+Then on each update of the plugin configurations the existing plugin configs are compared
+with the new configs, and plugin instances are respectively created, removed, or
+replaced (i.e. removed and added using the new configuration).
+
+If a plugin implementation class has a constructor that accepts an instance of
+`CoreContainer` then it is instantiated using this constructor, and the current instance
+of `CoreContainer` is passed as argument.
+
+=== PluginRegistryListener
+Components that need to be aware of changes in plugin configurations or registration can
+implement `org.apache.solr.api.ContainerPluginsRegistry.PluginRegistryListener` and register
+it with the instance of registry available from `coreContainer.getContainerPluginsRegistry()`.
+
+=== ClusterSingleton plugins
+Plugins that implement the `ClusterSingleton` interface are instantiated on each
+Solr node. However, their start/stop life-cycle, as defined in the interface,
+is controlled in such a way that only a single running instance of the plugin
+is present in the cluster at any time.
+
+(Currently this is implemented by re-using the Overseer leader election, so all
+`ClusterSingleton`-s that are in the RUNNING state execute on the Overseer leader).
+
+Any plugin type can implement this interface to indicate to Solr that
+it requires this cluster singleton behavior.
+
+// explain plugins that register Api-s
+// explain plugins that don't implement any Api
+=== ClusterEventProducer plugin
+In order to support the generation of cluster-level events an implementation of
+`ClusterEventProducer` is created on each Solr node. This component is also a
+`ClusterSingleton`, which means that only one active instance is present in the
+cluster at any time.
+
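+If no plugin configuration is specified then the no-op implementation
+`org.apache.solr.cluster.events.NoOpProducer` is used, which does not generate any
+events. An implementation that generates cluster events is available in
+`org.apache.solr.cluster.events.impl.DefaultClusterEventProducer`; it has to be configured
+as the `cluster-event-producer` plugin in order to be used.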
+
+=== ClusterEventListener plugins
+Plugins that implement the `ClusterEventListener` interface will be automatically
+registered with the instance of `ClusterEventProducer`.
+
+== Plugin management API
+
+== Predefined plugin names
+
+Plugins with these names are used in specific parts of Solr. Their names are reserved
+and cannot be used for other plugin types:
+
+// XXX uncomment when we move the config to plugins
+//`placement-plugin`::
+//plugin that implements `PlacementPlugin` interface. This type of plugin
+//determines the replica placement strategy in the cluster.
+
+`cluster-event-producer`::
+plugin that implements the `ClusterEventProducer` interface. This type of plugin
+is used for generating cluster-level events.
\ No newline at end of file