You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/11/04 14:55:19 UTC
[lucene-solr] branch jira/solr-14749-api updated: SOLR-14749: Fix
synchronization issues. Add Closeable to be sure that resources are closed
properly. Add unit test to verify plugin reloading & event publishing.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749-api
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14749-api by this push:
new 9d6de70 SOLR-14749: Fix synchronization issues. Add Closeable to be sure that resources are closed properly. Add unit test to verify plugin reloading & event publishing.
9d6de70 is described below
commit 9d6de705809f4bad8f14ab1dbb3cc28831d67662
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Wed Nov 4 15:54:00 2020 +0100
SOLR-14749: Fix synchronization issues. Add Closeable to be sure that resources
are closed properly. Add unit test to verify plugin reloading & event publishing.
---
.../apache/solr/api/ContainerPluginsRegistry.java | 12 ++-
.../solr/cluster/events/ClusterEventListener.java | 4 +-
.../solr/cluster/events/ClusterEventProducer.java | 4 +-
.../cluster/events/ClusterEventProducerBase.java | 32 +++---
.../events/impl/ClusterEventProducerFactory.java | 26 +++--
.../impl/CollectionsRepairEventListener.java | 70 +++++++++++--
.../events/impl/DefaultClusterEventProducer.java | 13 +--
.../impl/DelegatingClusterEventProducer.java | 15 ++-
.../java/org/apache/solr/core/CoreContainer.java | 3 +
.../solr/cluster/events/AllEventsListener.java | 5 +
.../cluster/events/ClusterEventProducerTest.java | 113 +++++++++++++++++----
.../impl/CollectionsRepairEventListenerTest.java | 20 +++-
.../apache/solr/handler/TestContainerPlugin.java | 7 --
13 files changed, 251 insertions(+), 73 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
index 9154ae4..3b2e4f8 100644
--- a/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
+++ b/solr/core/src/java/org/apache/solr/api/ContainerPluginsRegistry.java
@@ -39,6 +39,7 @@ import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.annotation.JsonProperty;
import org.apache.solr.common.cloud.ClusterPropertiesListener;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.PathTrie;
import org.apache.solr.common.util.ReflectMapWriter;
import org.apache.solr.common.util.StrUtils;
@@ -65,7 +66,7 @@ import static org.apache.solr.common.util.Utils.makeMap;
* for additional functionality by {@link PluginRegistryListener}-s registered with
* this class.</p>
*/
-public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter {
+public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapWriter, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
@@ -100,6 +101,15 @@ public class ContainerPluginsRegistry implements ClusterPropertiesListener, MapW
currentPlugins.forEach(ew.getBiConsumer());
}
+ @Override
+ public void close() throws IOException {
+ currentPlugins.values().forEach(apiInfo -> {
+ if (apiInfo.instance instanceof Closeable) {
+ IOUtils.closeQuietly((Closeable) apiInfo.instance);
+ }
+ });
+ }
+
public synchronized ApiInfo getPlugin(String name) {
return currentPlugins.get(name);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
index 6f84457..3443b37 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventListener.java
@@ -16,13 +16,15 @@
*/
package org.apache.solr.cluster.events;
+import java.io.Closeable;
+
/**
* Components that want to be notified of cluster-wide events should use this.
*
* XXX should this work only for ClusterSingleton-s? some types of events may be
* XXX difficult (or pointless) to propagate to every node.
*/
-public interface ClusterEventListener {
+public interface ClusterEventListener extends Closeable {
/**
* Handle the event. Implementations should be non-blocking - if any long
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
index f87ce7c..d3b0ee7 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducer.java
@@ -18,10 +18,12 @@ package org.apache.solr.cluster.events;
import org.apache.solr.cloud.ClusterSingleton;
+import java.io.Closeable;
+
/**
* Component that produces {@link ClusterEvent} instances.
*/
-public interface ClusterEventProducer extends ClusterSingleton {
+public interface ClusterEventProducer extends ClusterSingleton, Closeable {
/** Unique name for the registration of a plugin-based implementation. */
String PLUGIN_NAME = "cluster-event-producer";
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
index ecb5825..9f9d0d2 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEventProducerBase.java
@@ -1,9 +1,11 @@
package org.apache.solr.cluster.events;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
@@ -11,7 +13,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
- *
+ * Base class for implementing {@link ClusterEventProducer}.
*/
public abstract class ClusterEventProducerBase implements ClusterEventProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -34,7 +36,7 @@ public abstract class ClusterEventProducerBase implements ClusterEventProducer {
log.warn("event type {} not supported yet.", type);
continue;
}
- // to avoid removing no-longer empty set in unregister
+ // to avoid removing no-longer empty set on race in unregister
synchronized (listeners) {
listeners.computeIfAbsent(type, t -> ConcurrentHashMap.newKeySet())
.add(listener);
@@ -65,21 +67,25 @@ public abstract class ClusterEventProducerBase implements ClusterEventProducer {
return state;
}
- public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
- return listeners;
- }
-
- public CoreContainer getCoreContainer() {
- return cc;
+ @Override
+ public void close() throws IOException {
+ synchronized (listeners) {
+ listeners.values().forEach(listenerSet ->
+ listenerSet.forEach(listener -> IOUtils.closeQuietly(listener)));
+ }
}
public abstract Set<ClusterEvent.EventType> getSupportedEventTypes();
protected void fireEvent(ClusterEvent event) {
- listeners.getOrDefault(event.getType(), Collections.emptySet())
- .forEach(listener -> {
- log.debug("--- firing event {} to {}", event, listener);
- listener.onEvent(event);
- });
+ synchronized (listeners) {
+ listeners.getOrDefault(event.getType(), Collections.emptySet())
+ .forEach(listener -> {
+ if (log.isDebugEnabled()) {
+ log.debug("--- firing event {} to {}", event, listener);
+ }
+ listener.onEvent(event);
+ });
+ }
}
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
index 0e2bd9a..6d14817 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerFactory.java
@@ -26,6 +26,8 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
public ClusterEventProducerFactory(CoreContainer cc) {
super(cc);
+ // this initial listener is used only for capturing plugin registrations
+ // done by other nodes while this CoreContainer is still loading
initialPluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -85,18 +87,20 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
throw new RuntimeException("this factory can be called only once!");
}
final DelegatingClusterEventProducer clusterEventProducer = new DelegatingClusterEventProducer(cc);
- // since this is a ClusterSingleton, register it as such
+ // since this is a ClusterSingleton, register it as such, under unique name
cc.getClusterSingletons().getSingletons().put(ClusterEventProducer.PLUGIN_NAME +"_delegate", clusterEventProducer);
ContainerPluginsRegistry.ApiInfo clusterEventProducerInfo = plugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
if (clusterEventProducerInfo != null) {
- // the listener in ClusterSingletons already registered it
+ // the listener in ClusterSingletons already registered this instance
clusterEventProducer.setDelegate((ClusterEventProducer) clusterEventProducerInfo.getInstance());
} else {
// use the default NoOp impl
}
// transfer those listeners that were already registered to the initial impl
transferListeners(clusterEventProducer, plugins);
- // install plugin registry listener
+
+ // install plugin registry listener that maintains plugin-based listeners in
+ // the event producer impl
ContainerPluginsRegistry.PluginRegistryListener pluginListener = new ContainerPluginsRegistry.PluginRegistryListener() {
@Override
public void added(ContainerPluginsRegistry.ApiInfo plugin) {
@@ -152,13 +156,15 @@ public class ClusterEventProducerFactory extends ClusterEventProducerBase {
}
private void transferListeners(ClusterEventProducer target, ContainerPluginsRegistry plugins) {
- // stop capturing listener plugins
- plugins.unregisterListener(initialPluginListener);
- // transfer listeners that are already registered
- listeners.forEach((type, listenersSet) -> {
- listenersSet.forEach(listener -> target.registerListener(listener, type));
- });
- listeners.clear();
+ synchronized (listeners) {
+ // stop capturing listener plugins
+ plugins.unregisterListener(initialPluginListener);
+ // transfer listeners that are already registered
+ listeners.forEach((type, listenersSet) -> {
+ listenersSet.forEach(listener -> target.registerListener(listener, type));
+ });
+ listeners.clear();
+ }
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index de69fb9..48400f8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events.impl;
+import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@@ -26,6 +27,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -41,18 +44,22 @@ import org.apache.solr.cluster.events.NodesDownEvent;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
+import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This is an illustration how to re-implement the combination of 8x
- * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
- * <p>NOTE: there's no support for 'waitFor' yet.</p>
- * <p>NOTE 2: this functionality would be probably more reliable when executed also as a
+ * This is an illustration of how to re-implement the combination of Solr 8x
+ * NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replicas when
+ * nodes are lost.
+ * <p>The <code>waitFor</code> delay between detection and repair action is implemented
+ * as a scheduled execution of the repair method, which runs every <code>waitFor</code>
+ * seconds and checks whether any lost nodes have exceeded their <code>waitFor</code> period.</p>
+ * <p>NOTE: this functionality would probably be more reliable when executed also as a
* periodically scheduled check - both as a reactive (listener) and proactive (scheduled) measure.</p>
*/
-public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton {
+public class CollectionsRepairEventListener implements ClusterEventListener, ClusterSingleton, Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String PLUGIN_NAME = "collectionsRepairListener";
@@ -68,6 +75,8 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
private int waitForSecond = DEFAULT_WAIT_FOR_SEC;
+ private ScheduledThreadPoolExecutor waitForExecutor;
+
public CollectionsRepairEventListener(CoreContainer cc) {
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
this.solrCloudManager = cc.getZkController().getSolrCloudManager();
@@ -75,6 +84,9 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
@VisibleForTesting
public void setWaitForSecond(int waitForSecond) {
+ if (log.isDebugEnabled()) {
+ log.debug("-- setting waitFor={}", waitForSecond);
+ }
this.waitForSecond = waitForSecond;
}
@@ -108,10 +120,20 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
// if so, remove them from the tracking map
Set<String> trackingKeySet = nodeNameVsTimeRemoved.keySet();
trackingKeySet.removeAll(solrCloudManager.getClusterStateProvider().getLiveNodes());
+ // add any new lost nodes (old lost nodes are skipped)
event.getNodeNames().forEachRemaining(lostNode -> {
nodeNameVsTimeRemoved.computeIfAbsent(lostNode, n -> solrCloudManager.getTimeSource().getTimeNs());
});
+ }
+ private void runRepair() {
+ if (nodeNameVsTimeRemoved.isEmpty()) {
+ // nothing to do
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("-- runRepair for {} lost nodes", nodeNameVsTimeRemoved.size());
+ }
Set<String> reallyLostNodes = new HashSet<>();
nodeNameVsTimeRemoved.forEach((lostNode, timeRemoved) -> {
long now = solrCloudManager.getTimeSource().getTimeNs();
@@ -120,7 +142,16 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
reallyLostNodes.add(lostNode);
}
});
-
+ if (reallyLostNodes.isEmpty()) {
+ if (log.isDebugEnabled()) {
+ log.debug("--- skipping repair, {} nodes are still in waitFor period", nodeNameVsTimeRemoved.size());
+ }
+ return;
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("--- running repair for nodes that are still lost after waitFor: {}", reallyLostNodes);
+ }
+ }
// collect all lost replicas
// collection / positions
Map<String, List<ReplicaPosition>> newPositions = new HashMap<>();
@@ -169,7 +200,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
return;
}
- // remove from the tracking set
+ // remove all nodes with expired waitFor from the tracking set
nodeNameVsTimeRemoved.keySet().removeAll(reallyLostNodes);
// send ADDREPLICA admin requests for each lost replica
@@ -191,12 +222,16 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
log.warn("Exception calling ADDREPLICA {}: {}", addReplica.getParams().toQueryString(), e);
}
});
-
- // ... and DELETERPLICA for lost ones?
}
@Override
public void start() throws Exception {
+ state = State.STARTING;
+ waitForExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1,
+ new SolrNamedThreadFactory("collectionsRepair_waitFor"));
+ waitForExecutor.setRemoveOnCancelPolicy(true);
+ waitForExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ waitForExecutor.scheduleAtFixedRate(() -> runRepair(), 0, waitForSecond, TimeUnit.SECONDS);
state = State.RUNNING;
}
@@ -207,6 +242,23 @@ public class CollectionsRepairEventListener implements ClusterEventListener, Clu
@Override
public void stop() {
+ state = State.STOPPING;
+ waitForExecutor.shutdownNow();
+ try {
+ waitForExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Failed to shut down the waitFor executor - interrupted...");
+ Thread.currentThread().interrupt();
+ }
+ waitForExecutor = null;
state = State.STOPPED;
}
+
+ @Override
+ public void close() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("-- close() called");
+ }
+ stop();
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
index 188bfa5..e24fb40 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DefaultClusterEventProducer.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events.impl;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Arrays;
@@ -71,9 +72,6 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
// ClusterSingleton lifecycle methods
@Override
public synchronized void start() {
- if (log.isDebugEnabled()) {
- log.debug("-- starting DCEP", new Exception(Integer.toHexString(hashCode())));
- }
if (cc == null) {
liveNodesListener = null;
cloudCollectionsListener = null;
@@ -205,9 +203,6 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
@Override
public synchronized void stop() {
- if (log.isDebugEnabled()) {
- log.debug("-- stopping DCEP {}", Integer.toHexString(hashCode()));
- }
state = State.STOPPING;
doStop();
state = State.STOPPED;
@@ -227,4 +222,10 @@ public class DefaultClusterEventProducer extends ClusterEventProducerBase {
cloudCollectionsListener = null;
clusterPropertiesListener = null;
}
+
+ @Override
+ public void close() throws IOException {
+ stop();
+ super.close();
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
index 0f31de3..8d8a972 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/DelegatingClusterEventProducer.java
@@ -21,16 +21,18 @@ import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.NoOpProducer;
import org.apache.solr.cluster.events.ClusterEventProducerBase;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Set;
/**
- * This implementation allows Solr to dynamically change the underlying implementation in
- * response to the changed plugin configuration.
+ * This implementation allows Solr to dynamically change the underlying implementation
+ * of {@link ClusterEventProducer} in response to the changed plugin configuration.
*/
public final class DelegatingClusterEventProducer extends ClusterEventProducerBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -42,6 +44,15 @@ public final class DelegatingClusterEventProducer extends ClusterEventProducerBa
delegate = new NoOpProducer(cc);
}
+ @Override
+ public void close() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("--closing delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), delegate);
+ }
+ IOUtils.closeQuietly(delegate);
+ super.close();
+ }
+
public void setDelegate(ClusterEventProducer newDelegate) {
if (log.isDebugEnabled()) {
log.debug("--setting new delegate for CC-{}: {}", Integer.toHexString(cc.hashCode()), newDelegate);
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 2f2652b..831f81e 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1130,6 +1130,9 @@ public class CoreContainer {
if (solrClientCache != null) {
solrClientCache.close();
}
+ if (containerPluginsRegistry != null) {
+ IOUtils.closeQuietly(containerPluginsRegistry);
+ }
} finally {
try {
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
index f3ebf41..873a0d5 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -19,6 +19,7 @@ package org.apache.solr.cluster.events;
import org.junit.Assert;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -53,4 +54,8 @@ public class AllEventsListener implements ClusterEventListener {
Assert.fail("Timed out waiting for expected event " + expectedType);
}
}
+
+ public void close() throws IOException {
+
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index 031076f..3c71823 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Collections;
@@ -61,15 +62,6 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
configureCluster(3)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
- PluginMeta plugin = new PluginMeta();
- plugin.klass = DefaultClusterEventProducer.class.getName();
- plugin.name = ClusterEventProducer.PLUGIN_NAME;
- V2Request req = new V2Request.Builder("/cluster/plugin")
- .withMethod(POST)
- .withPayload(Collections.singletonMap("add", plugin))
- .build();
- V2Response rsp = req.process(cluster.getSolrClient());
- assertNotNull(rsp);
}
@Before
@@ -82,15 +74,37 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
}
@After
- public void teardown() {
+ public void teardown() throws Exception {
System.clearProperty("enable.packages");
if (eventsListener != null) {
cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+ eventsListener.events.clear();
+ }
+ V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+ .forceV2(true)
+ .withMethod(GET)
+ .build();
+ V2Response rsp = readPluginState.process(cluster.getSolrClient());
+ if (rsp._getStr("/plugin/" + ClusterEventProducer.PLUGIN_NAME + "/class", null) != null) {
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+ .build();
+ req.process(cluster.getSolrClient());
}
}
@Test
public void testEvents() throws Exception {
+ PluginMeta plugin = new PluginMeta();
+ plugin.klass = DefaultClusterEventProducer.class.getName();
+ plugin.name = ClusterEventProducer.PLUGIN_NAME;
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("add", plugin))
+ .build();
+ V2Response rsp = req.process(cluster.getSolrClient());
+ assertEquals(0, rsp.getStatus());
// NODES_DOWN
@@ -108,7 +122,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
String nodeName = nonOverseerJetty.getNodeName();
cluster.stopJettySolrRunner(nonOverseerJetty);
cluster.waitForJettyToStop(nonOverseerJetty);
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_DOWN events", eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN));
List<ClusterEvent> events = eventsListener.events.get(ClusterEvent.EventType.NODES_DOWN);
assertEquals("should be one NODES_DOWN event", 1, events.size());
@@ -121,7 +135,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
eventsListener.setExpectedType(ClusterEvent.EventType.NODES_UP);
JettySolrRunner newNode = cluster.startJettySolrRunner();
cluster.waitForNode(newNode, 60);
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be NODES_UP events", eventsListener.events.get(ClusterEvent.EventType.NODES_UP));
events = eventsListener.events.get(ClusterEvent.EventType.NODES_UP);
assertEquals("should be one NODES_UP event", 1, events.size());
@@ -136,7 +150,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
cluster.getSolrClient().request(create);
cluster.waitForActiveCollection(collection, 1, 1);
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_ADDED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_ADDED);
assertEquals("should be one COLLECTIONS_ADDED event", 1, events.size());
@@ -149,7 +163,7 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
eventsListener.setExpectedType(ClusterEvent.EventType.COLLECTIONS_REMOVED);
CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
cluster.getSolrClient().request(delete);
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be COLLECTIONS_REMOVED events", eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED));
events = eventsListener.events.get(ClusterEvent.EventType.COLLECTIONS_REMOVED);
assertEquals("should be one COLLECTIONS_REMOVED event", 1, events.size());
@@ -159,11 +173,12 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
assertEquals("should be collection " + collection, collection, collectionsRemoved.getCollectionNames().next());
// CLUSTER_CONFIG_CHANGED
+ eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
ClusterProperties clusterProperties = new ClusterProperties(cluster.getZkClient());
Map<String, Object> oldProps = new HashMap<>(clusterProperties.getClusterProperties());
clusterProperties.setClusterProperty("ext.foo", "bar");
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
@@ -175,13 +190,14 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
"bar", newProps.get("ext.foo"));
// unset the property
+ eventsListener.events.clear();
eventsListener.setExpectedType(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
clusterProperties.setClusterProperty("ext.foo", null);
- eventsListener.waitForExpectedEvent(10);
+ eventsListener.waitForExpectedEvent(30);
assertNotNull("should be CLUSTER_CONFIG_CHANGED events", eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED));
events = eventsListener.events.get(ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED);
- assertEquals("should be two CLUSTER_CONFIG_CHANGED events", 2, events.size());
- event = events.get(1);
+ assertEquals("should be one CLUSTER_CONFIG_CHANGED event", 1, events.size());
+ event = events.get(0);
assertEquals("should be CLUSTER_CONFIG_CHANGED event type", ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED, event.getType());
propertiesChanged = (ClusterPropertiesChangedEvent) event;
assertEquals("new properties should not have 'ext.foo' property: " + propertiesChanged.getNewClusterProperties(),
@@ -242,19 +258,36 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
}
state = State.STOPPED;
}
+
+ @Override
+ public void close() throws IOException {
+ if (log.isDebugEnabled()) {
+ log.debug("closing {}", Integer.toHexString(hashCode()));
+ }
+ }
}
@Test
public void testListenerPlugins() throws Exception {
PluginMeta plugin = new PluginMeta();
+ plugin.klass = DefaultClusterEventProducer.class.getName();
+ plugin.name = ClusterEventProducer.PLUGIN_NAME;
+ V2Request req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("add", plugin))
+ .build();
+ V2Response rsp = req.process(cluster.getSolrClient());
+ assertEquals(0, rsp.getStatus());
+
+ plugin = new PluginMeta();
plugin.name = "testplugin";
plugin.klass = DummyEventListener.class.getName();
- V2Request req = new V2Request.Builder("/cluster/plugin")
+ req = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
.withMethod(POST)
.withPayload(singletonMap("add", plugin))
.build();
- V2Response rsp = req.process(cluster.getSolrClient());
+ rsp = req.process(cluster.getSolrClient());
//just check if the plugin is indeed registered
V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
.forceV2(true)
@@ -293,5 +326,45 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
now = Instant.now();
assertTrue("timestamp of the event is in the future", now.isAfter(lastEvent.getTimestamp()));
assertEquals(collection, ((CollectionsRemovedEvent)lastEvent).getCollectionNames().next());
+
+ // test changing the ClusterEventProducer plugin dynamically
+
+ // remove the plugin (a NoOpProducer will be used instead)
+ req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("remove", ClusterEventProducer.PLUGIN_NAME))
+ .build();
+ req.process(cluster.getSolrClient());
+
+ dummyEventLatch = new CountDownLatch(1);
+ lastEvent = null;
+ // should not receive any events now
+ cluster.getSolrClient().request(create);
+ cluster.waitForActiveCollection(collection, 1, 1);
+ await = dummyEventLatch.await(5, TimeUnit.SECONDS);
+ if (await) {
+ fail("should not receive any events but got " + lastEvent);
+ }
+ // reinstall the plugin
+ plugin = new PluginMeta();
+ plugin.klass = DefaultClusterEventProducer.class.getName();
+ plugin.name = ClusterEventProducer.PLUGIN_NAME;
+ req = new V2Request.Builder("/cluster/plugin")
+ .withMethod(POST)
+ .withPayload(Collections.singletonMap("add", plugin))
+ .build();
+ rsp = req.process(cluster.getSolrClient());
+ assertEquals(0, rsp.getStatus());
+
+ dummyEventLatch = new CountDownLatch(1);
+ lastEvent = null;
+
+ cluster.getSolrClient().request(delete);
+ await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+ if (!await) {
+ fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
+ }
+ assertNotNull("lastEvent should be COLLECTIONS_REMOVED", lastEvent);
+ assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, lastEvent.getType());
}
}
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
index d672f6d..b34067e 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/impl/CollectionsRepairEventListenerTest.java
@@ -29,10 +29,12 @@ import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.core.CoreContainer;
+import org.apache.solr.util.LogLevel;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,7 @@ import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
+@LogLevel("org.apache.solr.cluster.events=DEBUG")
public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
public static class CollectionsRepairWrapperListener implements ClusterEventListener, ClusterSingleton {
@@ -49,9 +52,9 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
CountDownLatch completed = new CountDownLatch(1);
- CollectionsRepairWrapperListener(CoreContainer cc) throws Exception {
+ CollectionsRepairWrapperListener(CoreContainer cc, int waitFor) throws Exception {
delegate = new CollectionsRepairEventListener(cc);
- delegate.setWaitForSecond(0);
+ delegate.setWaitForSecond(waitFor);
}
@Override
@@ -79,12 +82,18 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
public void stop() {
delegate.stop();
}
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
}
private static AllEventsListener eventsListener = new AllEventsListener();
private static CollectionsRepairWrapperListener repairListener;
private static int NUM_NODES = 3;
+ private static int waitFor;
@BeforeClass
public static void setupCluster() throws Exception {
@@ -100,10 +109,13 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
.build();
V2Response rsp = req.process(cluster.getSolrClient());
assertNotNull(rsp);
+
+ waitFor = 1 + random().nextInt(9);
+
CoreContainer cc = cluster.getOpenOverseer().getCoreContainer();
cc.getClusterEventProducer()
.registerListener(eventsListener, ClusterEvent.EventType.values());
- repairListener = new CollectionsRepairWrapperListener(cc);
+ repairListener = new CollectionsRepairWrapperListener(cc, waitFor);
cc.getClusterEventProducer()
.registerListener(repairListener, ClusterEvent.EventType.NODES_DOWN);
repairListener.start();
@@ -140,6 +152,8 @@ public class CollectionsRepairEventListenerTest extends SolrCloudTestCase {
eventsListener.waitForExpectedEvent(10);
cluster.waitForActiveCollection(collection, 1, 2);
+ Thread.sleep(TimeUnit.MILLISECONDS.convert(waitFor, TimeUnit.SECONDS));
+
// wait for completed processing in the repair listener
boolean await = repairListener.completed.await(60, TimeUnit.SECONDS);
if (!await) {
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 6f7a18a..3465a5b 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -93,13 +93,6 @@ public class TestContainerPlugin extends SolrCloudTestCase {
.build();
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
- //test with an invalid class
- // XXX (ab) in order to support ClusterSingleton we allow adding
- // plugins without Api EndPoints
-
-// plugin.klass = C1.class.getName();
-// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
-
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();
req.process(cluster.getSolrClient());