You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/10/01 06:06:12 UTC
[lucene-solr] 02/02: SOLR-14749: Fix the race between plugin
loading and registration and Overseer leader election. Make events
implement MapWriter-s. Add more unit tests.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit a8658cef6cf14bb2c7ee667801b5241e4e875467
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Thu Oct 1 08:04:08 2020 +0200
SOLR-14749: Fix the race between plugin loading and registration and
Overseer leader election. Make events implement MapWriter-s.
Add more unit tests.
---
.../src/java/org/apache/solr/api/AnnotatedApi.java | 15 ++-
.../apache/solr/api/CustomContainerPlugins.java | 11 +-
.../src/java/org/apache/solr/cloud/Overseer.java | 55 ++++++---
.../apache/solr/cluster/events/ClusterEvent.java | 10 +-
.../events/ClusterPropertiesChangedEvent.java | 6 +
.../solr/cluster/events/CollectionsAddedEvent.java | 7 ++
.../cluster/events/CollectionsRemovedEvent.java | 7 ++
.../apache/solr/cluster/events/NodesDownEvent.java | 7 ++
.../apache/solr/cluster/events/NodesUpEvent.java | 7 ++
.../solr/cluster/events/ReplicasDownEvent.java | 7 ++
.../events/impl/ClusterEventProducerImpl.java | 21 +---
.../impl/CollectionsRepairEventListener.java | 3 +-
.../java/org/apache/solr/core/CoreContainer.java | 95 ++++++++++++---
.../solr/handler/admin/ContainerPluginsApi.java | 6 +-
.../test/org/apache/solr/cloud/OverseerTest.java | 24 ++--
.../solr/cluster/events/AllEventsListener.java | 3 -
.../cluster/events/ClusterEventProducerTest.java | 127 ++++++++++++++++++++-
.../apache/solr/handler/TestContainerPlugin.java | 4 +-
18 files changed, 335 insertions(+), 80 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
index fd77413..1558f1c 100644
--- a/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
+++ b/solr/core/src/java/org/apache/solr/api/AnnotatedApi.java
@@ -85,9 +85,18 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
}
public static List<Api> getApis(Object obj) {
- return getApis(obj.getClass(), obj);
+ return getApis(obj.getClass(), obj, true);
}
- public static List<Api> getApis(Class<? extends Object> theClass , Object obj) {
+
+ /**
+ * Get a list of Api-s supported by this class.
+ * @param theClass class
+ * @param obj object of this class (may be null)
+ * @param required if true, an exception is thrown when no Api-s can be retrieved; if false,
+ * the absence of Api-s is silently ignored.
+ * @return list of discovered Api-s
+ */
+ public static List<Api> getApis(Class<? extends Object> theClass , Object obj, boolean required) {
Class<?> klas = null;
try {
klas = MethodHandles.publicLookup().accessClass(theClass);
@@ -122,7 +131,7 @@ public class AnnotatedApi extends Api implements PermissionNameProvider , Closea
SpecProvider specProvider = readSpec(endPoint, Collections.singletonList(m));
apis.add(new AnnotatedApi(specProvider, endPoint, Collections.singletonMap("", cmd), null));
}
- if (apis.isEmpty()) {
+ if (required && apis.isEmpty()) {
throw new RuntimeException("Invalid Class : " + klas.getName() + " No @EndPoints");
}
diff --git a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
index c6bfca6..ec3e4e3 100644
--- a/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
+++ b/solr/core/src/java/org/apache/solr/api/CustomContainerPlugins.java
@@ -180,9 +180,10 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
Object instance = newApiInfo.getInstance();
if (instance instanceof ClusterSingleton) {
ClusterSingleton singleton = (ClusterSingleton) instance;
- coreContainer.getClusterSingletons().put(singleton.getName(), singleton);
+ coreContainer.getClusterSingletons().getSingletons().put(singleton.getName(), singleton);
// easy check to see if we should immediately start this singleton
- if (coreContainer.getClusterEventProducer().isRunning()) {
+ if (coreContainer.getClusterEventProducer() != null &&
+ coreContainer.getClusterEventProducer().isRunning()) {
try {
singleton.start();
} catch (Exception exc) {
@@ -206,7 +207,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
if (instance instanceof ClusterSingleton) {
ClusterSingleton singleton = (ClusterSingleton) instance;
singleton.stop();
- coreContainer.getClusterSingletons().remove(singleton.getName());
+ coreContainer.getClusterSingletons().getSingletons().remove(singleton.getName());
}
if (instance instanceof ClusterEventListener) {
coreContainer.getClusterEventProducer().unregisterListener((ClusterEventListener) instance);
@@ -322,7 +323,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
try {
- List<Api> apis = AnnotatedApi.getApis(klas, null);
+ List<Api> apis = AnnotatedApi.getApis(klas, null, false);
for (Object api : apis) {
EndPoint endPoint = ((AnnotatedApi) api).getEndPoint();
if (endPoint.path().length > 1 || endPoint.method().length > 1) {
@@ -374,7 +375,7 @@ public class CustomContainerPlugins implements ClusterPropertiesListener, MapWri
}
}
this.holders = new ArrayList<>();
- for (Api api : AnnotatedApi.getApis(instance)) {
+ for (Api api : AnnotatedApi.getApis(instance.getClass(), instance, false)) {
holders.add(new ApiHolder((AnnotatedApi) api));
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index b8403f4..2465f8a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -29,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.lucene.util.Version;
@@ -782,34 +784,48 @@ public class Overseer implements SolrCloseable {
* Start {@link ClusterSingleton} plugins when we become the leader.
*/
public void startClusterSingletons() {
- Map<String, ClusterSingleton> singletons = getCoreContainer().getClusterSingletons();
- if (singletons == null) {
- return;
- }
- if (isClosed()) {
- return;
- }
- singletons.forEach((name, singleton) -> {
+ CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
+ final Runnable initializer = () -> {
+ if (isClosed()) {
+ return;
+ }
try {
- singleton.start();
- if (singleton instanceof ClusterEventListener) {
- getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
- }
- } catch (Exception e) {
- log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+ singletons.waitUntilReady(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.warn("Interrupted initialization of ClusterSingleton-s");
+ return;
+ } catch (TimeoutException te) {
+ log.warn("Timed out during initialization of ClusterSingleton-s");
+ return;
}
- });
+ singletons.getSingletons().forEach((name, singleton) -> {
+ try {
+ singleton.start();
+ if (singleton instanceof ClusterEventListener) {
+ getCoreContainer().getClusterEventProducer().registerListener((ClusterEventListener) singleton);
+ }
+ } catch (Exception e) {
+ log.warn("Exception starting ClusterSingleton {}: {}", singleton, e);
+ }
+ });
+ };
+ if (singletons.isReady()) {
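+ // not ready yet (isReady() is true while the latch is still pending): run the initializer asynchronously so it can wait for all singleton-s before the first startup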
+ getCoreContainer().runAsync(initializer);
+ } else {
+ initializer.run();
+ }
}
/**
* Stop {@link ClusterSingleton} plugins when we lose leadership.
*/
private void stopClusterSingletons() {
- Map<String, ClusterSingleton> singletons = getCoreContainer().getClusterSingletons();
+ CoreContainer.ClusterSingletons singletons = getCoreContainer().getClusterSingletons();
if (singletons == null) {
return;
}
- singletons.forEach((name, singleton) -> {
+ singletons.getSingletons().forEach((name, singleton) -> {
if (singleton instanceof ClusterEventListener) {
getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
}
@@ -856,10 +872,13 @@ public class Overseer implements SolrCloseable {
if (this.id != null) {
log.info("Overseer (id={}) closing", id);
}
+ // stop singletons only on the leader
+ if (!this.closed) {
+ stopClusterSingletons();
+ }
this.closed = true;
doClose();
- stopClusterSingletons();
assert ObjectReleaseTracker.release(this);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
index 1929f0c..2dc7a32 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -16,12 +16,15 @@
*/
package org.apache.solr.cluster.events;
+import org.apache.solr.common.MapWriter;
+
+import java.io.IOException;
import java.time.Instant;
/**
* Cluster-level event.
*/
-public interface ClusterEvent {
+public interface ClusterEvent extends MapWriter {
enum EventType {
/** One or more nodes went down. */
@@ -46,4 +49,9 @@ public interface ClusterEvent {
/** Get event timestamp. This is the instant when the event was generated (not necessarily when
* the underlying condition first occurred). */
Instant getTimestamp();
+
+ default void writeMap(EntryWriter ew) throws IOException {
+ ew.put("type", getType());
+ ew.put("timestamp", getTimestamp().toEpochMilli());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
index ee513d8..ad9c0b8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterPropertiesChangedEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events;
+import java.io.IOException;
import java.util.Map;
/**
@@ -30,4 +31,9 @@ public interface ClusterPropertiesChangedEvent extends ClusterEvent {
Map<String, Object> getNewClusterProperties();
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("newClusterProperties", getNewClusterProperties());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
index 0b1c46b..78046f8 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -29,4 +30,10 @@ public interface CollectionsAddedEvent extends ClusterEvent {
}
Iterator<String> getCollectionNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("collectionNames", getCollectionNames());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
index e6fc64e..a93be4c 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -29,4 +30,10 @@ public interface CollectionsRemovedEvent extends ClusterEvent {
}
Iterator<String> getCollectionNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("collectionNames", getCollectionNames());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
index a8e7a2e..5001ccb 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -29,4 +30,10 @@ public interface NodesDownEvent extends ClusterEvent {
}
Iterator<String> getNodeNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("nodeNames", getNodeNames());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
index f83bf91..fa08f85 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cluster.events;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -29,4 +30,10 @@ public interface NodesUpEvent extends ClusterEvent {
}
Iterator<String> getNodeNames();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("nodeNames", getNodeNames());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
index 69ec48c..1d3ce9b 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -18,6 +18,7 @@ package org.apache.solr.cluster.events;
import org.apache.solr.common.cloud.Replica;
+import java.io.IOException;
import java.util.Iterator;
/**
@@ -31,4 +32,10 @@ public interface ReplicasDownEvent extends ClusterEvent {
}
Iterator<Replica> getReplicas();
+
+ @Override
+ default void writeMap(EntryWriter ew) throws IOException {
+ ClusterEvent.super.writeMap(ew);
+ ew.put("replicas", getReplicas());
+ }
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index 5886d67..034fa8a 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -16,16 +16,12 @@
*/
package org.apache.solr.cluster.events.impl;
-import java.io.Closeable;
-import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
@@ -53,7 +49,7 @@ import org.slf4j.LoggerFactory;
* (not in parallel) and in arbitrary order. This means that if any listener blocks the
* processing other listeners may be invoked much later or not at all.</p>
*/
-public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton, Closeable {
+public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSingleton {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
@@ -62,7 +58,7 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
private CloudCollectionsListener cloudCollectionsListener;
private ClusterPropertiesListener clusterPropertiesListener;
private ZkController zkController;
- private boolean running;
+ private volatile boolean running;
private final Set<ClusterEvent.EventType> supportedEvents =
new HashSet<>(Arrays.asList(
@@ -73,8 +69,6 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
ClusterEvent.EventType.CLUSTER_PROPERTIES_CHANGED
));
- private volatile boolean isClosed = false;
-
public ClusterEventProducerImpl(CoreContainer coreContainer) {
this.coreContainer = coreContainer;
}
@@ -96,7 +90,7 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
// register liveNodesListener
liveNodesListener = (oldNodes, newNodes) -> {
// already closed but still registered
- if (isClosed) {
+ if (!running) {
// remove the listener
return true;
}
@@ -196,6 +190,8 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
// XXX register collection state listener?
// XXX not sure how to efficiently monitor for REPLICA_DOWN events
+
+ running = true;
}
@Override
@@ -239,13 +235,6 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
}
@Override
- public void close() throws IOException {
- stop();
- isClosed = true;
- listeners.clear();
- }
-
- @Override
public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
return listeners;
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index 88df58f..42dcde3 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -19,7 +19,6 @@ package org.apache.solr.cluster.events.impl;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -60,7 +59,7 @@ public class CollectionsRepairEventListener implements ClusterEventListener {
private final SolrClient solrClient;
private final SolrCloudManager solrCloudManager;
- private boolean running = false;
+ private volatile boolean running = false;
public CollectionsRepairEventListener(CoreContainer cc) {
this.solrClient = cc.getSolrClientCache().getCloudSolrClient(cc.getZkController().getZkClient().getZkServerAddress());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index f8ec18a..1427dad 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -39,9 +39,12 @@ import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
@@ -69,9 +72,10 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.OverseerTaskQueue;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterEvent;
+import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cluster.events.impl.ClusterEventProducerImpl;
import org.apache.solr.common.AlreadyClosedException;
@@ -172,6 +176,72 @@ public class CoreContainer {
}
}
+ public static class ClusterSingletons {
+ private Map<String, ClusterSingleton> singletonMap = new ConcurrentHashMap<>();
+ // we use this latch to delay the initial startup of singletons, due to
+ // the leader election occurring in parallel with the rest of the load() method.
+ private CountDownLatch readyLatch = new CountDownLatch(1);
+
+ public Map<String, ClusterSingleton> getSingletons() {
+ return singletonMap;
+ }
+
+ public boolean isReady() {
+ return readyLatch.getCount() > 0;
+ }
+
+ public void setReady() {
+ readyLatch.countDown();
+ }
+
+ public void waitUntilReady(long timeout, TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException {
+ boolean await = readyLatch.await(timeout, timeUnit);
+ if (!await) {
+ throw new TimeoutException("Timed out waiting for ClusterSingletons to become ready.");
+ }
+ }
+ }
+
+ /**
+ * This class helps in handling the initial registration of plugin-based listeners,
+ * when both the final {@link ClusterEventProducer} implementation and listeners
+ * are configured using plugins.
+ */
+ public static class InitialClusterEventProducer implements ClusterEventProducer {
+ Map<ClusterEvent.EventType, Set<ClusterEventListener>> initialListeners = new HashMap<>();
+
+ @Override
+ public Map<ClusterEvent.EventType, Set<ClusterEventListener>> getEventListeners() {
+ return initialListeners;
+ }
+
+ public void transferListeners(ClusterEventProducer target) {
+ initialListeners.forEach((type, listeners) -> {
+ listeners.forEach(listener -> {
+ try {
+ target.registerListener(listener, type);
+ } catch (Exception e) {
+ log.warn("Unable to register event listener for type {}: {}", type, e);
+ }
+ });
+ });
+ }
+
+ @Override
+ public void start() throws Exception {
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+
+ @Override
+ public void stop() {
+ }
+ }
+
private volatile PluginBag<SolrRequestHandler> containerHandlers = new PluginBag<>(SolrRequestHandler.class, null);
/**
@@ -247,12 +317,11 @@ public class CoreContainer {
private volatile SolrClientCache solrClientCache;
- private volatile ClusterEventProducer clusterEventProducer;
+ private volatile ClusterEventProducer clusterEventProducer = new InitialClusterEventProducer();
private final ObjectCache objectCache = new ObjectCache();
- private final Map<String, ClusterSingleton> clusterSingletons = new ConcurrentHashMap<>();
-
+ private final ClusterSingletons clusterSingletons = new ClusterSingletons();
private PackageStoreAPI packageStoreAPI;
private PackageLoader packageLoader;
@@ -903,30 +972,28 @@ public class CoreContainer {
// init ClusterSingleton-s
- // register handlers that are also ClusterSingleton
+ // register the handlers that are also ClusterSingleton
containerHandlers.keySet().forEach(handlerName -> {
SolrRequestHandler handler = containerHandlers.get(handlerName);
if (handler instanceof ClusterSingleton) {
- clusterSingletons.put(handlerName, (ClusterSingleton) handler);
+ clusterSingletons.singletonMap.put(handlerName, (ClusterSingleton) handler);
}
});
// create the ClusterEventProducer
+ InitialClusterEventProducer initialClusterEventProducer = (InitialClusterEventProducer) clusterEventProducer;
CustomContainerPlugins.ApiInfo clusterEventProducerInfo = customContainerPlugins.getPlugin(ClusterEventProducer.PLUGIN_NAME);
if (clusterEventProducerInfo != null) {
clusterEventProducer = (ClusterEventProducer) clusterEventProducerInfo.getInstance();
} else {
clusterEventProducer = new ClusterEventProducerImpl(this);
- clusterSingletons.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
+ clusterSingletons.singletonMap.put(ClusterEventProducer.PLUGIN_NAME, clusterEventProducer);
}
+ // transfer those listeners that were already registered to the initial impl
+ initialClusterEventProducer.transferListeners(clusterEventProducer);
+ clusterSingletons.setReady();
zkSys.getZkController().checkOverseerDesignate();
- // XXX note that ClusterSingleton components are registered too late -
- // XXX the Overseer leader may be already started
- Overseer overseer = zkSys.getZkController().getOverseer();
- if (!overseer.isClosed()) { // we are the leader
- overseer.startClusterSingletons();
- }
}
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
status |= LOAD_COMPLETE | INITIAL_CORE_LOAD_COMPLETE;
@@ -2124,7 +2191,7 @@ public class CoreContainer {
return customContainerPlugins;
}
- public Map<String, ClusterSingleton> getClusterSingletons() {
+ public ClusterSingletons getClusterSingletons() {
return clusterSingletons;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
index 9f2f459..ad95423 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ContainerPluginsApi.java
@@ -51,7 +51,7 @@ import static org.apache.lucene.util.IOUtils.closeWhileHandlingException;
public class ContainerPluginsApi {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String PLUGINS = "plugins";
+ public static final String PLUGINS = "plugin";
private final Supplier<SolrZkClient> zkClientSupplier;
private final CoreContainer coreContainer;
public final Read readAPI = new Read();
@@ -64,7 +64,7 @@ public class ContainerPluginsApi {
public class Read {
@EndPoint(method = METHOD.GET,
- path = "/cluster/plugins",
+ path = "/cluster/plugin",
permission = PermissionNameProvider.Name.COLL_READ_PERM)
public void list(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException {
rsp.add(PLUGINS, plugins(zkClientSupplier));
@@ -72,7 +72,7 @@ public class ContainerPluginsApi {
}
@EndPoint(method = METHOD.POST,
- path = "/cluster/plugins",
+ path = "/cluster/plugin",
permission = PermissionNameProvider.Name.COLL_EDIT_PERM)
public class Edit {
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
index cf30ca9..f3e73e2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerTest.java
@@ -17,13 +17,7 @@
package org.apache.solr.cloud;
import static org.apache.solr.cloud.AbstractDistribZkTestBase.verifyReplicaStatus;
-import static org.mockito.Mockito.anyBoolean;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -118,7 +112,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
private static SolrZkClient zkClient;
-
private volatile boolean testDone = false;
private final List<ZkController> zkControllers = Collections.synchronizedList(new ArrayList<>());
@@ -128,7 +121,6 @@ public class OverseerTest extends SolrTestCaseJ4 {
private final List<HttpShardHandlerFactory> httpShardHandlerFactorys = Collections.synchronizedList(new ArrayList<>());
private final List<UpdateShardHandler> updateShardHandlers = Collections.synchronizedList(new ArrayList<>());
private final List<CloudSolrClient> solrClients = Collections.synchronizedList(new ArrayList<>());
-
private static final String COLLECTION = SolrTestCaseJ4.DEFAULT_TEST_COLLECTION_NAME;
public static class MockZKController{
@@ -307,6 +299,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
@Before
public void setUp() throws Exception {
testDone = false;
+
super.setUp();
}
@@ -323,6 +316,7 @@ public class OverseerTest extends SolrTestCaseJ4 {
}
server = null;
+
}
@After
@@ -1429,7 +1423,10 @@ public class OverseerTest extends SolrTestCaseJ4 {
Mockito.withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS));
when(mockAlwaysUpCoreContainer.isShutDown()).thenReturn(testDone); // Allow retry on session expiry
when(mockAlwaysUpCoreContainer.getResourceLoader()).thenReturn(new SolrResourceLoader());
- FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("containerSingletons"), Collections.emptyMap());
+ CoreContainer.ClusterSingletons singletons = new CoreContainer.ClusterSingletons();
+ // don't wait for all singletons
+ singletons.setReady();
+ FieldSetter.setField(mockAlwaysUpCoreContainer, CoreContainer.class.getDeclaredField("clusterSingletons"), singletons);
ClusterEventProducerImpl clusterEventProducer = new ClusterEventProducerImpl(mockAlwaysUpCoreContainer);
when(mockAlwaysUpCoreContainer.getClusterEventProducer()).thenReturn(clusterEventProducer);
FieldSetter.setField(zkController, ZkController.class.getDeclaredField("zkClient"), zkClient);
@@ -1437,6 +1434,13 @@ public class OverseerTest extends SolrTestCaseJ4 {
when(zkController.getCoreContainer()).thenReturn(mockAlwaysUpCoreContainer);
when(zkController.getZkClient()).thenReturn(zkClient);
when(zkController.getZkStateReader()).thenReturn(reader);
+ // primitive support for CC.runAsync
+ doAnswer(invocable -> {
+ Runnable r = invocable.getArgument(0);
+ Thread t = new Thread(r);
+ t.start();
+ return null;
+ }).when(mockAlwaysUpCoreContainer).runAsync(any(Runnable.class));
when(zkController.getLeaderProps(anyString(), anyString(), anyInt())).thenCallRealMethod();
when(zkController.getLeaderProps(anyString(), anyString(), anyInt(), anyBoolean())).thenCallRealMethod();
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
index 9d03c8d..8da6bea 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/AllEventsListener.java
@@ -20,12 +20,9 @@ package org.apache.solr.cluster.events;
import org.junit.Assert;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
diff --git a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
index 12079b2..ea6c5f5 100644
--- a/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
+++ b/solr/core/src/test/org/apache/solr/cluster/events/ClusterEventProducerTest.java
@@ -19,39 +19,64 @@ package org.apache.solr.cluster.events;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.V2Request;
+import org.apache.solr.client.solrj.request.beans.PluginMeta;
+import org.apache.solr.client.solrj.response.V2Response;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.cloud.ClusterProperties;
+import org.apache.solr.common.util.Utils;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
+import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.GET;
+import static org.apache.solr.client.solrj.SolrRequest.METHOD.POST;
/**
*
*/
public class ClusterEventProducerTest extends SolrCloudTestCase {
- private static AllEventsListener eventsListener = new AllEventsListener();
+ private AllEventsListener eventsListener;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(3)
.addConfig("conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
- cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
}
@Before
public void setUp() throws Exception {
+ System.setProperty("enable.packages", "true");
super.setUp();
cluster.deleteAllCollections();
+ eventsListener = new AllEventsListener();
+ cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().registerListener(eventsListener);
+ }
+
+ @After
+ public void teardown() {
+ System.clearProperty("enable.packages");
+ if (eventsListener != null) {
+ cluster.getOpenOverseer().getCoreContainer().getClusterEventProducer().unregisterListener(eventsListener);
+ }
}
@Test
- public void testNodesEvent() throws Exception {
+ public void testEvents() throws Exception {
// NODES_DOWN
@@ -149,4 +174,100 @@ public class ClusterEventProducerTest extends SolrCloudTestCase {
null, propertiesChanged.getNewClusterProperties().get("ext.foo"));
}
+
+  // State shared between the test method and DummyEventListener.onEvent().
+  // NOTE(review): events are presumably delivered on a thread other than the test
+  // thread - confirm against the event producer. Both fields are reassigned by the
+  // test between phases, so they are volatile to guarantee that the listener always
+  // counts down the current latch and that the test sees the most recent event.
+  private static volatile CountDownLatch dummyEventLatch = new CountDownLatch(1);
+  private static volatile ClusterEvent lastEvent = null;
+
+  /**
+   * Test listener that records the last COLLECTIONS_ADDED / COLLECTIONS_REMOVED event
+   * in {@code lastEvent} and releases {@code dummyEventLatch}. Events of any other type,
+   * and all events received while the listener is not running, are only logged.
+   */
+  public static class DummyEventListener implements ClusterEventListener {
+    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    // volatile: start()/stop() and onEvent() are not guaranteed to run on the same thread
+    volatile boolean running = false;
+
+    /**
+     * Records collection added / removed events and signals the waiting test.
+     *
+     * @param event the cluster event to inspect
+     */
+    @Override
+    public void onEvent(ClusterEvent event) {
+      if (!running) {
+        log.debug("skipped event, not running: {}", event);
+        return;
+      }
+      if (event.getType() == ClusterEvent.EventType.COLLECTIONS_ADDED ||
+          event.getType() == ClusterEvent.EventType.COLLECTIONS_REMOVED) {
+        log.debug("recorded event {}", Utils.toJSONString(event));
+        // publish the event before releasing the latch so that a thread returning
+        // from await() is guaranteed to observe it
+        lastEvent = event;
+        dummyEventLatch.countDown();
+      } else {
+        log.debug("skipped event, wrong type: {}", event.getType());
+      }
+    }
+
+    /** @return the fixed listener name, {@code "dummy"} */
+    @Override
+    public String getName() {
+      return "dummy";
+    }
+
+    /** Marks the listener as running so that subsequent events are recorded. */
+    @Override
+    public void start() throws Exception {
+      log.debug("starting {}", Integer.toHexString(hashCode()));
+      running = true;
+    }
+
+    /** @return {@code true} between {@link #start()} and {@link #stop()} */
+    @Override
+    public boolean isRunning() {
+      return running;
+    }
+
+    /** Marks the listener as stopped; subsequent events are skipped. */
+    @Override
+    public void stop() {
+      log.debug("stopping {}", Integer.toHexString(hashCode()));
+      running = false;
+    }
+  }
+
+
+  /**
+   * Registers {@link DummyEventListener} as a cluster plugin through the V2
+   * {@code /cluster/plugin} API, then verifies that creating and deleting a collection
+   * delivers COLLECTIONS_ADDED and COLLECTIONS_REMOVED events to it.
+   */
+  @Test
+  public void testListenerPlugins() throws Exception {
+    // the latch and the last event are static: reset them so that the outcome does not
+    // depend on whatever an earlier run in the same JVM left behind
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    PluginMeta plugin = new PluginMeta();
+    plugin.name = "testplugin";
+    plugin.klass = DummyEventListener.class.getName();
+    V2Request req = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(POST)
+        .withPayload(singletonMap("add", plugin))
+        .build();
+    // the response to the "add" command is not inspected here
+    req.process(cluster.getSolrClient());
+    //just check if the plugin is indeed registered
+    V2Request readPluginState = new V2Request.Builder("/cluster/plugin")
+        .forceV2(true)
+        .withMethod(GET)
+        .build();
+    V2Response rsp = readPluginState.process(cluster.getSolrClient());
+    assertEquals(DummyEventListener.class.getName(), rsp._getStr("/plugin/testplugin/class", null));
+
+    String collection = "testListenerPlugins_collection";
+    CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collection, "conf", 1, 1);
+    cluster.getSolrClient().request(create);
+    cluster.waitForActiveCollection(collection, 1, 1);
+    boolean await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timed out waiting for COLLECTIONS_ADDED event, " + collection);
+    }
+    // read the shared field once so that all assertions look at the same event
+    ClusterEvent added = lastEvent;
+    assertNotNull("lastEvent should be COLLECTIONS_ADDED", added);
+    assertEquals("lastEvent should be COLLECTIONS_ADDED", ClusterEvent.EventType.COLLECTIONS_ADDED, added.getType());
+    // verify timestamp: it must not lie in the future; an equal instant is acceptable
+    // because the clock may not advance between event creation and this check
+    Instant now = Instant.now();
+    assertTrue("timestamp of the event is in the future", !added.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsAddedEvent)added).getCollectionNames().next());
+
+    // re-arm the latch before triggering the next event
+    dummyEventLatch = new CountDownLatch(1);
+    lastEvent = null;
+
+    CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(collection);
+    cluster.getSolrClient().request(delete);
+    await = dummyEventLatch.await(30, TimeUnit.SECONDS);
+    if (!await) {
+      fail("Timed out waiting for COLLECTIONS_REMOVED event, " + collection);
+    }
+    ClusterEvent removed = lastEvent;
+    assertNotNull("lastEvent should be COLLECTIONS_REMOVED", removed);
+    assertEquals("lastEvent should be COLLECTIONS_REMOVED", ClusterEvent.EventType.COLLECTIONS_REMOVED, removed.getType());
+    // verify timestamp
+    now = Instant.now();
+    assertTrue("timestamp of the event is in the future", !removed.getTimestamp().isAfter(now));
+    assertEquals(collection, ((CollectionsRemovedEvent)removed).getCollectionNames().next());
+  }
}
diff --git a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
index 4c37c17..bd9bf7c 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestContainerPlugin.java
@@ -92,8 +92,8 @@ public class TestContainerPlugin extends SolrCloudTestCase {
expectError(req, cluster.getSolrClient(), errPath, "No method with @Command in class");
//test with an invalid class
- plugin.klass = C1.class.getName();
- expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
+// plugin.klass = C1.class.getName();
+// expectError(req, cluster.getSolrClient(), errPath, "No @EndPoints");
//test with a valid class. This should succeed now
plugin.klass = C3.class.getName();