You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ab...@apache.org on 2020/09/14 15:46:53 UTC
[lucene-solr] 02/02: SOLR-14749: Add more types of cluster events.
Improve example implementation.
This is an automated email from the ASF dual-hosted git repository.
ab pushed a commit to branch jira/solr-14749
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 6087e0218fb6b2566028fb71e1c89b89ceb8b4f1
Author: Andrzej Bialecki <ab...@apache.org>
AuthorDate: Mon Sep 14 17:45:35 2020 +0200
SOLR-14749: Add more types of cluster events. Improve example implementation.
---
.../src/java/org/apache/solr/cloud/Overseer.java | 4 +-
...wnEvent.java => ClusterConfigChangedEvent.java} | 14 +-
.../apache/solr/cluster/events/ClusterEvent.java | 9 +-
...deDownEvent.java => CollectionsAddedEvent.java} | 8 +-
...DownEvent.java => CollectionsRemovedEvent.java} | 8 +-
.../{NodeUpEvent.java => NodesDownEvent.java} | 8 +-
.../{NodeDownEvent.java => NodesUpEvent.java} | 8 +-
.../{NodeDownEvent.java => ReplicasDownEvent.java} | 10 +-
.../events/impl/ClusterEventProducerImpl.java | 155 ++++++++++++++++-----
...er.java => CollectionsRepairEventListener.java} | 24 ++--
.../java/org/apache/solr/core/CoreContainer.java | 10 +-
11 files changed, 180 insertions(+), 78 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 3b80371..733ddaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -782,7 +782,7 @@ public class Overseer implements SolrCloseable {
* Start {@link ClusterSingleton} plugins when we become the leader.
*/
private void startClusterSingletons() {
- getCoreContainer().getContainerSingletons().forEach((name, singleton) -> {
+ getCoreContainer().getClusterSingletons().forEach((name, singleton) -> {
try {
singleton.start();
if (singleton instanceof ClusterEventListener) {
@@ -798,7 +798,7 @@ public class Overseer implements SolrCloseable {
* Stop {@link ClusterSingleton} plugins when we lose leadership.
*/
private void stopClusterSingletons() {
- getCoreContainer().getContainerSingletons().forEach((name, singleton) -> {
+ getCoreContainer().getClusterSingletons().forEach((name, singleton) -> {
if (singleton instanceof ClusterEventListener) {
getCoreContainer().getClusterEventProducer().unregisterListener((ClusterEventListener) singleton);
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
similarity index 80%
rename from solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
index bce20b4..e536468 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ReplicaDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterConfigChangedEvent.java
@@ -16,22 +16,20 @@
*/
package org.apache.solr.cluster.events;
+import java.util.Map;
+
/**
*
*/
-public interface ReplicaDownEvent extends ClusterEvent {
+public interface ClusterConfigChangedEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.REPLICA_DOWN;
+ return EventType.CLUSTER_CONFIG_CHANGED;
}
- String getNodeName();
-
- String getCollectionName();
+ Map<String, Object> getOldClusterConfig();
- String getShardName();
+ Map<String, Object> getNewClusterConfig();
- // so called coreNodeName
- String getReplicaName();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
index ee8d955..2853f74 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ClusterEvent.java
@@ -24,9 +24,12 @@ import java.time.Instant;
public interface ClusterEvent {
enum EventType {
- NODE_DOWN,
- NODE_UP,
- REPLICA_DOWN,
+ NODES_DOWN,
+ NODES_UP,
+ COLLECTIONS_ADDED,
+ COLLECTIONS_REMOVED,
+ REPLICAS_DOWN,
+ CLUSTER_CONFIG_CHANGED,
// other types? eg. Overseer leader change, shard leader change,
// node overload (eg. CPU / MEM circuit breakers tripped)?
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
similarity index 83%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
index 47e6d84..f4ba87e 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsAddedEvent.java
@@ -16,15 +16,17 @@
*/
package org.apache.solr.cluster.events;
+import java.util.Collection;
+
/**
*
*/
-public interface NodeDownEvent extends ClusterEvent {
+public interface CollectionsAddedEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.NODE_DOWN;
+ return EventType.COLLECTIONS_ADDED;
}
- String getNodeName();
+ Collection<String> getCollectionNames();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
similarity index 83%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
index 47e6d84..e6a9175 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/CollectionsRemovedEvent.java
@@ -16,15 +16,17 @@
*/
package org.apache.solr.cluster.events;
+import java.util.Collection;
+
/**
*
*/
-public interface NodeDownEvent extends ClusterEvent {
+public interface CollectionsRemovedEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.NODE_DOWN;
+ return EventType.COLLECTIONS_REMOVED;
}
- String getNodeName();
+ Collection<String> getCollectionNames();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
similarity index 85%
rename from solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
index 594473d..06141e3 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeUpEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesDownEvent.java
@@ -16,15 +16,17 @@
*/
package org.apache.solr.cluster.events;
+import java.util.Collection;
+
/**
*
*/
-public interface NodeUpEvent extends ClusterEvent {
+public interface NodesDownEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.NODE_UP;
+ return EventType.NODES_DOWN;
}
- String getNodeName();
+ Collection<String> getNodeNames();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
similarity index 85%
copy from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
copy to solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
index 47e6d84..241b8bf 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/NodesUpEvent.java
@@ -16,15 +16,17 @@
*/
package org.apache.solr.cluster.events;
+import java.util.Collection;
+
/**
*
*/
-public interface NodeDownEvent extends ClusterEvent {
+public interface NodesUpEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.NODE_DOWN;
+ return EventType.NODES_UP;
}
- String getNodeName();
+ Collection<String> getNodeNames();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
similarity index 81%
rename from solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
rename to solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
index 47e6d84..e4dc106 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/NodeDownEvent.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/ReplicasDownEvent.java
@@ -16,15 +16,19 @@
*/
package org.apache.solr.cluster.events;
+import org.apache.solr.common.cloud.Replica;
+
+import java.util.Collection;
+
/**
*
*/
-public interface NodeDownEvent extends ClusterEvent {
+public interface ReplicasDownEvent extends ClusterEvent {
@Override
default EventType getType() {
- return EventType.NODE_DOWN;
+ return EventType.REPLICAS_DOWN;
}
- String getNodeName();
+ Collection<Replica> getReplicas();
}
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
index f0ea7f1..3186b18 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/ClusterEventProducerImpl.java
@@ -20,18 +20,25 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.solr.cloud.ZkController;
+import org.apache.solr.cluster.events.ClusterConfigChangedEvent;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cluster.events.ClusterEventProducer;
import org.apache.solr.cloud.ClusterSingleton;
-import org.apache.solr.cluster.events.NodeDownEvent;
-import org.apache.solr.cluster.events.NodeUpEvent;
+import org.apache.solr.cluster.events.CollectionsAddedEvent;
+import org.apache.solr.cluster.events.CollectionsRemovedEvent;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.NodesUpEvent;
+import org.apache.solr.common.cloud.CloudCollectionsListener;
+import org.apache.solr.common.cloud.ClusterPropertiesListener;
import org.apache.solr.common.cloud.LiveNodesListener;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
@@ -50,14 +57,20 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
private final Map<ClusterEvent.EventType, Set<ClusterEventListener>> listeners = new HashMap<>();
private final CoreContainer cc;
private LiveNodesListener liveNodesListener;
+ private CloudCollectionsListener cloudCollectionsListener;
+ private ClusterPropertiesListener clusterPropertiesListener;
+ private Map<String, Object> lastClusterProperties;
private ZkController zkController;
private boolean running;
private final Set<ClusterEvent.EventType> supportedEvents =
- new HashSet<>() {{
- add(ClusterEvent.EventType.NODE_DOWN);
- add(ClusterEvent.EventType.NODE_UP);
- }};
+ new HashSet<>(Arrays.asList(
+ ClusterEvent.EventType.NODES_DOWN,
+ ClusterEvent.EventType.NODES_UP,
+ ClusterEvent.EventType.COLLECTIONS_ADDED,
+ ClusterEvent.EventType.COLLECTIONS_REMOVED,
+ ClusterEvent.EventType.CLUSTER_CONFIG_CHANGED
+ ));
private volatile boolean isClosed = false;
@@ -71,6 +84,8 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
this.zkController = this.cc.getZkController();
if (zkController == null) {
liveNodesListener = null;
+ cloudCollectionsListener = null;
+ clusterPropertiesListener = null;
return;
}
@@ -88,39 +103,103 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
if (oldNodes.equals(newNodes)) {
return false;
}
- oldNodes.forEach(oldNode -> {
- if (!newNodes.contains(oldNode)) {
- fireEvent(new NodeDownEvent() {
- final Instant timestamp = Instant.now();
- @Override
- public Instant getTimestamp() {
- return timestamp;
- }
-
- @Override
- public String getNodeName() {
- return oldNode;
- }
- });
+ final Instant now = Instant.now();
+ final Set<String> downNodes = new HashSet<>(oldNodes);
+ downNodes.removeAll(newNodes);
+ if (!downNodes.isEmpty()) {
+ fireEvent(new NodesDownEvent() {
+ @Override
+ public Collection<String> getNodeNames() {
+ return downNodes;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ final Set<String> upNodes = new HashSet<>(newNodes);
+ upNodes.removeAll(oldNodes);
+ if (!upNodes.isEmpty()) {
+ fireEvent(new NodesUpEvent() {
+ @Override
+ public Collection<String> getNodeNames() {
+ return upNodes;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ return false;
+ };
+
+ cloudCollectionsListener = ((oldCollections, newCollections) -> {
+ if (oldCollections.equals(newCollections)) {
+ return;
+ }
+ final Instant now = Instant.now();
+ final Set<String> removed = new HashSet<>(oldCollections);
+ removed.removeAll(newCollections);
+ if (!removed.isEmpty()) {
+ fireEvent(new CollectionsRemovedEvent() {
+ @Override
+ public Collection<String> getCollectionNames() {
+ return removed;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ final Set<String> added = new HashSet<>(newCollections);
+ added.removeAll(oldCollections);
+ if (!added.isEmpty()) {
+ fireEvent(new CollectionsAddedEvent() {
+ @Override
+ public Collection<String> getCollectionNames() {
+ return added;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return now;
+ }
+ });
+ }
+ });
+ zkController.zkStateReader.registerCloudCollectionsListener(cloudCollectionsListener);
+
+ lastClusterProperties = zkController.zkStateReader.getClusterProperties();
+ clusterPropertiesListener = (newProperties) -> {
+ if (newProperties.equals(lastClusterProperties)) {
+ return false;
+ }
+ fireEvent(new ClusterConfigChangedEvent() {
+ @Override
+ public Map<String, Object> getOldClusterConfig() {
+ return lastClusterProperties;
}
- });
- newNodes.forEach(newNode -> {
- if (!oldNodes.contains(newNode)) {
- fireEvent(new NodeUpEvent() {
- final Instant timestamp = Instant.now();
- @Override
- public Instant getTimestamp() {
- return timestamp;
- }
- @Override
- public String getNodeName() {
- return newNode;
- }
- });
+
+ @Override
+ public Map<String, Object> getNewClusterConfig() {
+ return newProperties;
+ }
+
+ @Override
+ public Instant getTimestamp() {
+ return Instant.now();
}
});
+ lastClusterProperties = newProperties;
return false;
};
+ zkController.zkStateReader.registerClusterPropertiesListener(clusterPropertiesListener);
// XXX register collection state listener?
// XXX not sure how to efficiently monitor for REPLICA_DOWN events
@@ -141,7 +220,15 @@ public class ClusterEventProducerImpl implements ClusterEventProducer, ClusterSi
if (liveNodesListener != null) {
zkController.zkStateReader.removeLiveNodesListener(liveNodesListener);
}
+ if (cloudCollectionsListener != null) {
+ zkController.zkStateReader.removeCloudCollectionsListener(cloudCollectionsListener);
+ }
+ if (clusterPropertiesListener != null) {
+ zkController.zkStateReader.removeClusterPropertiesListener(clusterPropertiesListener);
+ }
liveNodesListener = null;
+ cloudCollectionsListener = null;
+ clusterPropertiesListener = null;
}
private void ensureRunning() {
diff --git a/solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
similarity index 80%
rename from solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java
rename to solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
index 2fa6475..fc2ac25 100644
--- a/solr/core/src/java/org/apache/solr/cluster/events/impl/AutoAddReplicasEventListener.java
+++ b/solr/core/src/java/org/apache/solr/cluster/events/impl/CollectionsRepairEventListener.java
@@ -25,6 +25,8 @@ import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.cluster.events.ClusterEvent;
import org.apache.solr.cluster.events.ClusterEventListener;
import org.apache.solr.cloud.ClusterSingleton;
+import org.apache.solr.cluster.events.NodesDownEvent;
+import org.apache.solr.cluster.events.ReplicasDownEvent;
import org.apache.solr.core.CoreContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,13 +36,13 @@ import org.slf4j.LoggerFactory;
* This is an (incomplete) illustration how to re-implement the combination of 8x
* NodeLostTrigger and AutoAddReplicasPlanAction to maintain the collection's replication factor.
*/
-public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEventListener {
+public class CollectionsRepairEventListener implements ClusterSingleton, ClusterEventListener {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Set<ClusterEvent.EventType> EVENT_TYPES = new HashSet<>(
Arrays.asList(
- ClusterEvent.EventType.NODE_DOWN,
- ClusterEvent.EventType.REPLICA_DOWN
+ ClusterEvent.EventType.NODES_DOWN,
+ ClusterEvent.EventType.REPLICAS_DOWN
));
private final CoreContainer cc;
@@ -48,7 +50,7 @@ public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEv
private boolean running = false;
- public AutoAddReplicasEventListener(CoreContainer cc) {
+ public CollectionsRepairEventListener(CoreContainer cc) {
this.cc = cc;
this.solrClientCache = cc.getSolrClientCache();
}
@@ -65,25 +67,25 @@ public class AutoAddReplicasEventListener implements ClusterSingleton, ClusterEv
return;
}
switch (event.getType()) {
- case NODE_DOWN:
- handleNodeDown(event);
+ case NODES_DOWN:
+ handleNodesDown((NodesDownEvent) event);
break;
- case NODE_UP:
+ case NODES_UP:
// ignore? rebalance replicas?
break;
- case REPLICA_DOWN:
- handleReplicaDown(event);
+ case REPLICAS_DOWN:
+ handleReplicasDown((ReplicasDownEvent) event);
break;
default:
log.warn("Unsupported event {}, ignoring...", event);
}
}
- private void handleNodeDown(ClusterEvent event) {
+ private void handleNodesDown(NodesDownEvent event) {
// send MOVEREPLICA admin requests for all replicas from that node
}
- private void handleReplicaDown(ClusterEvent event) {
+ private void handleReplicasDown(ReplicasDownEvent event) {
// send ADDREPLICA admin request
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index ba6cb6c..54c113d 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -177,7 +177,7 @@ public class CoreContainer {
public final Supplier<SolrZkClient> zkClientSupplier = () -> getZkController().getZkClient();
private final CustomContainerPlugins customContainerPlugins = new CustomContainerPlugins(this, containerHandlers.getApiBag());
- private final Map<String, ClusterSingleton> containerSingletons = new HashMap<>();
+ private final Map<String, ClusterSingleton> clusterSingletons = new ConcurrentHashMap<>();
protected final Map<String, CoreLoadFailure> coreInitFailures = new ConcurrentHashMap<>();
@@ -899,11 +899,11 @@ public class CoreContainer {
containerHandlers.keySet().forEach(handlerName -> {
SolrRequestHandler handler = containerHandlers.get(handlerName);
if (handler instanceof ClusterSingleton) {
- containerSingletons.put(handlerName, (ClusterSingleton) handler);
+ clusterSingletons.put(handlerName, (ClusterSingleton) handler);
}
});
// our default clusterEventProducer is also a ClusterSingleton
- containerSingletons.put("clusterEventProducer", (ClusterSingleton) clusterEventProducer);
+ clusterSingletons.put("clusterEventProducer", (ClusterSingleton) clusterEventProducer);
zkSys.getZkController().checkOverseerDesignate();
}
// This is a bit redundant but these are two distinct concepts for all they're accomplished at the same time.
@@ -2107,8 +2107,8 @@ public class CoreContainer {
return customContainerPlugins;
}
- public Map<String, ClusterSingleton> getContainerSingletons() {
- return Collections.unmodifiableMap(containerSingletons);
+ public Map<String, ClusterSingleton> getClusterSingletons() {
+ return clusterSingletons;
}
public ClusterEventProducer getClusterEventProducer() {