Posted to commits@lucene.apache.org by er...@apache.org on 2019/01/20 03:20:46 UTC
[lucene-solr] branch master updated: SOLR-13091: REBALANCELEADERS is broken
This is an automated email from the ASF dual-hosted git repository.
erick pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new a692d05 SOLR-13091: REBALANCELEADERS is broken
a692d05 is described below
commit a692d05a909e1ce385c56c087cb62911c27b5f5b
Author: Erick Erickson <Er...@gmail.com>
AuthorDate: Sat Jan 19 19:20:39 2019 -0800
SOLR-13091: REBALANCELEADERS is broken
---
solr/CHANGES.txt | 2 +
.../apache/solr/cloud/ExclusiveSliceProperty.java | 20 +
.../solr/handler/admin/RebalanceLeaders.java | 381 ++++++++---
.../apache/solr/cloud/TestRebalanceLeaders.java | 738 ++++++++++++++-------
solr/solr-ref-guide/src/collections-api.adoc | 9 +
5 files changed, 820 insertions(+), 330 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 65604c2..5f770a6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -284,6 +284,8 @@ Bug Fixes
* SOLR-13137: NPE when /admin/zookeeper/status endpoint hit in standalone mode (janhoy)
+* SOLR-13091: REBALANCELEADERS is broken (Erick Erickson)
+
Improvements
----------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
index 953023f..f5672ba 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,6 +40,8 @@ import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
@@ -46,6 +49,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.BA
// Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
class ExclusiveSliceProperty {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ClusterState clusterState;
private final boolean onlyActiveNodes;
private final String property;
@@ -235,6 +239,15 @@ class ExclusiveSliceProperty {
adjustLimits(nodesHostingProp.get(nodeName));
removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
addProp(srToChange.slice, srToChange.replica.getName());
+ // When you set the property, you must ensure that it is _removed_ from any other replicas.
+ for (Replica rep : srToChange.slice.getReplicas()) {
+ if (rep.getName().equals(srToChange.replica.getName())) {
+ continue;
+ }
+ if (rep.getProperty(property) != null) {
+ removeProp(srToChange.slice, rep.getName());
+ }
+ }
}
}
@@ -266,10 +279,12 @@ class ExclusiveSliceProperty {
}
private void removeProp(Slice origSlice, String replicaName) {
+ log.debug("Removing property {} from slice {}, replica {}", property, origSlice.getName(), replicaName);
getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
}
private void addProp(Slice origSlice, String replicaName) {
+ log.debug("Adding property {} to slice {}, replica {}", property, origSlice.getName(), replicaName);
getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
}
@@ -342,5 +357,10 @@ class ExclusiveSliceProperty {
this.slice = slice;
this.replica = replica;
}
+ @Override
+ public String toString() {
+ return "SliceReplica:" + System.lineSeparator() + "slice: " + slice + System.lineSeparator() + " replica: " + replica + System.lineSeparator();
+ }
}
}
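
The loop added to ExclusiveSliceProperty above enforces the invariant that at most one replica per slice carries the property: setting it on the chosen replica strips it from every sibling. A minimal standalone sketch of that invariant, using plain maps instead of Solr's Slice/Replica types (all names here are hypothetical):

    import java.util.HashMap;
    import java.util.Map;

    public class SliceUniqueSketch {
      // Each replica is modeled as a name -> (property -> value) map.
      static void setExclusive(Map<String, Map<String, String>> slice,
                               String chosenReplica, String property) {
        slice.get(chosenReplica).put(property, "true");
        // Setting the property on one replica must remove it from every other
        // replica in the slice, otherwise two replicas would claim it.
        for (Map.Entry<String, Map<String, String>> ent : slice.entrySet()) {
          if (ent.getKey().equals(chosenReplica) == false) {
            ent.getValue().remove(property);
          }
        }
      }

      public static void main(String[] args) {
        Map<String, Map<String, String>> slice = new HashMap<>();
        Map<String, String> replica1 = new HashMap<>();
        replica1.put("property.preferredleader", "true");
        slice.put("replica1", replica1);
        slice.put("replica2", new HashMap<>());
        setExclusive(slice, "replica2", "property.preferredleader");
        System.out.println(slice); // only replica2 carries the property now
      }
    }
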
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index f0819bd..522a432 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -18,10 +18,13 @@ package org.apache.solr.handler.admin;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.solr.cloud.LeaderElector;
@@ -55,13 +58,62 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
+
+/**
+ * The endpoint for the collections API REBALANCELEADERS call that actually does the work.
+ * <p>
+ * Overview:
+ * <p>
+ * The leader election process is that each replica of a shard watches one, and only one, other replica via
+ * ephemeral nodes in ZooKeeper. When the node being watched goes down, the node watching it is sent a notification
+ * and, if the node being watched is the leader, the node getting the notification assumes leadership.
+ * <p>
+ * ZooKeeper's ephemeral nodes get a monotonically increasing "sequence number" that defines each node's position
+ * in the queue.
+ * <p>
+ * So to force a particular node to become a leader it must have a watch on the leader. This can lead to two nodes
+ * having the same sequence number. Say the process is this:
+ * replica1 is the leader (seq 1)
+ * replica3 is on a Solr node that happens to be started next, it watches the leader (seq 2)
+ * replica2 is on the next Solr node started. It will _also_ watch the leader; its sequence number is 2, exactly
+ * like replica3's
+ * <p>
+ * This is true on startup, but can also be a consequence of, say, a replica going into recovery. It's no longer
+ * eligible to become leader, so it will be put at the end of the queue by default. So there's code to put it in the
+ * queue with the same sequence number as the current second replica.
+ * <p>
+ * To complicate matters further, when the nodes are sorted (see OverseerTaskProcessor.getSortedElectionNodes)
+ * the primary sort is on the sequence number, the secondary sort on the session ID. So the preferredLeader may
+ * or may not be second in that list.
+ * <p>
+ * What all this means is that when the REBALANCELEADERS command is issued, this class examines the election queue and
+ * performs just three things for each shard in the collection:
+ * <p>
+ * 1> ensures that the preferredLeader is watching the leader (rejoins the election queue at the head)
+ * <p>
+ * 2> if there are two ephemeral nodes with the same sequence number watching the leader, and if one of them is the
+ * preferredLeader, it sends the _other_ node to the end of the queue (rejoins it)
+ * <p>
+ * 3> rejoins the zeroth entry in the list at the end of the queue, which triggers the watch on the preferredLeader
+ * replica, which then takes over leadership
+ * <p>
+ * All this, of course, assumes the preferredLeader is alive and well and is assigned for any given shard.
+ */
+
class RebalanceLeaders {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
+
final SolrQueryRequest req;
final SolrQueryResponse rsp;
final CollectionsHandler collectionsHandler;
final CoreContainer coreContainer;
+ private final Set<String> asyncRequests = new HashSet<>();
+ final static String INACTIVE_PREFERREDS = "inactivePreferreds";
+ final static String ALREADY_LEADERS = "alreadyLeaders";
+ final static String SUMMARY = "Summary";
+ final NamedList<Object> results = new NamedList<>();
+ final Map<String, String> pendingOps = new HashMap<>();
+ private String collectionName;
+
RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) {
this.req = req;
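
The ordering rules described in the class javadoc are easy to misread, so here is a toy model of the election-queue sort. The node-name format is loosely modeled on ZooKeeper's "<sessionId>-<coreNodeName>-n_<sequence>" convention; this is not Solr's LeaderElector code, just a sketch of the described behavior:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class ElectionQueueSketch {
      // Toy election node: "<sessionId>-<coreNodeName>-n_<sequence>"
      static int seq(String node) {
        return Integer.parseInt(node.substring(node.lastIndexOf("n_") + 2));
      }

      public static void main(String[] args) {
        List<String> nodes = Arrays.asList(
            "9000000001-replica1-n_0000000001",  // current leader
            "9000000003-replica3-n_0000000002",  // watches the leader
            "9000000002-replica2-n_0000000002"); // requeued with the SAME sequence number
        // Primary sort on the sequence number, secondary sort on the whole string,
        // which effectively breaks ties by session ID.
        nodes.sort(Comparator.comparingInt(ElectionQueueSketch::seq)
            .thenComparing(Comparator.<String>naturalOrder()));
        // replica2 sorts ahead of replica3 only because its session ID is lower,
        // so the preferredLeader may or may not be second in the sorted list.
        System.out.println(nodes);
      }
    }
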
@@ -71,38 +123,29 @@ class RebalanceLeaders {
}
void execute() throws KeeperException, InterruptedException {
- req.getParams().required().check(COLLECTION_PROP);
+ DocCollection dc = checkParams();
+
- String collectionName = req.getParams().get(COLLECTION_PROP);
- if (StringUtils.isBlank(collectionName)) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
- String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
- }
- coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
- ClusterState clusterState = coreContainer.getZkController().getClusterState();
- DocCollection dc = clusterState.getCollection(collectionName);
- if (dc == null) {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
- }
- Map<String, String> currentRequests = new HashMap<>();
int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
if (max <= 0) max = Integer.MAX_VALUE;
int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
- NamedList<Object> results = new NamedList<>();
+
+ // If a maximum number of simultaneous requests is specified, we have to pause when we have that many
+ // outstanding requests and wait for at least one to finish before going on to the next rebalance.
boolean keepGoing = true;
for (Slice slice : dc.getSlices()) {
- ensurePreferredIsLeader(results, slice, currentRequests);
- if (currentRequests.size() == max) {
+ ensurePreferredIsLeader(slice);
+ if (asyncRequests.size() == max) {
log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
- keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
+ keepGoing = waitAsyncRequests(maxWaitSecs, false);
if (keepGoing == false) {
break; // If we've waited longer than specified, don't continue to wait!
}
}
}
if (keepGoing == true) {
- keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
+ keepGoing = waitAsyncRequests(maxWaitSecs, true);
}
if (keepGoing == true) {
log.info("All leader reassignments completed.");
@@ -110,15 +153,72 @@ class RebalanceLeaders {
log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
}
+ checkLeaderStatus();
+ NamedList<Object> summary = new NamedList<>();
+ if (pendingOps.size() == 0) {
+ summary.add("Success", "All active replicas with the preferredLeader property set are leaders");
+ } else {
+ summary.add("Failure", "Not all active replicas with preferredLeader property are leaders");
+ }
+ rsp.getValues().add(SUMMARY, summary); // we want this first.
+
rsp.getValues().addAll(results);
}
- private void ensurePreferredIsLeader(NamedList<Object> results,
- Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
- final String inactivePreferreds = "inactivePreferreds";
- final String alreadyLeaders = "alreadyLeaders";
- String collectionName = req.getParams().get(COLLECTION_PROP);
+ // Ensure that all required parameters are present and the doc collection exists.
+ private DocCollection checkParams() throws KeeperException, InterruptedException {
+ req.getParams().required().check(COLLECTION_PROP);
+ collectionName = req.getParams().get(COLLECTION_PROP);
+ if (StringUtils.isBlank(collectionName)) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+ String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
+ }
+ coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
+ ClusterState clusterState = coreContainer.getZkController().getClusterState();
+
+ DocCollection dc = clusterState.getCollection(collectionName);
+ if (dc == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+ }
+ return dc;
+ }
+
+ // Once we've done all the fiddling with the queues, check on the way out to see if all the active preferred
+ // leaders that we intended to change are in fact the leaders.
+ private void checkLeaderStatus() throws InterruptedException, KeeperException {
+ for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) {
+ ClusterState clusterState = coreContainer.getZkController().getClusterState();
+ Set<String> liveNodes = clusterState.getLiveNodes();
+ DocCollection dc = clusterState.getCollection(collectionName);
+ for (Slice slice : dc.getSlices()) {
+ for (Replica replica : slice.getReplicas()) {
+ if (replica.isActive(liveNodes) && replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)) {
+ if (replica.getBool(LEADER_PROP, false)) {
+ if (pendingOps.containsKey(slice.getName())) {
+ // Record for return that the leader changed successfully
+ pendingOps.remove(slice.getName());
+ addToSuccesses(slice, replica);
+ break;
+ }
+ }
+ }
+ }
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
+ }
+ addAnyFailures();
+ }
+
+ // The process is:
+ // if the replica with preferredLeader is already the leader, do nothing
+ // Otherwise:
+ // > if two nodes have the same sequence number and both point to the current leader, we presume that we've just
+ // moved it, move the one that does _not_ have the preferredLeader to the end of the list.
+ // > move the current leader to the end of the list. This _should_ mean that the current ephemeral node in the
+ // leader election queue is removed and the only remaining node watching it is triggered to become leader.
+ private void ensurePreferredIsLeader(Slice slice) throws KeeperException, InterruptedException {
for (Replica replica : slice.getReplicas()) {
// Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
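
The two-move process in the comment above can be distilled into a small decision function. This is only a sketch of the logic with toy types, not the handler's code, and it omits the earlier step of rejoining the preferred replica at the head when it isn't yet watching the leader:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RequeueDecisionSketch {
      static final class Entry {
        final String replica;
        final int seq;
        Entry(String replica, int seq) { this.replica = replica; this.seq = seq; }
      }

      // Given the sorted election queue (entry 0 is the leader), return the replicas
      // that must rejoin at the END of the queue so that only the preferred replica
      // is left watching the leader.
      static List<String> toRequeue(List<Entry> sorted, String preferred) {
        List<String> out = new ArrayList<>();
        int watcherSeq = sorted.get(1).seq;
        for (int i = 1; i < sorted.size(); i++) {
          Entry e = sorted.get(i);
          // Anyone tied with the first watcher's sequence number who is NOT the
          // preferred replica gets sent to the back of the queue.
          if (e.seq == watcherSeq && e.replica.equals(preferred) == false) {
            out.add(e.replica);
          }
        }
        out.add(sorted.get(0).replica); // finally requeue the current leader itself
        return out;
      }

      public static void main(String[] args) {
        List<Entry> queue = Arrays.asList(
            new Entry("replica1", 1),  // current leader
            new Entry("replica2", 2),  // preferred leader, tied at seq 2
            new Entry("replica3", 2)); // tied watcher that must move
        System.out.println(toRequeue(queue, "replica2")); // [replica3, replica1]
      }
    }
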
@@ -127,87 +227,126 @@ class RebalanceLeaders {
// OK, we are the preferred leader, are we the actual leader?
if (replica.getBool(LEADER_PROP, false)) {
//We're a preferred leader, but we're _also_ the leader, don't need to do anything.
- NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
- if (noops == null) {
- noops = new NamedList<>();
- results.add(alreadyLeaders, noops);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "success");
- res.add("msg", "Already leader");
- res.add("shard", slice.getName());
- res.add("nodeName", replica.getNodeName());
- noops.add(replica.getName(), res);
+ addAlreadyLeaderToResults(slice, replica);
return; // already the leader, do nothing.
}
-
+ ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
// We're the preferred leader, but someone else is leader. Only become leader if we're active.
- if (replica.getState() != Replica.State.ACTIVE) {
- NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
- if (inactives == null) {
- inactives = new NamedList<>();
- results.add(inactivePreferreds, inactives);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "skipped");
- res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
- res.add("shard", slice.getName());
- res.add("nodeName", replica.getNodeName());
- inactives.add(replica.getName(), res);
+ if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) == false) {
+ addInactiveToResults(slice, replica);
return; // Don't try to become the leader if we're not active!
}
- // Replica is the preferred leader but not the actual leader, do something about that.
- // "Something" is
- // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
- // 2> tell the actual leader to re-queue itself.
-
- ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
- if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
- log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
- "election queue, but replica " + replica.getName() + " doesn't think it's the leader.");
+ if (electionQueueInBadState(electionNodes, slice, replica)) {
return;
}
+ // Replica is the preferred leader but not the actual leader, do something about that.
+ // "Something" is
+ // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+ // 2> tell the actual leader to re-queue itself.
+
// Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
// string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
// While this is deterministic, it's not quite what we need, so re-queue nodes that aren't us and are
// watching the leader node.
+
String firstWatcher = electionNodes.get(1);
if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
- makeReplicaFirstWatcher(collectionName, slice, replica);
+ makeReplicaFirstWatcher(slice, replica);
}
- String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
- waitForNodeChange(collectionName, slice, electionNodes.get(0));
-
+ // This replica should be the leader at the end of the day, so let's record that information to check at the end
+ pendingOps.put(slice.getName(), replica.getName());
+ String leaderElectionNode = electionNodes.get(0);
+ String coreName = slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP);
+ rejoinElectionQueue(slice, leaderElectionNode, coreName, false);
+ waitForNodeChange(slice, leaderElectionNode);
return; // Done with this slice, skip the rest of the replicas.
}
}
+
+ // Check that the election queue has some members! There really should be two or more for this to make any sense;
+ // if there's only one we can't change anything.
+ private boolean electionQueueInBadState(List<String> electionNodes, Slice slice, Replica replica) {
+ if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+ log.warn("Rebalancing leaders and slice {} has less than two elements in the leader " +
+ "election queue, but replica {} doesn't think it's the leader.", slice.getName(), replica.getName());
+ return true;
+ }
+
+ return false;
+ }
+
+ // Provide some feedback to the user about what actually happened, or in this case where no action was
+ // possible
+ private void addInactiveToResults(Slice slice, Replica replica) {
+ NamedList<Object> inactives = (NamedList<Object>) results.get(INACTIVE_PREFERREDS);
+ if (inactives == null) {
+ inactives = new NamedList<>();
+ results.add(INACTIVE_PREFERREDS, inactives);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "skipped");
+ res.add("msg", "Replica " + replica.getName() + " is a referredLeader for shard " + slice.getName() + ", but is inactive. No change necessary");
+ inactives.add(replica.getName(), res);
+ }
+
+ // Provide some feedback to the user about what actually happened, or in this case where no action was
+ // necessary since this preferred replica was already the leader.
+ private void addAlreadyLeaderToResults(Slice slice, Replica replica) {
+ NamedList<Object> alreadyLeaders = (NamedList<Object>) results.get(ALREADY_LEADERS);
+ if (alreadyLeaders == null) {
+ alreadyLeaders = new NamedList<>();
+ results.add(ALREADY_LEADERS, alreadyLeaders);
+ }
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "skipped");
+ res.add("msg", "Replica " + replica.getName() + " is already the leader for shard " + slice.getName() + ". No change necessary");
+ alreadyLeaders.add(replica.getName(), res);
+ }
+
// Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
- void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+ // There can be "ties", i.e. replicas in the queue with the same sequence number. Sorting doesn't necessarily sort
+ // the one we most care about first. So put the node we _don't_ care about at the end of the election queue.
+
+ void makeReplicaFirstWatcher(Slice slice, Replica replica)
throws KeeperException, InterruptedException {
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
- // First, queue up the preferred leader at the head of the queue.
+ // First, queue the preferred leader up to watch the leader if it isn't already doing so.
+ int secondSeq = Integer.MAX_VALUE;
+
+ int candidateSeq = -1;
+ for (int idx = 1; idx < electionNodes.size(); ++idx) {
+ String candidate = electionNodes.get(idx);
+ secondSeq = Math.min(secondSeq, LeaderElector.getSeq(candidate));
+ if (LeaderElector.getNodeName(candidate).equals(replica.getName())) {
+ candidateSeq = LeaderElector.getSeq(candidate);
+ }
+ }
int newSeq = -1;
- for (String electionNode : electionNodes) {
- if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
- String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, electionNode, coreName, true);
- newSeq = waitForNodeChange(collectionName, slice, electionNode);
- break;
+ if (candidateSeq == secondSeq) {
+ // the preferredLeader is already watching the leader, no need to move it around.
+ newSeq = secondSeq;
+ } else {
+ for (String electionNode : electionNodes) {
+ if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+ // Make the preferred leader watch the leader.
+ String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+ rejoinElectionQueue(slice, electionNode, coreName, true);
+ newSeq = waitForNodeChange(slice, electionNode);
+ break;
+ }
}
}
if (newSeq == -1) {
@@ -225,18 +364,22 @@ class RebalanceLeaders {
if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
continue;
}
+ // We won't get here for the preferredLeader node
if (LeaderElector.getSeq(thisNode) == newSeq) {
String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
- rejoinElection(collectionName, slice, thisNode, coreName, false);
- waitForNodeChange(collectionName, slice, thisNode);
+ rejoinElectionQueue(slice, thisNode, coreName, false);
+ waitForNodeChange(slice, thisNode);
}
}
}
- int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+ // We're just waiting for the electionNode to rejoin the queue with a _different_ sequence number, indicating
+ // that any requeueing we've done has happened.
+ int waitForNodeChange(Slice slice, String electionNode) throws InterruptedException, KeeperException {
String nodeName = LeaderElector.getNodeName(electionNode);
int oldSeq = LeaderElector.getSeq(electionNode);
for (int idx = 0; idx < 600; ++idx) {
+
ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
@@ -245,14 +388,16 @@ class RebalanceLeaders {
return LeaderElector.getSeq(testNode);
}
}
-
- Thread.sleep(100);
+ TimeUnit.MILLISECONDS.sleep(100);
+ zkStateReader.forciblyRefreshAllClusterStateSlow();
}
return -1;
}
-
- private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
- boolean rejoinAtHead) throws KeeperException, InterruptedException {
+
+ // Move an election node to some other place in the queue. If rejoinAtHead==false, it goes to the end; otherwise
+ // the new node should point at the leader.
+ private void rejoinElectionQueue(Slice slice, String electionNode, String core, boolean rejoinAtHead)
+ throws KeeperException, InterruptedException {
Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
Map<String, Object> propMap = new HashMap<>();
propMap.put(COLLECTION_PROP, collectionName);
@@ -265,64 +410,84 @@ class RebalanceLeaders {
propMap.put(ELECTION_NODE_PROP, electionNode);
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
propMap.put(ASYNC, asyncId);
+ asyncRequests.add(asyncId);
+
collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own
}
- // currentAsyncIds - map of request IDs and reporting data (value)
// maxWaitSecs - How long are we going to wait? Defaults to 60 seconds.
- // waitForAll - if true, do not return until all assignments have been made.
- // results - a place to stash results for reporting back to the user.
+ // waitForAll - if true, do not return until all requests have been processed. "Processed" could mean failure!
//
- private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
- Boolean waitForAll, NamedList<Object> results)
+
+ private boolean waitAsyncRequests(final int maxWaitSecs, Boolean waitForAll)
throws KeeperException, InterruptedException {
- if (currentAsyncIds.size() == 0) return true;
+ if (asyncRequests.size() == 0) {
+ return true;
+ }
for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
- Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+ Iterator<String> iter = asyncRequests.iterator();
boolean foundChange = false;
while (iter.hasNext()) {
- Map.Entry<String, String> pair = iter.next();
- String asyncId = pair.getKey();
+ String asyncId = iter.next();
if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
- NamedList<Object> fails = (NamedList<Object>) results.get("failures");
- if (fails == null) {
- fails = new NamedList<>();
- results.add("failures", fails);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "failed");
- res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
- fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
} else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
coreContainer.getZkController().clearAsyncId(asyncId);
- NamedList<Object> successes = (NamedList<Object>) results.get("successes");
- if (successes == null) {
- successes = new NamedList<>();
- results.add("successes", successes);
- }
- NamedList<Object> res = new NamedList<>();
- res.add("status", "success");
- res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
- successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
iter.remove();
foundChange = true;
}
}
// We're done if we're processing a few at a time or all requests are processed.
- if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+ // We don't want to change, say, 100s of leaders simultaneously. So if the request specifies some limit,
+ // and we're at that limit, we want to return to the caller so it can immediately add another request.
+ // That's the purpose of the first clause here. Otherwise, of course, just return if all requests are
+ // processed.
+ if ((foundChange && waitForAll == false) || asyncRequests.size() == 0) {
return true;
}
- Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+ TimeUnit.MILLISECONDS.sleep(100);
}
+ // If we get here, we've timed out waiting.
return false;
}
+ // If we actually changed the leader, we should send that fact back in the response.
+ private void addToSuccesses(Slice slice, Replica replica) {
+ NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+ if (successes == null) {
+ successes = new NamedList<>();
+ results.add("successes", successes);
+ }
+ log.info("Successfully changed leader of shard {} to replica {}", slice.getName(), replica.getName());
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "success");
+ res.add("msg", "Successfully changed leader of slice " + slice.getName() + " to " + replica.getName());
+ successes.add(slice.getName(), res);
+ }
+ // If for any reason we were supposed to change leadership, that should be recorded in pendingOps. Any
+ // time we verified that the change actually occurred, that entry should have been removed. So report anything
+ // left over as a failure.
+ private void addAnyFailures() {
+ if (pendingOps.size() == 0) {
+ return;
+ }
+ NamedList<Object> fails = (NamedList<Object>) new NamedList<>();
+ results.add("failures", fails);
+
+ for (Map.Entry<String, String> ent : pendingOps.entrySet()) {
+ log.info("Failed to change leader of shard {} to replica {}", ent.getKey(), ent.getValue());
+ NamedList<Object> res = new NamedList<>();
+ res.add("status", "failed");
+ res.add("msg", String.format(Locale.ROOT, "Could not change leder for slice %s to %s", ent.getKey(), ent.getValue()));
+ fails.add(ent.getKey(), res);
+
+ }
+ }
}
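
For context on how the reworked handler is driven, the updated test below exercises it through SolrJ. A minimal client-side sketch, assuming a running SolrCloud cluster (the ZooKeeper address and collection name are hypothetical; setMaxAtOnce corresponds to the MAX_AT_ONCE_PROP throttle handled above):

    import java.util.Collections;
    import java.util.Optional;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.request.CollectionAdminRequest;
    import org.apache.solr.client.solrj.response.CollectionAdminResponse;

    public class RebalanceLeadersExample {
      public static void main(String[] args) throws Exception {
        try (CloudSolrClient client = new CloudSolrClient.Builder(
            Collections.singletonList("localhost:9983"), Optional.empty()).build()) {
          CollectionAdminResponse resp = CollectionAdminRequest
              .rebalanceLeaders("myCollection") // hypothetical collection
              .setMaxAtOnce(10)                 // throttle simultaneous reassignments
              .process(client);
          // The handler now reports a "Summary" section first; it contains "Success"
          // when every active replica with preferredLeader set ended up as leader.
          System.out.println(resp.getResponse().get("Summary"));
        }
      }
    }
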
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
index b47424f..b207fa3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
@@ -15,335 +15,629 @@
* limitations under the License.
*/
package org.apache.solr.cloud;
+
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.TimeUnit;
+import org.apache.lucene.util.LuceneTestCase;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
+@LuceneTestCase.Slow
+public class TestRebalanceLeaders extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final String COLLECTION_NAME = "testcollection";
+ private static final String COLLECTION_NAME = "TestColl";
+
+ private static int numNodes;
+ private static int numShards;
+ private static int numReplicas;
+
+ private static boolean useAdminToSetProps = false;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+
+ numNodes = random().nextInt(4) + 4;
+ numShards = random().nextInt(3) + 3;
+ numReplicas = random().nextInt(2) + 2;
+ useAdminToSetProps = random().nextBoolean();
+
+ configureCluster(numNodes)
+ .addConfig(COLLECTION_NAME, configset("cloud-minimal"))
+ .configure();
+
+ CollectionAdminResponse resp = CollectionAdminRequest.createCollection(COLLECTION_NAME, COLLECTION_NAME,
+ numShards, numReplicas, 0, 0)
+ .setMaxShardsPerNode((numShards * numReplicas) / numNodes + 1)
+ .process(cluster.getSolrClient());
+ assertEquals("Admin request failed; ", 0, resp.getStatus());
+ cluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards * numReplicas);
- public TestRebalanceLeaders() {
- schemaString = "schema15.xml"; // we need a string id
- sliceCount = 4;
}
- int reps = 10;
+ @Before
+ public void removeAllProperties() throws KeeperException, InterruptedException {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ rep.getProperties().forEach((key, value) -> {
+ if (key.startsWith("property.")) {
+ try {
+ delProp(slice, rep, key);
+ } catch (IOException | SolrServerException e) {
+ fail("Caught unexpected exception in @Before " + e.getMessage());
+ }
+ }
+ });
+ }
+ }
+ }
+
int timeoutMs = 60000;
- Map<String, List<Replica>> initial = new HashMap<>();
- Map<String, Replica> expected = new HashMap<>();
+ // Test that setting an arbitrary "slice unique" property un-sets the property if it's on another replica in the
+ // slice. This tests the case where the property is set on an _individual_ replica, whereas
+ // testBalancePropertySliceUnique lets Solr distribute the property and checks that it is un-set on the other
+ // replicas _in that slice_.
+ //
+ // NOTE: There were significant problems because at one point the code implicitly defined
+ // shardUnique=true for the special property preferredLeader. That was removed at one point so we're explicitly
+ // testing that as well.
@Test
- @ShardsFixed(num = 4)
- public void test() throws Exception {
- reps = random().nextInt(9) + 1; // make sure and do at least one.
- try (CloudSolrClient client = createCloudClient(null)) {
- // Mix up a bunch of different combinations of shards and replicas in order to exercise boundary cases.
- // shards, replicationfactor, maxreplicaspernode
- int shards = random().nextInt(7);
- if (shards < 2) shards = 2;
- int rFactor = random().nextInt(4);
- if (rFactor < 2) rFactor = 2;
- createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
- }
+ public void testSetArbitraryPropertySliceUnique() throws IOException, SolrServerException, InterruptedException, KeeperException {
+ // Check both special (preferredLeader) and something arbitrary.
+ doTestSetArbitraryPropertySliceUnique("foo" + random().nextInt(1_000_000));
+ removeAllProperties();
+ doTestSetArbitraryPropertySliceUnique("preferredleader");
+ }
- waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
- waitForRecoveriesToFinish(COLLECTION_NAME, false);
- listCollection();
- rebalanceLeaderTest();
+ // Test that automatically distributing a slice-unique property un-sets that property if it's on any other replica
+ // on that slice.
+ // This is different from the test above. The test above sets individual properties on individual replicas. This one
+ // relies on Solr to pick which replicas to set the property on.
+ @Test
+ public void testBalancePropertySliceUnique() throws KeeperException, InterruptedException, IOException, SolrServerException {
+ // Check both cases of "special" property preferred(Ll)eader
+ doTestBalancePropertySliceUnique("foo" + random().nextInt(1_000_000));
+ removeAllProperties();
+ doTestBalancePropertySliceUnique("preferredleader");
}
- private void listCollection() throws IOException, SolrServerException {
- //CloudSolrServer client = createCloudClient(null);
- try {
- ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionParams.CollectionAction.LIST.toString());
- SolrRequest request = new QueryRequest(params);
- request.setPath("/admin/collections");
-
- NamedList<Object> rsp = cloudClient.request(request);
- List<String> collections = (List<String>) rsp.get("collections");
- assertTrue("control_collection was not found in list", collections.contains("control_collection"));
- assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION));
- assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME));
- } finally {
- //remove collections
- //client.shutdown();
- }
+ // Having tested setting properties, we need to check whether rebalancing the leaders actually changes the
+ // leader appropriately.
+ @Test
+ public void testRebalanceLeaders() throws Exception {
+
+ // First let's unbalance the preferredLeader property, do all the leaders get reassigned properly?
+ concentrateProp("preferredLeader");
+ sendRebalanceCommand();
+ checkPreferredsAreLeaders();
+
+ // Now follow up by evenly distributing the property as well as possible.
+ doTestBalancePropertySliceUnique("preferredLeader");
+ sendRebalanceCommand();
+ checkPreferredsAreLeaders();
+
+ // Now check the condition we saw "in the wild" where you could not rebalance properly when Jetty was restarted.
+ concentratePropByRestartingJettys();
+ sendRebalanceCommand();
+ checkPreferredsAreLeaders();
}
- void recordInitialState() throws InterruptedException {
- Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+ // Ensure that the property is set on only one replica per slice when changing a unique property on an individual
+ // replica.
+ private void doTestSetArbitraryPropertySliceUnique(String propIn) throws InterruptedException, KeeperException, IOException, SolrServerException {
+ final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
+ // First set the property in some replica in some slice
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+ Slice[] slices = docCollection.getSlices().toArray(new Slice[0]);
+ Slice slice = slices[random().nextInt(slices.length)];
+
+ // Bounce around a bit setting this property and ensure it's only set on one replica.
+ Replica[] reps = slice.getReplicas().toArray(new Replica[0]);
+ for (int idx = 0; idx < 4; ++idx) {
+ Replica rep = reps[random().nextInt(reps.length)];
+ // Set the property on a particular replica
+ setProp(slice, rep, prop);
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+
+ long count = 0;
+ boolean rightRep = false;
+ Slice modSlice;
+ DocCollection modColl = null; // keeps IDE happy
+
+ // Ensure that no other replica in that slice has the property when we return.
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ modColl = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ modSlice = modColl.getSlice(slice.getName());
+ rightRep = modSlice.getReplica(rep.getName()).getBool("property." + prop.toLowerCase(Locale.ROOT), false);
+ count = modSlice.getReplicas().stream().filter(thisRep -> thisRep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)).count();
+
+ if (count == 1 && rightRep) {
+ break;
+ }
- // Assemble a list of all the replicas for all the shards in a convenient way to look at them.
- for (Map.Entry<String, Slice> ent : slices.entrySet()) {
- initial.put(ent.getKey(), new ArrayList<>(ent.getValue().getReplicas()));
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ if (count != 1 || rightRep == false) {
+ fail("The property " + prop + " was not uniquely distributed in slice " + slice.getName()
+ + " " + modColl.toString());
+ }
}
}
- void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
- recordInitialState();
- for (int idx = 0; idx < reps; ++idx) {
- issueCommands();
- checkConsistency();
- }
- }
- // After we've called the rebalance command, we want to insure that:
- // 1> all replicas appear once and only once in the respective leader election queue
- // 2> All the replicas we _think_ are leaders are in the 0th position in the leader election queue.
- // 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
- void checkConsistency() throws InterruptedException, KeeperException {
+ // Fail if the replicas with the preferredLeader property are _not_ also the leaders.
+ private void checkPreferredsAreLeaders() throws InterruptedException, KeeperException {
+ // Make sure that the shard-unique properties are where you expect.
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- boolean checkAppearOnce = false;
- boolean checkElectionZero = false;
- boolean checkZkLeadersAgree = false;
- while (!timeout.hasTimedOut()) {
- checkAppearOnce = checkAppearOnce();
- checkElectionZero = checkElectionZero();
- checkZkLeadersAgree = checkZkLeadersAgree();
- if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) {
+
+ while (timeout.hasTimedOut() == false) {
+ if (checkPreferredsAreLeaders(false)) {
+ // Ok, all preferreds are leaders. Let's also get the election queue and guarantee that every
+ // live replica is in the queue and none are repeated.
+ checkElectionQueues();
return;
}
- Thread.sleep(1000);
+ TimeUnit.MILLISECONDS.sleep(100);
}
- fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero="
- + checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree);
+ log.error("Leaders are not all preferres {}", cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME));
+ // Show the errors
+ checkPreferredsAreLeaders(true);
}
+ // Do all active nodes in each slice appear exactly once in the slice's leader election queue?
+ // Since we assert that the number of live replicas is the same as the size of the leader election queue, we only
+ // have to compare one way.
+ private void checkElectionQueues() throws KeeperException, InterruptedException {
- // Do all the nodes appear exactly once in the leader election queue and vice-versa?
- Boolean checkAppearOnce() throws KeeperException, InterruptedException {
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
- for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
- List<String> leaderQueue = cloudClient.getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
- "/leader_elect/" + ent.getKey() + "/election", null, true);
-
- if (leaderQueue.size() != ent.getValue().size()) {
- return false;
- }
- // Check that each election node has a corresponding replica.
- for (String electionNode : leaderQueue) {
- if (checkReplicaName(LeaderElector.getNodeName(electionNode), ent.getValue())) {
- continue;
- }
- return false;
- }
- // Check that each replica has an election node.
- for (Replica rep : ent.getValue()) {
- if (checkElectionNode(rep.getName(), leaderQueue)) {
- continue;
+ for (Slice slice : docCollection.getSlices()) {
+ Set<Replica> liveReplicas = new HashSet<>();
+ slice.getReplicas().forEach(replica -> {
+ if (replica.isActive(liveNodes)) {
+ liveReplicas.add(replica);
}
- return false;
- }
+ });
+ checkOneQueue(docCollection, slice, liveReplicas);
}
- return true;
}
- // Check that the given name is in the leader election queue
- Boolean checkElectionNode(String repName, List<String> leaderQueue) {
+ // Helper method to check one leader election queue's consistency.
+ private void checkOneQueue(DocCollection coll, Slice slice, Set<Replica> liveReplicas) throws KeeperException, InterruptedException {
+
+ List<String> leaderQueue = cluster.getSolrClient().getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
+ "/leader_elect/" + slice.getName() + "/election", null, true);
+
+ if (leaderQueue.size() != liveReplicas.size()) {
+
+ log.error("One or more replicas is missing from the leader election queue! Slice {}, election queue: {}, collection: {}"
+ , slice.getName(), leaderQueue, coll);
+ fail("One or more replicas is missing from the leader election queue");
+ }
+ // Check that each election node has a corresponding live replica.
for (String electionNode : leaderQueue) {
- if (repName.equals(LeaderElector.getNodeName(electionNode))) {
- return true;
+ String replica = LeaderElector.getNodeName(electionNode);
+ if (slice.getReplica(replica) == null) {
+ log.error("Replica {} is not in the election queue: {}", replica, leaderQueue);
+ fail("Replica is not in the election queue!");
}
}
- return false;
}
- // Check that the name passed in corresponds to a replica.
- Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
- for (Replica rep : replicas) {
- if (toCheck.equals(rep.getName())) {
- return true;
+ // Just an encapsulation for checkPreferredsAreLeaders to make returning easier.
+ // The doAsserts var is to actually print the problem and fail the test if the condition is not met.
+ private boolean checkPreferredsAreLeaders(boolean doAsserts) throws KeeperException, InterruptedException {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.getBool("property.preferredleader", false)) {
+ boolean isLeader = rep.getBool("leader", false);
+ if (doAsserts) {
+ assertTrue("PreferredLeader should be the leader: ", isLeader);
+ } else if (isLeader == false) {
+ return false;
+ }
+ }
}
}
- return false;
+ return true;
}
- // Get the shard leader election from ZK and sort it. The node may not actually be there, so retry
- List<String> getOverseerSort(String key) {
- List<String> ret = null;
- try {
- ret = OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
- "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
- return ret;
- } catch (KeeperException e) {
- cloudClient.connect();
- } catch (InterruptedException e) {
- return null;
+ // Arbitrarily send the rebalance command either with the SolrJ interface or with an HTTP request.
+ private void sendRebalanceCommand() throws SolrServerException, InterruptedException, IOException {
+ if (random().nextBoolean()) {
+ rebalanceLeaderUsingSolrJAPI();
+ } else {
+ rebalanceLeaderUsingStandardRequest();
}
- return null;
}
- // Is every node we think is the leader in the zeroth position in the leader election queue?
- Boolean checkElectionZero() {
- for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+ // Helper method to make sure the property is _unbalanced_ first, then it gets properly re-assigned with the
+ // BALANCESHARDUNIQUE command.
+ private void doTestBalancePropertySliceUnique(String propIn) throws InterruptedException, IOException, KeeperException, SolrServerException {
+ final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
- List<String> leaderQueue = getOverseerSort(ent.getKey());
- if (leaderQueue == null) return false;
+ // Concentrate the properties on as few replicas as possible.
+ concentrateProp(prop);
+
+ // issue the BALANCESHARDUNIQUE command
+ rebalancePropAndCheck(prop);
+
+ // Verify that there are no more than one replica with the property per shard.
+ verifyPropUniquePerShard(prop);
+
+ // Verify that the property is reasonably evenly distributed
+ verifyPropCorrectlyDistributed(prop);
- String electName = LeaderElector.getNodeName(leaderQueue.get(0));
- String coreName = ent.getValue().getName();
- if (electName.equals(coreName) == false) {
- return false;
- }
- }
- return true;
}
- // Do who we _think_ should be the leader agree with the leader nodes?
- Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
- for (Map.Entry<String,Replica> ent : expected.entrySet()) {
-
- String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader";
- byte[] data = getZkData(cloudClient, path);
- if (data == null) {
- log.warn("path to check not found {}", path);
- return false;
- }
-
- String repCore = null;
- String zkCore = null;
-
- Map m = (Map) Utils.fromJSON(data);
- zkCore = (String) m.get("core");
- repCore = ent.getValue().getStr("core");
- if (zkCore.equals(repCore) == false) {
- log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore);
- return false;
+ private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException {
+
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ DocCollection docCollection = null;
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ int maxPropCount = Integer.MIN_VALUE;
+ int minPropCount = Integer.MAX_VALUE;
+ for (Slice slice : docCollection.getSlices()) {
+ int repCount = 0;
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.getBool("property." + propLC, false)) {
+ repCount++;
+ }
+ }
+ maxPropCount = Math.max(maxPropCount, repCount);
+ minPropCount = Math.min(minPropCount, repCount);
}
-
+ if (Math.abs(maxPropCount - minPropCount) < 2) return;
}
- return true;
+ log.error("Property {} is not distributed evenly. {}", prop, docCollection);
+ fail("Property is not distributed evenly " + prop);
}
- byte[] getZkData(CloudSolrClient client, String path) {
- org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
- try {
- byte[] data = client.getZkStateReader().getZkClient().getData(path, null, stat, true);
- if (data != null) {
- return data;
+ // Used when we concentrate the property on a few nodes.
+ private void verifyPropDistributedAsExpected(Map<String, String> expectedShardReplicaMap, String prop) throws InterruptedException, KeeperException {
+ // Make sure that the shard-unique properties are where you expect.
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ boolean failure = false;
+ DocCollection docCollection = null;
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ failure = false;
+ for (Map.Entry<String, String> ent : expectedShardReplicaMap.entrySet()) {
+ Replica rep = docCollection.getSlice(ent.getKey()).getReplica(ent.getValue());
+ if (rep.getBool("property." + propLC, false) == false) {
+ failure = true;
+ }
}
- } catch (KeeperException.NoNodeException e) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- return null;
+ if (failure == false) {
+ return;
}
- } catch (InterruptedException | KeeperException e) {
- return null;
+ TimeUnit.MILLISECONDS.sleep(100);
}
- return null;
+
+ fail(prop + " properties are not on the expected replicas: " + docCollection.toString()
+ + System.lineSeparator() + "Expected " + expectedShardReplicaMap.toString());
}
- // It's OK not to check the return here since the subsequent tests will fail.
- void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
+ // Just check that the property is distributed as expected. This does _not_ rebalance the leaders.
+ private void rebalancePropAndCheck(String prop) throws IOException, SolrServerException, InterruptedException, KeeperException {
- // Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader!
- expected.clear();
- for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
- List<Replica> replicas = ent.getValue();
- Replica rep = replicas.get(Math.abs(random().nextInt()) % replicas.size());
- expected.put(ent.getKey(), rep);
- issuePreferred(ent.getKey(), rep);
+ if (random().nextBoolean()) {
+ rebalancePropUsingSolrJAPI(prop);
+ } else {
+ rebalancePropUsingStandardRequest(prop);
}
+ }
- if (waitForAllPreferreds() == false) {
- fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
- }
- //fillExpectedWithCurrent();
- // Now rebalance the leaders randomly using SolrJ or direct call
- if(random().nextBoolean())
- rebalanceLeaderUsingSolrJAPI();
- else
- rebalanceLeaderUsingDirectCall();
+ private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException, InterruptedException {
+ CollectionAdminResponse resp = CollectionAdminRequest
+ .rebalanceLeaders(COLLECTION_NAME)
+ .process(cluster.getSolrClient());
+ assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
+ assertEquals("Admin request failed; ", 0, resp.getStatus());
+ }
+
+ private void rebalanceLeaderUsingStandardRequest() throws IOException, SolrServerException {
+ ModifiableSolrParams params = new ModifiableSolrParams();
+ params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+ params.set("collection", COLLECTION_NAME);
+ QueryRequest request = new QueryRequest(params);
+ request.setPath("/admin/collections");
+ QueryResponse resp = request.process(cluster.getSolrClient());
+ assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
+ assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
}
- private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException {
- CollectionAdminRequest.RebalanceLeaders rebalanceLeaders = CollectionAdminRequest.rebalanceLeaders(COLLECTION_NAME);
- rebalanceLeaders.setMaxAtOnce(10)
- .process(cloudClient);
+
+ private void rebalancePropUsingSolrJAPI(String prop) throws IOException, SolrServerException, InterruptedException {
+ // Don't set the value, that should be done automatically.
+ CollectionAdminResponse resp;
+
+ if (prop.toLowerCase(Locale.ROOT).contains("preferredleader")) {
+ resp = CollectionAdminRequest
+ .balanceReplicaProperty(COLLECTION_NAME, prop)
+ .process(cluster.getSolrClient());
+
+ } else {
+ resp = CollectionAdminRequest
+ .balanceReplicaProperty(COLLECTION_NAME, prop)
+ .setShardUnique(true)
+ .process(cluster.getSolrClient());
+
+ }
+ assertEquals("Admin request failed; ", 0, resp.getStatus());
}
- private void rebalanceLeaderUsingDirectCall() throws IOException, SolrServerException {
+ private void rebalancePropUsingStandardRequest(String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
- params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+ params.set("action", CollectionParams.CollectionAction.BALANCESHARDUNIQUE.toString());
+ params.set("property", prop);
- // Insure we get error returns when omitting required parameters
params.set("collection", COLLECTION_NAME);
- params.set("maxAtOnce", "10");
- SolrRequest request = new QueryRequest(params);
+ if (prop.toLowerCase(Locale.ROOT).contains("preferredleader") == false) {
+ params.set("shardUnique", true);
+ }
+ QueryRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
- cloudClient.request(request);
+ QueryResponse resp = request.process(cluster.getSolrClient());
+ assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
+ }
+ // This is important. I (Erick Erickson) have run across a situation where the "standard request" causes failures,
+ // but never the Admin request. So let's test both all the time for a given test.
+ //
+ // This sets an _individual_ replica to have the property, not collection-wide
+ private void setProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+ if (useAdminToSetProps) {
+ setPropWithAdminRequest(slice, rep, prop);
+ } else {
+ setPropWithStandardRequest(slice, rep, prop);
+ }
}
- void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
+ void setPropWithStandardRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
- // Insure we get error returns when omitting required parameters
-
params.set("collection", COLLECTION_NAME);
- params.set("shard", slice);
+ params.set("shard", slice.getName());
params.set("replica", rep.getName());
- params.set("property", "preferredLeader");
+ params.set("property", prop);
params.set("property.value", "true");
+ // Omitting shardUnique here tests that it is implicitly added for preferredLeader; other properties must set it explicitly.
+ if (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false) {
+ params.set("shardUnique", "true");
+ }
SolrRequest request = new QueryRequest(params);
request.setPath("/admin/collections");
- cloudClient.request(request);
+ cluster.getSolrClient().request(request);
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
+ (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
+
+ }
+
+ void setPropWithAdminRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+ boolean setUnique = (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false);
+ CollectionAdminRequest.AddReplicaProp addProp =
+ CollectionAdminRequest.addReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), prop, "true");
+ if (setUnique) {
+ addProp.setShardUnique(true);
+ }
+ CollectionAdminResponse resp = addProp.process(cluster.getSolrClient());
+ assertEquals(0, resp.getStatus());
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
+ (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
+
+ }
+
+ private void delProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+ String propLC = prop.toLowerCase(Locale.ROOT);
+ CollectionAdminResponse resp = CollectionAdminRequest.deleteReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), propLC)
+ .process(cluster.getSolrClient());
+ assertEquals("Admin request failed; ", 0, resp.getStatus());
+ waitForState("Expecting property '" + prop + "' to be removed from replica " + rep.getName(), COLLECTION_NAME,
+ (n, c) -> c.getReplica(rep.getName()).getProperty(propLC) == null);
+ }
+
+ // Intentionally un-balance the property to ensure that BALANCESHARDUNIQUE does its job. There was an odd case
+ // where rebalancing didn't work well if the Solr nodes were stopped and restarted, yet worked perfectly
+ // when the nodes were _not_ restarted in the test. So we have to test that too.
+ private void concentratePropByRestartingJettys() throws Exception {
+
+ List<JettySolrRunner> jettys = new ArrayList<>(cluster.getJettySolrRunners());
+ Collections.shuffle(jettys, random());
+ jettys.remove(random().nextInt(jettys.size()));
+ // Now we have a list of jettys with one removed; the removed one stays up. Stop all of the jettys in the list,
+ // then start them again to concentrate the leaders on the node that was never stopped. It's not necessary that
+ // all shards have a leader.
+
+ for (JettySolrRunner jetty : jettys) {
+ cluster.stopJettySolrRunner(jetty);
+ cluster.waitForJettyToStop(jetty);
+ }
+ checkReplicasInactive(jettys);
+
+ for (int idx = 0; idx < jettys.size(); ++idx) {
+ cluster.startJettySolrRunner(jettys.get(idx));
+ }
+ cluster.waitForAllNodes(60);
+ // the nodes are present, but are all replicas active?
+ checkAllReplicasActive();
+ }
+
+ // While banging my head against a wall, I put a lot of force-refresh statements in. We want to keep the call
+ // sites but make this a no-op, so if we start to see failures we can re-enable the refresh with minimal effort.
+ private void forceUpdateCollectionStatus() throws KeeperException, InterruptedException {
+ // cluster.getSolrClient().getZkStateReader().forceUpdateCollection(COLLECTION_NAME);
}
- boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
- boolean goAgain = true;
+ // Since we have to restart jettys, we don't want to try rebalancing etc. until we're sure all jettys that should
+ // be up are up and all replicas are active.
+ private void checkReplicasInactive(List<JettySolrRunner> downJettys) throws KeeperException, InterruptedException {
TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- while (! timeout.hasTimedOut()) {
- goAgain = false;
- Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
-
- for (Map.Entry<String, Replica> ent : expected.entrySet()) {
- Replica me = slices.get(ent.getKey()).getReplica(ent.getValue().getName());
- if (me.getBool("property.preferredleader", false) == false) {
- goAgain = true;
- break;
+ DocCollection docCollection = null;
+ Set<String> liveNodes = null;
+
+ Set<String> downJettyNodes = new TreeSet<>();
+ for (JettySolrRunner jetty : downJettys) {
+ downJettyNodes.add(jetty.getBaseUrl().getHost() + ":" + jetty.getBaseUrl().getPort() + "_solr");
+ }
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+ boolean expectedInactive = true;
+
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ if (downJettyNodes.contains(rep.getNodeName()) == false) {
+ continue; // We are on a live node
+ }
+ // If a replica on an allegedly-down node is still reported as active, keep waiting.
+ if (rep.isActive(liveNodes)) {
+ expectedInactive = false;
+ }
+ }
+ }
+ if (expectedInactive) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("timed out waiting for all replicas to become inactive: livenodes: " + liveNodes +
+ " Collection state: " + docCollection.toString());
+ }
+
+ // We need to wait around until all replicas are active before expecting rebalancing or distributing shard-unique
+ // properties to work.
+ private void checkAllReplicasActive() throws KeeperException, InterruptedException {
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (timeout.hasTimedOut() == false) {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+ boolean allActive = true;
+ for (Slice slice : docCollection.getSlices()) {
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.isActive(liveNodes) == false) {
+ allActive = false;
+ }
+ }
+ }
+ if (allActive) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("timed out waiting for all replicas to become active");
+ }
+
+ // Use a simple heuristic to concentrate the replicas that have the property on as few nodes as possible. The
+ // point is that we can then execute BALANCESHARDUNIQUE and verify that it redistributed the property correctly.
+ private void concentrateProp(String prop) throws KeeperException, InterruptedException, IOException, SolrServerException {
+ // find all the live nodes
+ // for each slice, set the property on the replica whose node appears earliest in the (shuffled) live nodes list
+ List<String> liveNodes = new ArrayList<>(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes());
+ Collections.shuffle(liveNodes, random());
+
+ Map<String, String> uniquePropMap = new TreeMap<>();
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+ for (Slice slice : docCollection.getSlices()) {
+ Replica changedRep = null;
+ int livePos = Integer.MAX_VALUE;
+ for (Replica rep : slice.getReplicas()) {
+ int pos = liveNodes.indexOf(rep.getNodeName());
+ if (pos >= 0 && pos < livePos) {
+ livePos = pos;
+ changedRep = rep;
}
}
- if (goAgain) {
- Thread.sleep(250);
- } else {
- return true;
+ if (livePos == Integer.MAX_VALUE) {
+ fail("Invalid state! We should have a replica to add the property to! " + docCollection.toString());
}
+
+ uniquePropMap.put(slice.getName(), changedRep.getName());
+ // Now set the property on the "lowest" node in live_nodes.
+ setProp(slice, changedRep, prop);
}
- return false;
+ verifyPropDistributedAsExpected(uniquePropMap, prop);
}
-}
+ // make sure that the property in question is unique per shard.
+ private Map<String, String> verifyPropUniquePerShard(String prop) throws InterruptedException, KeeperException {
+ Map<String, String> uniquePropMaps = new TreeMap<>();
+
+ TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (timeout.hasTimedOut() == false) {
+ uniquePropMaps.clear();
+ if (checkUniquePropPerShard(uniquePropMaps, prop)) {
+ return uniquePropMaps;
+ }
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ fail("There should be exactly one replica with value " + prop + " set to true per shard: "
+ + cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).toString());
+ return null; // keeps IDE happy.
+ }
+ // Return true if every shard has exactly one replica with the unique property set to "true".
+ private boolean checkUniquePropPerShard(Map<String, String> uniques, String prop) throws KeeperException, InterruptedException {
+ forceUpdateCollectionStatus();
+ DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+ for (Slice slice : docCollection.getSlices()) {
+ int propCount = 0;
+ for (Replica rep : slice.getReplicas()) {
+ if (rep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)) {
+ propCount++;
+ uniques.put(slice.getName(), rep.getName());
+ }
+ }
+ if (1 != propCount) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index f650cdc..ddaaeb8 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -2163,6 +2163,8 @@ http://localhost:8983/solr/admin/collections?action=REBALANCELEADERS&collection=
In this example, two replicas in the "alreadyLeaders" section already had the leader assigned to the same node as the `preferredLeader` property so no action was taken.
+The "Success" tag indicates that the command rebalanced all leaders. If, for any reason some replicas with preferredLeader=true are not leaders, this will be "Failure" rather than "Success". If a replica cannot be made leader due to not being "Active", it's also considered a failure.
+
The replica in the "inactivePreferreds" section had the `preferredLeader` property set but the node was down and no action was taken. The three nodes in the "successes" section were made leaders because they had the `preferredLeader` property set but were not leaders and they were active.
[source,xml]
@@ -2172,6 +2174,9 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
<int name="status">0</int>
<int name="QTime">123</int>
</lst>
+ <lst name="Summary">
+ <str name="Success">All replicas with the preferredLeader property set are leaders</str>
+ </lst>
<lst name="alreadyLeaders">
<lst name="core_node1">
<str name="status">success</str>
@@ -2219,6 +2224,10 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
Examining the clusterstate after issuing this call should show that every live replica that has the `preferredLeader` property also has the "leader" property set to _true_.
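+
+One way to check (a sketch; the collection name is illustrative) is to issue a CLUSTERSTATUS call and verify that, in each shard, the replica carrying `property.preferredleader` is the same replica whose "leader" property is _true_:
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=CLUSTERSTATUS&collection=collection1
+----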
+NOTE: The added work done by an NRT leader during indexing is quite small. The primary use case is to redistribute the leader role when a large number of leaders are concentrated on a small number of nodes. Rebalancing will likely not improve performance unless the imbalance of leadership roles is measured in multiples of 10.
+
+NOTE: The BALANCESHARDUNIQUE command that distributes the preferredLeader property does not guarantee perfect distribution, and in some collection topologies it is impossible to make that guarantee.
+
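+For example, a call along these lines (a sketch; the collection name is illustrative) asks Solr to spread the preferredLeader property, one replica per shard, as evenly as possible across nodes:
+
+[source,text]
+----
+http://localhost:8983/solr/admin/collections?action=BALANCESHARDUNIQUE&collection=collection1&property=preferredLeader
+----
+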
[[forceleader]]
== FORCELEADER: Force Shard Leader