You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by er...@apache.org on 2019/01/20 03:20:46 UTC

[lucene-solr] branch master updated: SOLR-13091: REBALANCELEADERS is broken

This is an automated email from the ASF dual-hosted git repository.

erick pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new a692d05  SOLR-13091: REBALANCELEADERS is broken
a692d05 is described below

commit a692d05a909e1ce385c56c087cb62911c27b5f5b
Author: Erick Erickson <Er...@gmail.com>
AuthorDate: Sat Jan 19 19:20:39 2019 -0800

    SOLR-13091: REBALANCELEADERS is broken
---
 solr/CHANGES.txt                                   |   2 +
 .../apache/solr/cloud/ExclusiveSliceProperty.java  |  20 +
 .../solr/handler/admin/RebalanceLeaders.java       | 381 ++++++++---
 .../apache/solr/cloud/TestRebalanceLeaders.java    | 738 ++++++++++++++-------
 solr/solr-ref-guide/src/collections-api.adoc       |   9 +
 5 files changed, 820 insertions(+), 330 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 65604c2..5f770a6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -284,6 +284,8 @@ Bug Fixes
 
 * SOLR-13137: NPE when /admin/zookeeper/status endpoint hit in standalone mode (janhoy)
 
+* SOLR-13091: REBALANCELEADERS is broken (Erick Erickson)
+
 Improvements
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
index 953023f..f5672ba 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ExclusiveSliceProperty.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,6 +40,8 @@ import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.ONLY_ACTIVE_NODES;
 import static org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler.SHARD_UNIQUE;
@@ -46,6 +49,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.BA
 
 // Class to encapsulate processing replica properties that have at most one replica hosting a property per slice.
 class ExclusiveSliceProperty {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private ClusterState clusterState;
   private final boolean onlyActiveNodes;
   private final String property;
@@ -235,6 +239,15 @@ class ExclusiveSliceProperty {
       adjustLimits(nodesHostingProp.get(nodeName));
       removeSliceAlreadyHostedFromPossibles(srToChange.slice.getName());
       addProp(srToChange.slice, srToChange.replica.getName());
+      // When you set the property, you must ensure that it is _removed_ from every other replica of the slice.
+      for (Replica rep : srToChange.slice.getReplicas()) {
+        if (rep.getName().equals(srToChange.replica.getName())) {
+          continue; // this is the replica that was just given the property, leave it alone
+        }
+        if (rep.getProperty(property) != null) {
+          removeProp(srToChange.slice, rep.getName()); // strip it from the _other_ replica, not the new owner
+        }
+      }
     }
   }
 
@@ -266,10 +279,12 @@ class ExclusiveSliceProperty {
   }
 
   private void removeProp(Slice origSlice, String replicaName) {
+    log.debug("Removing property {} from slice {}, replica {}", property, origSlice.getName(), replicaName);
     getReplicaFromChanged(origSlice, replicaName).getProperties().remove(property);
   }
 
   private void addProp(Slice origSlice, String replicaName) {
+    log.debug("Adding property {} to slice {}, replica {}", property, origSlice.getName(), replicaName);
     getReplicaFromChanged(origSlice, replicaName).getProperties().put(property, "true");
   }
 
@@ -342,5 +357,10 @@ class ExclusiveSliceProperty {
       this.slice = slice;
       this.replica = replica;
     }
+    public String toString() {
+      StringBuilder sb = new StringBuilder(System.lineSeparator()).append(System.lineSeparator()).append("******EOE20 starting toString of SliceReplica");
+      sb.append("    :").append(System.lineSeparator()).append("slice: ").append(slice.toString()).append(System.lineSeparator()).append("      replica: ").append(replica.toString()).append(System.lineSeparator());
+      return sb.toString();
+    }
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index f0819bd..522a432 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -18,10 +18,13 @@ package org.apache.solr.handler.admin;
 
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.solr.cloud.LeaderElector;
@@ -55,13 +58,62 @@ import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.REBALANCELEADERS;
 import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
 
+
+/**
+ * The end point for the collections API REBALANCELEADERS call that actually does the work.
+ * <p>
+ * Overview:
+ * <p>
+ * The leader election process is that each replica of a shard watches one, and only one other replica via
+ * ephemeral nodes in ZooKeeper. When the node being watched goes down, the node watching it is sent a notification
+ * and, if the node being watched is the leader, the node getting the notification assumes leadership.
+ * <p>
+ * ZooKeeper's ephemeral nodes get a monotonically increasing "sequence number" that defines their position in the queue.
+ * <p>
+ * So to force a particular node to become a leader it must have a watch on the leader. This can lead to two nodes
+ * having the same sequence number. Say the process is this
+ * replica1 is the leader (seq 1)
+ * replica3 is on a Solr node that happens to be started next, it watches the leader (seq2)
+ * replica2 is on the next Solr node started. It will _also_ watch the leader; its sequence number is 2, exactly
+ * like replica3's.
+ * <p>
+ * This is true on startup, but can also be a consequence of, say, a replica going into recovery. It's no longer
+ * eligible to become leader, so will be put at the end of the queue by default. So there's code to put it in the
+ * queue with the same sequence number as the current second replica.
+ * <p>
+ * To complicate matters further, when the nodes are sorted (see OverseerTaskProcessor.getSortedElectionNodes)
+ * the primary sort is on the sequence number, secondary sort on the session ID. So the preferredLeader may
+ * or may not be second in that list.
+ * <p>
+ * What all this means is that when the REBALANCELEADERS command is issued, this class examines the election queue and
+ * performs just three things for each shard in the collection:
+ * <p>
+ * 1> ensures that the preferredLeader is watching the leader (rejoins the election queue at the head)
+ * <p>
+ * 2> if there are two ephemeral nodes with the same sequence number watching the leader, and if one of them is the
+ * preferredLeader it will send the _other_ node to the end of the queue (rejoins it)
+ * <p>
+ * 3> rejoins the zeroth entry in the list at the end of the queue, which triggers the watch on the preferredLeader
+ * replica which then takes over leadership
+ * <p>
+ * All this, of course, assumes the preferredLeader is alive and well and is assigned for any given shard.
+ */
+
 class RebalanceLeaders {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
+
   final SolrQueryRequest req;
   final SolrQueryResponse rsp;
   final CollectionsHandler collectionsHandler;
   final CoreContainer coreContainer;
+  private final Set<String> asyncRequests = new HashSet<>();
+  final static String INACTIVE_PREFERREDS = "inactivePreferreds";
+  final static String ALREADY_LEADERS = "alreadyLeaders";
+  final static String SUMMARY = "Summary";
+  final NamedList<Object> results = new NamedList<>();
+  final Map<String, String> pendingOps = new HashMap<>();
+  private String collectionName;
+
 
   RebalanceLeaders(SolrQueryRequest req, SolrQueryResponse rsp, CollectionsHandler collectionsHandler) {
     this.req = req;
@@ -71,38 +123,29 @@ class RebalanceLeaders {
   }
 
   void execute() throws KeeperException, InterruptedException {
-    req.getParams().required().check(COLLECTION_PROP);
+    DocCollection dc = checkParams();
+
 
-    String collectionName = req.getParams().get(COLLECTION_PROP);
-    if (StringUtils.isBlank(collectionName)) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
-          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
-    }
-    coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
-    ClusterState clusterState = coreContainer.getZkController().getClusterState();
-    DocCollection dc = clusterState.getCollection(collectionName);
-    if (dc == null) {
-      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
-    }
-    Map<String, String> currentRequests = new HashMap<>();
     int max = req.getParams().getInt(MAX_AT_ONCE_PROP, Integer.MAX_VALUE);
     if (max <= 0) max = Integer.MAX_VALUE;
     int maxWaitSecs = req.getParams().getInt(MAX_WAIT_SECONDS_PROP, 60);
-    NamedList<Object> results = new NamedList<>();
 
+
+    // If there are a maximum number of simultaneous requests specified, we have to pause when we have that many
+    // outstanding requests and wait for at least one to finish before going on to the next rebalance.
     boolean keepGoing = true;
     for (Slice slice : dc.getSlices()) {
-      ensurePreferredIsLeader(results, slice, currentRequests);
-      if (currentRequests.size() == max) {
+      ensurePreferredIsLeader(slice);
+      if (asyncRequests.size() == max) {
         log.info("Queued " + max + " leader reassignments, waiting for some to complete.");
-        keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, false, results);
+        keepGoing = waitAsyncRequests(maxWaitSecs, false);
         if (keepGoing == false) {
           break; // If we've waited longer than specified, don't continue to wait!
         }
       }
     }
     if (keepGoing == true) {
-      keepGoing = waitForLeaderChange(currentRequests, maxWaitSecs, true, results);
+      keepGoing = waitAsyncRequests(maxWaitSecs, true);
     }
     if (keepGoing == true) {
       log.info("All leader reassignments completed.");
@@ -110,15 +153,72 @@ class RebalanceLeaders {
       log.warn("Exceeded specified timeout of ." + maxWaitSecs + "' all leaders may not have been reassigned");
     }
 
+    checkLeaderStatus();
+    NamedList<Object> summary = new NamedList<>();
+    if (pendingOps.size() == 0) {
+      summary.add("Success", "All active replicas with the preferredLeader property set are leaders");
+    } else {
+      summary.add("Failure", "Not all active replicas with preferredLeader property are leaders");
+    }
+    rsp.getValues().add(SUMMARY, summary); // we want this first.
+
     rsp.getValues().addAll(results);
   }
 
-  private void ensurePreferredIsLeader(NamedList<Object> results,
-                                       Slice slice, Map<String, String> currentRequests) throws KeeperException, InterruptedException {
-    final String inactivePreferreds = "inactivePreferreds";
-    final String alreadyLeaders = "alreadyLeaders";
-    String collectionName = req.getParams().get(COLLECTION_PROP);
+  // Ensure that all required parameters are there and the doc collection exists.
+  private DocCollection checkParams() throws KeeperException, InterruptedException {
+    req.getParams().required().check(COLLECTION_PROP);
 
+    collectionName = req.getParams().get(COLLECTION_PROP);
+    if (StringUtils.isBlank(collectionName)) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
+          String.format(Locale.ROOT, "The " + COLLECTION_PROP + " is required for the Rebalance Leaders command."));
+    }
+    coreContainer.getZkController().getZkStateReader().forceUpdateCollection(collectionName);
+    ClusterState clusterState = coreContainer.getZkController().getClusterState();
+
+    DocCollection dc = clusterState.getCollection(collectionName);
+    if (dc == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collectionName + "' does not exist, no action taken.");
+    }
+    return dc;
+  }
+
+  // Once we've done all the fiddling with the queues, check on the way out to see if all the active preferred
+  // leaders that we intended to change are in fact the leaders.
+  private void checkLeaderStatus() throws InterruptedException, KeeperException {
+    for (int idx = 0; pendingOps.size() > 0 && idx < 600; ++idx) {
+      ClusterState clusterState = coreContainer.getZkController().getClusterState();
+      Set<String> liveNodes = clusterState.getLiveNodes();
+      DocCollection dc = clusterState.getCollection(collectionName);
+      for (Slice slice : dc.getSlices()) {
+        for (Replica replica : slice.getReplicas()) {
+          if (replica.isActive(liveNodes) && replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false)) {
+            if (replica.getBool(LEADER_PROP, false)) {
+              if (pendingOps.containsKey(slice.getName())) {
+                // Record for return that the leader changed successfully
+                pendingOps.remove(slice.getName());
+                addToSuccesses(slice, replica);
+                break;
+              }
+            }
+          }
+        }
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+      coreContainer.getZkController().getZkStateReader().forciblyRefreshAllClusterStateSlow();
+    }
+    addAnyFailures();
+  }
+
+  // The process is:
+  // if the replica with preferredLeader is already the leader, do nothing
+  // Otherwise:
+  // > if two nodes have the same sequence number and both point to the current leader, we presume that we've just
+  //   moved it, move the one that does _not_ have the preferredLeader to the end of the list.
+  // > move the current leader to the end of the list. This _should_ mean that the current ephemeral node in the
+  //   leader election queue is removed and the only remaining node watching it is triggered to become leader.
+  private void ensurePreferredIsLeader(Slice slice) throws KeeperException, InterruptedException {
     for (Replica replica : slice.getReplicas()) {
       // Tell the replica to become the leader if we're the preferred leader AND active AND not the leader already
       if (replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false) == false) {
@@ -127,87 +227,126 @@ class RebalanceLeaders {
       // OK, we are the preferred leader, are we the actual leader?
       if (replica.getBool(LEADER_PROP, false)) {
         //We're a preferred leader, but we're _also_ the leader, don't need to do anything.
-        NamedList<Object> noops = (NamedList<Object>) results.get(alreadyLeaders);
-        if (noops == null) {
-          noops = new NamedList<>();
-          results.add(alreadyLeaders, noops);
-        }
-        NamedList<Object> res = new NamedList<>();
-        res.add("status", "success");
-        res.add("msg", "Already leader");
-        res.add("shard", slice.getName());
-        res.add("nodeName", replica.getNodeName());
-        noops.add(replica.getName(), res);
+        addAlreadyLeaderToResults(slice, replica);
         return; // already the leader, do nothing.
       }
-
+      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
       // We're the preferred leader, but someone else is leader. Only become leader if we're active.
-      if (replica.getState() != Replica.State.ACTIVE) {
-        NamedList<Object> inactives = (NamedList<Object>) results.get(inactivePreferreds);
-        if (inactives == null) {
-          inactives = new NamedList<>();
-          results.add(inactivePreferreds, inactives);
-        }
-        NamedList<Object> res = new NamedList<>();
-        res.add("status", "skipped");
-        res.add("msg", "Node is a referredLeader, but it's inactive. Skipping");
-        res.add("shard", slice.getName());
-        res.add("nodeName", replica.getNodeName());
-        inactives.add(replica.getName(), res);
+      if (replica.isActive(zkStateReader.getClusterState().getLiveNodes()) == false) {
+        addInactiveToResults(slice, replica);
         return; // Don't try to become the leader if we're not active!
       }
 
-      // Replica is the preferred leader but not the actual leader, do something about that.
-      // "Something" is
-      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
-      // 2> tell the actual leader to re-queue itself.
-
-      ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
-
       List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
           ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
-      if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
-        log.info("Rebalancing leaders and slice " + slice.getName() + " has less than two elements in the leader " +
-            "election queue, but replica " + replica.getName() + " doesn't think it's the leader.");
+      if (electionQueueInBadState(electionNodes, slice, replica)) {
         return;
       }
 
+      // Replica is the preferred leader but not the actual leader, do something about that.
+      // "Something" is
+      // 1> if the preferred leader isn't first in line, tell it to re-queue itself.
+      // 2> tell the actual leader to re-queue itself.
+
       // Ok, the sorting for election nodes is a bit strange. If the sequence numbers are the same, then the whole
       // string is used, but that sorts nodes with the same sequence number by their session IDs from ZK.
       // While this is determinate, it's not quite what we need, so re-queue nodes that aren't us and are
       // watching the leader node..
 
+
       String firstWatcher = electionNodes.get(1);
 
       if (LeaderElector.getNodeName(firstWatcher).equals(replica.getName()) == false) {
-        makeReplicaFirstWatcher(collectionName, slice, replica);
+        makeReplicaFirstWatcher(slice, replica);
       }
 
-      String coreName = slice.getReplica(LeaderElector.getNodeName(electionNodes.get(0))).getStr(CORE_NAME_PROP);
-      rejoinElection(collectionName, slice, electionNodes.get(0), coreName, false);
-      waitForNodeChange(collectionName, slice, electionNodes.get(0));
-
+      // This replica should be the leader at the end of the day, so let's record that information to check at the end
+      pendingOps.put(slice.getName(), replica.getName());
+      String leaderElectionNode = electionNodes.get(0);
+      String coreName = slice.getReplica(LeaderElector.getNodeName(leaderElectionNode)).getStr(CORE_NAME_PROP);
+      rejoinElectionQueue(slice, leaderElectionNode, coreName, false);
+      waitForNodeChange(slice, leaderElectionNode);
 
       return; // Done with this slice, skip the rest of the replicas.
     }
   }
+
+  // Check that the election queue has some members! There really should be two or more for this to make any sense,
+  // if there's only one we can't change anything.
+  private boolean electionQueueInBadState(List<String> electionNodes, Slice slice, Replica replica) {
+    if (electionNodes.size() < 2) { // if there's only one node in the queue, should already be leader and we shouldn't be here anyway.
+      log.warn("Rebalancing leaders and slice {} has less than two elements in the leader " +
+          "election queue, but replica {} doesn't think it's the leader.", slice.getName(), replica.getName());
+      return true;
+    }
+
+    return false;
+  }
+
+  // Provide some feedback to the user about what actually happened, or in this case where no action was
+  // possible because the preferredLeader replica is not active.
+  private void addInactiveToResults(Slice slice, Replica replica) {
+    NamedList<Object> inactives = (NamedList<Object>) results.get(INACTIVE_PREFERREDS);
+    if (inactives == null) {
+      inactives = new NamedList<>();
+      results.add(INACTIVE_PREFERREDS, inactives);
+    }
+    NamedList<Object> res = new NamedList<>();
+    res.add("status", "skipped");
+    res.add("msg", "Replica " + replica.getName() + " is a preferredLeader for shard " + slice.getName() + ", but is inactive. No change necessary");
+    inactives.add(replica.getName(), res);
+  }
+
+  // Provide some feedback to the user about what actually happened, or in this case where no action was
+  // necessary since this preferred replica was already the leader
+  private void addAlreadyLeaderToResults(Slice slice, Replica replica) {
+    NamedList<Object> alreadyLeaders = (NamedList<Object>) results.get(ALREADY_LEADERS);
+    if (alreadyLeaders == null) {
+      alreadyLeaders = new NamedList<>();
+      results.add(ALREADY_LEADERS, alreadyLeaders);
+    }
+    NamedList<Object> res = new NamedList<>();
+    res.add("status", "skipped");
+    res.add("msg", "Replica " + replica.getName() + " is already the leader for shard " + slice.getName() + ". No change necessary");
+    alreadyLeaders.add(replica.getName(), res);
+  }
+
   // Put the replica in at the head of the queue and send all nodes with the same sequence number to the back of the list
-  void makeReplicaFirstWatcher(String collectionName, Slice slice, Replica replica)
+  // There can be "ties", i.e. replicas in the queue with the same sequence number. Sorting doesn't necessarily sort
+  // the one we most care about first. So put the node we _don't_ care about at the end of the election queue.
+
+  void makeReplicaFirstWatcher(Slice slice, Replica replica)
       throws KeeperException, InterruptedException {
 
     ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
     List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
         ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
 
-    // First, queue up the preferred leader at the head of the queue.
+    // First, queue up the preferred leader watching the leader if it isn't already
+    int secondSeq = Integer.MAX_VALUE;
+
+    int candidateSeq = -1;
+    for (int idx = 1; idx < electionNodes.size(); ++idx) {
+      String candidate = electionNodes.get(idx);
+      secondSeq = Math.min(secondSeq, LeaderElector.getSeq(candidate));
+      if (LeaderElector.getNodeName(candidate).equals(replica.getName())) {
+        candidateSeq = LeaderElector.getSeq(candidate);
+      }
+    }
     int newSeq = -1;
-    for (String electionNode : electionNodes) {
-      if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
-        String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
-        rejoinElection(collectionName, slice, electionNode, coreName, true);
-        newSeq = waitForNodeChange(collectionName, slice, electionNode);
-        break;
+    if (candidateSeq == secondSeq) {
+      // the preferredLeader is already watching the leader, no need to move it around.
+      newSeq = secondSeq;
+    } else {
+      for (String electionNode : electionNodes) {
+        if (LeaderElector.getNodeName(electionNode).equals(replica.getName())) {
+          // Make the preferred leader watch the leader.
+          String coreName = slice.getReplica(LeaderElector.getNodeName(electionNode)).getStr(CORE_NAME_PROP);
+          rejoinElectionQueue(slice, electionNode, coreName, true);
+          newSeq = waitForNodeChange(slice, electionNode);
+          break;
+        }
       }
     }
     if (newSeq == -1) {
@@ -225,18 +364,22 @@ class RebalanceLeaders {
       if (LeaderElector.getNodeName(thisNode).equals(replica.getName())) {
         continue;
       }
+      // We won't get here for the preferredLeader node
       if (LeaderElector.getSeq(thisNode) == newSeq) {
         String coreName = slice.getReplica(LeaderElector.getNodeName(thisNode)).getStr(CORE_NAME_PROP);
-        rejoinElection(collectionName, slice, thisNode, coreName, false);
-        waitForNodeChange(collectionName, slice, thisNode);
+        rejoinElectionQueue(slice, thisNode, coreName, false);
+        waitForNodeChange(slice, thisNode);
       }
     }
   }
 
-  int waitForNodeChange(String collectionName, Slice slice, String electionNode) throws InterruptedException, KeeperException {
+  // We're just waiting for the electionNode to rejoin the queue with a _different_ node, indicating that any
+  // requeueing we've done has happened.
+  int waitForNodeChange(Slice slice, String electionNode) throws InterruptedException, KeeperException {
     String nodeName = LeaderElector.getNodeName(electionNode);
     int oldSeq = LeaderElector.getSeq(electionNode);
     for (int idx = 0; idx < 600; ++idx) {
+
       ZkStateReader zkStateReader = coreContainer.getZkController().getZkStateReader();
       List<String> electionNodes = OverseerTaskProcessor.getSortedElectionNodes(zkStateReader.getZkClient(),
           ZkStateReader.getShardLeadersElectPath(collectionName, slice.getName()));
@@ -245,14 +388,16 @@ class RebalanceLeaders {
           return LeaderElector.getSeq(testNode);
         }
       }
-
-      Thread.sleep(100);
+      TimeUnit.MILLISECONDS.sleep(100);
+      zkStateReader.forciblyRefreshAllClusterStateSlow();
     }
     return -1;
   }
-  
-  private void rejoinElection(String collectionName, Slice slice, String electionNode, String core,
-                              boolean rejoinAtHead) throws KeeperException, InterruptedException {
+
+  // Move an election node to some other place in the queue. If rejoinAtHead==false, then at the end, otherwise
+  // the new node should point at the leader.
+  private void rejoinElectionQueue(Slice slice, String electionNode, String core, boolean rejoinAtHead)
+      throws KeeperException, InterruptedException {
     Replica replica = slice.getReplica(LeaderElector.getNodeName(electionNode));
     Map<String, Object> propMap = new HashMap<>();
     propMap.put(COLLECTION_PROP, collectionName);
@@ -265,64 +410,84 @@ class RebalanceLeaders {
     propMap.put(ELECTION_NODE_PROP, electionNode);
     String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" + Math.abs(System.nanoTime());
     propMap.put(ASYNC, asyncId);
+    asyncRequests.add(asyncId);
+
     collectionsHandler.sendToOCPQueue(new ZkNodeProps(propMap)); // ignore response; we construct our own
   }
 
-  // currentAsyncIds - map of request IDs and reporting data (value)
   // maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
-  // waitForAll - if true, do not return until all assignments have been made.
-  // results - a place to stash results for reporting back to the user.
+  // waitForAll - if true, do not return until all requests have been processed. "Processed" could mean failure!
   //
-  private boolean waitForLeaderChange(Map<String, String> currentAsyncIds, final int maxWaitSecs,
-                                      Boolean waitForAll, NamedList<Object> results)
+
+  private boolean waitAsyncRequests(final int maxWaitSecs, Boolean waitForAll)
       throws KeeperException, InterruptedException {
 
-    if (currentAsyncIds.size() == 0) return true;
+    if (asyncRequests.size() == 0) {
+      return true;
+    }
 
     for (int idx = 0; idx < maxWaitSecs * 10; ++idx) {
-      Iterator<Map.Entry<String, String>> iter = currentAsyncIds.entrySet().iterator();
+      Iterator<String> iter = asyncRequests.iterator();
       boolean foundChange = false;
       while (iter.hasNext()) {
-        Map.Entry<String, String> pair = iter.next();
-        String asyncId = pair.getKey();
+        String asyncId = iter.next();
         if (coreContainer.getZkController().getOverseerFailureMap().contains(asyncId)) {
           coreContainer.getZkController().getOverseerFailureMap().remove(asyncId);
           coreContainer.getZkController().clearAsyncId(asyncId);
-          NamedList<Object> fails = (NamedList<Object>) results.get("failures");
-          if (fails == null) {
-            fails = new NamedList<>();
-            results.add("failures", fails);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "failed");
-          res.add("msg", "Failed to assign '" + pair.getValue() + "' to be leader");
-          fails.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
           iter.remove();
           foundChange = true;
         } else if (coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId)) {
           coreContainer.getZkController().getOverseerCompletedMap().remove(asyncId);
           coreContainer.getZkController().clearAsyncId(asyncId);
-          NamedList<Object> successes = (NamedList<Object>) results.get("successes");
-          if (successes == null) {
-            successes = new NamedList<>();
-            results.add("successes", successes);
-          }
-          NamedList<Object> res = new NamedList<>();
-          res.add("status", "success");
-          res.add("msg", "Assigned '" + pair.getValue() + "' to be leader");
-          successes.add(asyncId.substring(REBALANCELEADERS.toLower().length()), res);
           iter.remove();
           foundChange = true;
         }
       }
       // We're done if we're processing a few at a time or all requests are processed.
-      if ((foundChange && waitForAll == false) || currentAsyncIds.size() == 0) {
+      // We don't want to change, say, 100s of leaders simultaneously. So if the request specifies some limit,
+      // and we're at that limit, we want to return to the caller so it can immediately add another request.
+      // That's the purpose of the first clause here. Otherwise, of course, just return if all requests are
+      // processed.
+      if ((foundChange && waitForAll == false) || asyncRequests.size() == 0) {
         return true;
       }
-      Thread.sleep(100); //TODO: Is there a better thing to do than sleep here?
+      TimeUnit.MILLISECONDS.sleep(100);
     }
+    // If we get here, we've timed out waiting.
     return false;
   }
 
+  // If we actually changed the leader, we should send that fact back in the response.
+  private void addToSuccesses(Slice slice, Replica replica) {
+    NamedList<Object> successes = (NamedList<Object>) results.get("successes");
+    if (successes == null) {
+      successes = new NamedList<>();
+      results.add("successes", successes);
+    }
+    log.info("Successfully changed leader of shard {} to replica {}", slice.getName(), replica.getName());
+    NamedList<Object> res = new NamedList<>();
+    res.add("status", "success");
+    res.add("msg", "Successfully changed leader of slice " + slice.getName() + " to " + replica.getName());
+    successes.add(slice.getName(), res);
+  }
 
+  // If for any reason we were supposed to change leadership, that should be recorded in pendingOps. Any
+  // time we verified that the change actually occurred, that entry should have been removed. So report anything
+  // left over as a failure.
+  private void addAnyFailures() {
+    if (pendingOps.isEmpty()) {
+      return;
+    }
+    NamedList<Object> fails = new NamedList<>();
+    results.add("failures", fails);
+
+    for (Map.Entry<String, String> ent : pendingOps.entrySet()) {
+      log.info("Failed to change leader of shard {} to replica {}", ent.getKey(), ent.getValue());
+      NamedList<Object> res = new NamedList<>();
+      res.add("status", "failed");
+      res.add("msg", String.format(Locale.ROOT, "Could not change leader for slice %s to %s", ent.getKey(), ent.getValue()));
+      fails.add(ent.getKey(), res);
+
+    }
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
index b47424f..b207fa3 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestRebalanceLeaders.java
@@ -15,335 +15,629 @@
  * limitations under the License.
  */
 package org.apache.solr.cloud;
+
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.SolrRequest;
 import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.params.CollectionParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.TimeSource;
-import org.apache.solr.common.util.Utils;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-public class TestRebalanceLeaders extends AbstractFullDistribZkTestBase {
+@LuceneTestCase.Slow
+public class TestRebalanceLeaders extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  public static final String COLLECTION_NAME = "testcollection";
+  private static final String COLLECTION_NAME = "TestColl";
+
+  private static int numNodes;
+  private static int numShards;
+  private static int numReplicas;
+
+  private static boolean useAdminToSetProps = false;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+
+    numNodes = random().nextInt(4) + 4;
+    numShards = random().nextInt(3) + 3;
+    numReplicas = random().nextInt(2) + 2;
+    useAdminToSetProps = random().nextBoolean();
+
+    configureCluster(numNodes)
+        .addConfig(COLLECTION_NAME, configset("cloud-minimal"))
+        .configure();
+
+    CollectionAdminResponse resp = CollectionAdminRequest.createCollection(COLLECTION_NAME, COLLECTION_NAME,
+        numShards, numReplicas, 0, 0)
+        .setMaxShardsPerNode((numShards * numReplicas) / numNodes + 1)
+        .process(cluster.getSolrClient());
+    assertEquals("Admin request failed; ", 0, resp.getStatus());
+    cluster.waitForActiveCollection(COLLECTION_NAME, numShards, numShards * numReplicas);
 
-  public TestRebalanceLeaders() {
-    schemaString = "schema15.xml";      // we need a string id
-    sliceCount = 4;
   }
 
-  int reps = 10;
+  @Before
+  public void removeAllProperties() throws KeeperException, InterruptedException {
+    forceUpdateCollectionStatus();
+    DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+    for (Slice slice : docCollection.getSlices()) {
+      for (Replica rep : slice.getReplicas()) {
+        rep.getProperties().forEach((key, value) -> {
+          if (key.startsWith("property.")) {
+            try {
+              delProp(slice, rep, key);
+            } catch (IOException | SolrServerException e) {
+              fail("Caught unexpected exception in @Before " + e.getMessage());
+            }
+          }
+        });
+      }
+    }
+  }
+
   int timeoutMs = 60000;
-  Map<String, List<Replica>> initial = new HashMap<>();
 
-  Map<String, Replica> expected = new HashMap<>();
 
+  // test that setting an arbitrary "slice unique" property un-sets the property if it's on another replica in the
+  // slice. This is testing when the property is set on an _individual_ replica whereas testBalancePropertySliceUnique
+  // tests whether changing an individual _replica_ un-sets the property on other replicas _in that slice_.
+  //
+  // NOTE: There were significant problems because at one point the code implicitly defined
+  // shardUnique=true for the special property preferredLeader. That was removed at one point so we're explicitly
+  // testing that as well.
   @Test
-  @ShardsFixed(num = 4)
-  public void test() throws Exception {
-    reps = random().nextInt(9) + 1; // make sure and do at least one.
-    try (CloudSolrClient client = createCloudClient(null)) {
-      // Mix up a bunch of different combinations of shards and replicas in order to exercise boundary cases.
-      // shards, replicationfactor, maxreplicaspernode
-      int shards = random().nextInt(7);
-      if (shards < 2) shards = 2;
-      int rFactor = random().nextInt(4);
-      if (rFactor < 2) rFactor = 2;
-      createCollection(null, COLLECTION_NAME, shards, rFactor, shards * rFactor + 1, client, null, "conf1");
-    }
+  public void testSetArbitraryPropertySliceUnique() throws IOException, SolrServerException, InterruptedException, KeeperException {
+    // Check both special (preferredLeader) and something arbitrary.
+    doTestSetArbitraryPropertySliceUnique("foo" + random().nextInt(1_000_000));
+    removeAllProperties();
+    doTestSetArbitraryPropertySliceUnique("preferredleader");
+  }
 
-    waitForCollection(cloudClient.getZkStateReader(), COLLECTION_NAME, 2);
-    waitForRecoveriesToFinish(COLLECTION_NAME, false);
 
-    listCollection();
-    rebalanceLeaderTest();
+  // Test that automatically distributing a slice unique property un-sets that property if it's in any other replica
+  // on that slice.
+  // This is different than the test above. The test above sets individual properties on individual nodes. This one
+  // relies on Solr to pick which replicas to set the property on
+  @Test
+  public void testBalancePropertySliceUnique() throws KeeperException, InterruptedException, IOException, SolrServerException {
+    // Check both cases of "special" property preferred(Ll)eader
+    doTestBalancePropertySliceUnique("foo" + random().nextInt(1_000_000));
+    removeAllProperties();
+    doTestBalancePropertySliceUnique("preferredleader");
   }
 
-  private void listCollection() throws IOException, SolrServerException {
-    //CloudSolrServer client = createCloudClient(null);
-    try {
-      ModifiableSolrParams params = new ModifiableSolrParams();
-      params.set("action", CollectionParams.CollectionAction.LIST.toString());
-      SolrRequest request = new QueryRequest(params);
-      request.setPath("/admin/collections");
-
-      NamedList<Object> rsp = cloudClient.request(request);
-      List<String> collections = (List<String>) rsp.get("collections");
-      assertTrue("control_collection was not found in list", collections.contains("control_collection"));
-      assertTrue(DEFAULT_COLLECTION + " was not found in list", collections.contains(DEFAULT_COLLECTION));
-      assertTrue(COLLECTION_NAME + " was not found in list", collections.contains(COLLECTION_NAME));
-    } finally {
-      //remove collections
-      //client.shutdown();
-    }
+  // We've moved on from a property being tested, we need to check if rebalancing the leaders actually changes the
+  // leader appropriately.
+  @Test
+  public void testRebalanceLeaders() throws Exception {
+
+    // First let's unbalance the preferredLeader property, do all the leaders get reassigned properly?
+    concentrateProp("preferredLeader");
+    sendRebalanceCommand();
+    checkPreferredsAreLeaders();
+
+    // Now follow up by evenly distributing the property as well as possible.
+    doTestBalancePropertySliceUnique("preferredLeader");
+    sendRebalanceCommand();
+    checkPreferredsAreLeaders();
+
+    // Now check the condition we saw "in the wild" where you could not rebalance properly when Jetty was restarted.
+    concentratePropByRestartingJettys();
+    sendRebalanceCommand();
+    checkPreferredsAreLeaders();
   }
 
-  void recordInitialState() throws InterruptedException {
-    Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
+  // Insure that the property is set on only one replica per slice when changing a unique property on an individual
+  // replica.
+  private void doTestSetArbitraryPropertySliceUnique(String propIn) throws InterruptedException, KeeperException, IOException, SolrServerException {
+    final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
+    // First set the property in some replica in some slice
+    forceUpdateCollectionStatus();
+    DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+    Slice[] slices = docCollection.getSlices().toArray(new Slice[0]);
+    Slice slice = slices[random().nextInt(slices.length)];
+
+    // Bounce around a bit setting this property and insure it's only set in one replica.
+    Replica[] reps = slice.getReplicas().toArray(new Replica[0]);
+    for (int idx = 0; idx < 4; ++idx) {
+      Replica rep = reps[random().nextInt(reps.length)];
+      // Set the property on a particular replica
+      setProp(slice, rep, prop);
+      TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+
+      long count = 0;
+      boolean rightRep = false;
+      Slice modSlice;
+      DocCollection modColl = null; // keeps IDE happy
+
+      // insure that no other replica in that slice has the property when we return.
+      while (timeout.hasTimedOut() == false) {
+        forceUpdateCollectionStatus();
+        modColl = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+        modSlice = modColl.getSlice(slice.getName());
+        rightRep = modSlice.getReplica(rep.getName()).getBool("property." + prop.toLowerCase(Locale.ROOT), false);
+        count = modSlice.getReplicas().stream().filter(thisRep -> thisRep.getBool("property." + prop.toLowerCase(Locale.ROOT), false)).count();
+
+        if (count == 1 && rightRep) {
+          break;
+        }
 
-    // Assemble a list of all the replicas for all the shards in a convenient way to look at them.
-    for (Map.Entry<String, Slice> ent : slices.entrySet()) {
-      initial.put(ent.getKey(), new ArrayList<>(ent.getValue().getReplicas()));
+        TimeUnit.MILLISECONDS.sleep(100);
+      }
+      if (count != 1 || rightRep == false) {
+        fail("The property " + prop + " was not uniquely distributed in slice " + slice.getName()
+            + " " + modColl.toString());
+      }
     }
   }
 
-  void rebalanceLeaderTest() throws InterruptedException, IOException, SolrServerException, KeeperException {
-    recordInitialState();
-    for (int idx = 0; idx < reps; ++idx) {
-      issueCommands();
-      checkConsistency();
-    }
-  }
 
-  // After we've called the rebalance command, we want to insure that:
-  // 1> all replicas appear once and only once in the respective leader election queue
-  // 2> All the replicas we _think_ are leaders are in the 0th position in the leader election queue.
-  // 3> The node that ZooKeeper thinks is the leader is the one we think should be the leader.
-  void checkConsistency() throws InterruptedException, KeeperException {
+  // Fail if the replicas with the preferredLeader property are _not_ also the leaders.
+  private void checkPreferredsAreLeaders() throws InterruptedException, KeeperException {
+    // Make sure that the shard unique are where you expect.
     TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
-    boolean checkAppearOnce = false;
-    boolean checkElectionZero = false;
-    boolean checkZkLeadersAgree = false;
-    while (!timeout.hasTimedOut()) {
-      checkAppearOnce = checkAppearOnce();
-      checkElectionZero = checkElectionZero();
-      checkZkLeadersAgree = checkZkLeadersAgree();
-      if (checkAppearOnce && checkElectionZero && checkZkLeadersAgree) {
+
+    while (timeout.hasTimedOut() == false) {
+      if (checkPreferredsAreLeaders(false)) {
+        // Ok, all preferreds are leaders. Let's also get the election queue and guarantee that every
+        // live replica is in the queue and none are repeated.
+        checkElectionQueues();
         return;
       }
-      Thread.sleep(1000);
+      TimeUnit.MILLISECONDS.sleep(100);
     }
 
-    fail("Checking the rebalance leader command failed, checkAppearOnce=" + checkAppearOnce + " checkElectionZero="
-        + checkElectionZero + " checkZkLeadersAgree=" + checkZkLeadersAgree);
+    log.error("Leaders are not all preferres {}", cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME));
+    // Show the errors
+    checkPreferredsAreLeaders(true);
   }
 
+  // Do all active nodes in each slice appear exactly once in the slice's leader election queue?
+  // Since we assert that the number of live replicas is the same size as the leader election queue, we only
+  // have to compare one way.
+  private void checkElectionQueues() throws KeeperException, InterruptedException {
 
-  // Do all the nodes appear exactly once in the leader election queue and vice-versa?
-  Boolean checkAppearOnce() throws KeeperException, InterruptedException {
+    DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+    Set<String> liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
 
-    for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
-      List<String> leaderQueue = cloudClient.getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
-          "/leader_elect/" + ent.getKey() + "/election", null, true);
-
-      if (leaderQueue.size() != ent.getValue().size()) {
-        return false;
-      }
-      // Check that each election node has a corresponding replica.
-      for (String electionNode : leaderQueue) {
-        if (checkReplicaName(LeaderElector.getNodeName(electionNode), ent.getValue())) {
-          continue;
-        }
-        return false;
-      }
-      // Check that each replica has an election node.
-      for (Replica rep : ent.getValue()) {
-        if (checkElectionNode(rep.getName(), leaderQueue)) {
-          continue;
+    for (Slice slice : docCollection.getSlices()) {
+      Set<Replica> liveReplicas = new HashSet<>();
+      slice.getReplicas().forEach(replica -> {
+        if (replica.isActive(liveNodes)) {
+          liveReplicas.add(replica);
         }
-        return false;
-      }
+      });
+      checkOneQueue(docCollection, slice, liveReplicas);
     }
-    return true;
   }
 
-  // Check that the given name is in the leader election queue
-  Boolean checkElectionNode(String repName, List<String> leaderQueue) {
+  // Helper method to check one leader election queue's consistency.
+  private void checkOneQueue(DocCollection coll, Slice slice, Set<Replica> liveReplicas) throws KeeperException, InterruptedException {
+
+    List<String> leaderQueue = cluster.getSolrClient().getZkStateReader().getZkClient().getChildren("/collections/" + COLLECTION_NAME +
+        "/leader_elect/" + slice.getName() + "/election", null, true);
+
+    if (leaderQueue.size() != liveReplicas.size()) {
+
+      log.error("One or more replicas is missing from the leader election queue! Slice {}, election queue: {}, collection: {}"
+          , slice.getName(), leaderQueue, coll);
+      fail("One or more replicas is missing from the leader election queue");
+    }
+    // Check that each election node has a corresponding live replica.
     for (String electionNode : leaderQueue) {
-      if (repName.equals(LeaderElector.getNodeName(electionNode))) {
-        return true;
+      String replica = LeaderElector.getNodeName(electionNode);
+      if (slice.getReplica(replica) == null) {
+        log.error("Replica {} is not in the election queue: {}", replica, leaderQueue);
+        fail("Replica is not in the election queue!");
       }
     }
-    return false;
   }
 
-  // Check that the name passed in corresponds to a replica.
-  Boolean checkReplicaName(String toCheck, List<Replica> replicas) {
-    for (Replica rep : replicas) {
-      if (toCheck.equals(rep.getName())) {
-        return true;
+  // Just an encapsulation for checkPreferredsAreLeaders to make returning easier.
+  // the doAsserts var is to actually print the problem and fail the test if the condition is not met.
+  private boolean checkPreferredsAreLeaders(boolean doAsserts) throws KeeperException, InterruptedException {
+    forceUpdateCollectionStatus();
+    DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+    for (Slice slice : docCollection.getSlices()) {
+      for (Replica rep : slice.getReplicas()) {
+        if (rep.getBool("property.preferredleader", false)) {
+          boolean isLeader = rep.getBool("leader", false);
+          if (doAsserts) {
+            assertTrue("PreferredLeader should be the leader: ", isLeader);
+          } else if (isLeader == false) {
+            return false;
+          }
+        }
       }
     }
-    return false;
+    return true;
   }
 
-  // Get the shard leader election from ZK and sort it. The node may not actually be there, so retry
-  List<String> getOverseerSort(String key) {
-    List<String> ret = null;
-    try {
-      ret = OverseerCollectionConfigSetProcessor.getSortedElectionNodes(cloudClient.getZkStateReader().getZkClient(),
-          "/collections/" + COLLECTION_NAME + "/leader_elect/" + key + "/election");
-      return ret;
-    } catch (KeeperException e) {
-      cloudClient.connect();
-    } catch (InterruptedException e) {
-      return null;
+  // Arbitrarily send the rebalance command either with the SolrJ interface or with an HTTP request.
+  private void sendRebalanceCommand() throws SolrServerException, InterruptedException, IOException {
+    if (random().nextBoolean()) {
+      rebalanceLeaderUsingSolrJAPI();
+    } else {
+      rebalanceLeaderUsingStandardRequest();
     }
-    return null;
   }
 
-  // Is every node we think is the leader in the zeroth position in the leader election queue?
-  Boolean checkElectionZero() {
-    for (Map.Entry<String, Replica> ent : expected.entrySet()) {
+  // Helper method to make sure the property is _unbalanced_ first, then it gets properly re-assigned with the
+  // BALANCESHARDUNIQUE command.
+  private void doTestBalancePropertySliceUnique(String propIn) throws InterruptedException, IOException, KeeperException, SolrServerException {
+    final String prop = (random().nextBoolean()) ? propIn : propIn.toUpperCase(Locale.ROOT);
 
-      List<String> leaderQueue = getOverseerSort(ent.getKey());
-      if (leaderQueue == null) return false;
+    // Concentrate the properties on as few replicas as possible
+    concentrateProp(prop);
+
+    // issue the BALANCESHARDUNIQUE command
+    rebalancePropAndCheck(prop);
+
+    // Verify that there are no more than one replica with the property per shard.
+    verifyPropUniquePerShard(prop);
+
+    // Verify that the property is reasonably evenly distributed
+    verifyPropCorrectlyDistributed(prop);
 
-      String electName = LeaderElector.getNodeName(leaderQueue.get(0));
-      String coreName = ent.getValue().getName();
-      if (electName.equals(coreName) == false) {
-        return false;
-      }
-    }
-    return true;
   }
 
-  // Do who we _think_ should be the leader agree with the leader nodes?
-  Boolean checkZkLeadersAgree() throws KeeperException, InterruptedException {
-    for (Map.Entry<String,Replica> ent : expected.entrySet()) {
-      
-      String path = "/collections/" + COLLECTION_NAME + "/leaders/" + ent.getKey() + "/leader";
-      byte[] data = getZkData(cloudClient, path);
-      if (data == null) {
-        log.warn("path to check not found {}", path);
-        return false;
-      }
-      
-      String repCore = null;
-      String zkCore = null;
-      
-      Map m = (Map) Utils.fromJSON(data);
-      zkCore = (String) m.get("core");
-      repCore = ent.getValue().getStr("core");
-      if (zkCore.equals(repCore) == false) {
-        log.warn("leader in zk does not match what we expect: {} != {}", zkCore, repCore);
-        return false;
+  private void verifyPropCorrectlyDistributed(String prop) throws KeeperException, InterruptedException {
+    // Poll until the per-slice counts of replicas carrying the property differ by less than two; fail on timeout.
+    TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    String propLC = prop.toLowerCase(Locale.ROOT);
+    DocCollection docCollection = null;
+    while (timeout.hasTimedOut() == false) {
+      forceUpdateCollectionStatus();
+      docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+      int maxPropCount = Integer.MIN_VALUE; // seed below any real count so Math.max tracks the largest slice count
+      int minPropCount = Integer.MAX_VALUE; // seed above any real count so Math.min tracks the smallest slice count
+      for (Slice slice : docCollection.getSlices()) {
+        int repCount = 0; // replicas in this slice that carry the property
+        for (Replica rep : slice.getReplicas()) {
+          if (rep.getBool("property." + propLC, false)) {
+            repCount++;
+          }
+        }
+        maxPropCount = Math.max(maxPropCount, repCount);
+        minPropCount = Math.min(minPropCount, repCount);
       }
-      
+      if (Math.abs(maxPropCount - minPropCount) < 2) return;
+      TimeUnit.MILLISECONDS.sleep(100); // same polling interval as the other wait loops in this test
     }
-    return true;
+    log.error("Property {} is not distributed evenly. {}", prop, docCollection);
+    fail("Property is not distributed evenly " + prop);
   }
 
-  byte[] getZkData(CloudSolrClient client, String path) {
-    org.apache.zookeeper.data.Stat stat = new org.apache.zookeeper.data.Stat();
-    try {
-      byte[] data = client.getZkStateReader().getZkClient().getData(path, null, stat, true);
-      if (data != null) {
-        return data;
+  // Used when we concentrate the leader on a few nodes.
+  private void verifyPropDistributedAsExpected(Map<String, String> expectedShardReplicaMap, String prop) throws InterruptedException, KeeperException {
+    // Make sure that the shard unique are where you expect.
+    TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+
+    String propLC = prop.toLowerCase(Locale.ROOT);
+    boolean failure = false;
+    DocCollection docCollection = null;
+    while (timeout.hasTimedOut() == false) {
+      forceUpdateCollectionStatus();
+      docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+      failure = false;
+      for (Map.Entry<String, String> ent : expectedShardReplicaMap.entrySet()) {
+        Replica rep = docCollection.getSlice(ent.getKey()).getReplica(ent.getValue());
+        if (rep.getBool("property." + propLC, false) == false) {
+          failure = true;
+        }
       }
-    } catch (KeeperException.NoNodeException e) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e1) {
-        return null;
+      if (failure == false) {
+        return;
       }
-    } catch (InterruptedException | KeeperException e) {
-      return null;
+      TimeUnit.MILLISECONDS.sleep(100);
     }
-    return null;
+
+    fail(prop + " properties are not on the expected replicas: " + docCollection.toString()
+        + System.lineSeparator() + "Expected " + expectedShardReplicaMap.toString());
   }
 
-  // It's OK not to check the return here since the subsequent tests will fail.
-  void issueCommands() throws IOException, SolrServerException, KeeperException, InterruptedException {
+  // Just check that the property is distributed as expected. This does _not_ rebalance the leaders
+  private void rebalancePropAndCheck(String prop) throws IOException, SolrServerException, InterruptedException, KeeperException {
 
-    // Find a replica to make the preferredLeader. NOTE: may be one that's _already_ leader!
-    expected.clear();
-    for (Map.Entry<String, List<Replica>> ent : initial.entrySet()) {
-      List<Replica> replicas = ent.getValue();
-      Replica rep = replicas.get(Math.abs(random().nextInt()) % replicas.size());
-      expected.put(ent.getKey(), rep);
-      issuePreferred(ent.getKey(), rep);
+    if (random().nextBoolean()) {
+      rebalancePropUsingSolrJAPI(prop);
+    } else {
+      rebalancePropUsingStandardRequest(prop);
     }
+  }
 
-    if (waitForAllPreferreds() == false) {
-      fail("Waited for timeout for preferredLeader assignments to be made and they werent.");
-    }
-    //fillExpectedWithCurrent();
-    // Now rebalance the leaders randomly using SolrJ or direct call
-    if(random().nextBoolean())
-      rebalanceLeaderUsingSolrJAPI();
-    else
-      rebalanceLeaderUsingDirectCall();
 
+  private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException, InterruptedException {
+    CollectionAdminResponse resp = CollectionAdminRequest
+        .rebalanceLeaders(COLLECTION_NAME)
+        .process(cluster.getSolrClient());
+    assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
+    assertEquals("Admin request failed; ", 0, resp.getStatus());
+  }
+
+  private void rebalanceLeaderUsingStandardRequest() throws IOException, SolrServerException {
+    ModifiableSolrParams params = new ModifiableSolrParams();
+    params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+    params.set("collection", COLLECTION_NAME);
+    QueryRequest request = new QueryRequest(params);
+    request.setPath("/admin/collections");
+    QueryResponse resp = request.process(cluster.getSolrClient());
+    assertTrue("All leaders should have been verified", resp.getResponse().get("Summary").toString().contains("Success"));
+    assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
   }
 
-  private void rebalanceLeaderUsingSolrJAPI() throws IOException, SolrServerException {
-    CollectionAdminRequest.RebalanceLeaders rebalanceLeaders = CollectionAdminRequest.rebalanceLeaders(COLLECTION_NAME);
-    rebalanceLeaders.setMaxAtOnce(10)
-        .process(cloudClient);
+
+  private void rebalancePropUsingSolrJAPI(String prop) throws IOException, SolrServerException, InterruptedException {
+    // Don't set the value, that should be done automatically.
+    CollectionAdminResponse resp;
+
+    if (prop.toLowerCase(Locale.ROOT).contains("preferredleader")) {
+      resp = CollectionAdminRequest
+          .balanceReplicaProperty(COLLECTION_NAME, prop)
+          .process(cluster.getSolrClient());
+
+    } else {
+      resp = CollectionAdminRequest
+          .balanceReplicaProperty(COLLECTION_NAME, prop)
+          .setShardUnique(true)
+          .process(cluster.getSolrClient());
+
+    }
+    assertEquals("Admin request failed; ", 0, resp.getStatus());
   }
 
-  private void rebalanceLeaderUsingDirectCall() throws IOException, SolrServerException {
+  private void rebalancePropUsingStandardRequest(String prop) throws IOException, SolrServerException {
     ModifiableSolrParams params = new ModifiableSolrParams();
-    params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
+    params.set("action", CollectionParams.CollectionAction.BALANCESHARDUNIQUE.toString());
+    params.set("property", prop);
 
-    // Insure we get error returns when omitting required parameters
     params.set("collection", COLLECTION_NAME);
-    params.set("maxAtOnce", "10");
-    SolrRequest request = new QueryRequest(params);
+    if (prop.toLowerCase(Locale.ROOT).contains("preferredleader") == false) {
+      params.set("shardUnique", true);
+    }
+    QueryRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
-    cloudClient.request(request);
+    QueryResponse resp = request.process(cluster.getSolrClient());
+    assertEquals("Call to rebalanceLeaders failed ", 0, resp.getStatus());
+  }
 
+  // This is important. I've (Erick Erickson) run across a situation where the "standard request" causes failures, but
+  // never the Admin request. So let's test both all the time for a given test.
+  //
+  // This sets an _individual_ replica to have the property, not collection-wide
+  private void setProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+    if (useAdminToSetProps) {
+      setPropWithAdminRequest(slice, rep, prop);
+    } else {
+      setPropWithStandardRequest(slice, rep, prop);
+    }
   }
 
-  void issuePreferred(String slice, Replica rep) throws IOException, SolrServerException, InterruptedException {
+  void setPropWithStandardRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
     ModifiableSolrParams params = new ModifiableSolrParams();
     params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
 
-    // Insure we get error returns when omitting required parameters
-
     params.set("collection", COLLECTION_NAME);
-    params.set("shard", slice);
+    params.set("shard", slice.getName());
     params.set("replica", rep.getName());
-    params.set("property", "preferredLeader");
+    params.set("property", prop);
     params.set("property.value", "true");
+    // Test to insure that implicit shardUnique is added for preferredLeader.
+    if (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false) {
+      params.set("shardUnique", "true");
+    }
 
     SolrRequest request = new QueryRequest(params);
     request.setPath("/admin/collections");
-    cloudClient.request(request);
+    cluster.getSolrClient().request(request);
+    String propLC = prop.toLowerCase(Locale.ROOT);
+    waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
+        (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
+
+  }
+
+  void setPropWithAdminRequest(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+    boolean setUnique = (prop.toLowerCase(Locale.ROOT).equals("preferredleader") == false);
+    CollectionAdminRequest.AddReplicaProp addProp =
+        CollectionAdminRequest.addReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), prop, "true");
+    if (setUnique) {
+      addProp.setShardUnique(true);
+    }
+    CollectionAdminResponse resp = addProp.process(cluster.getSolrClient());
+    assertEquals(0, resp.getStatus());
+    String propLC = prop.toLowerCase(Locale.ROOT);
+    waitForState("Expecting property '" + prop + "'to appear on replica " + rep.getName(), COLLECTION_NAME,
+        (n, c) -> "true".equals(c.getReplica(rep.getName()).getProperty(propLC)));
+
+  }
+
+  private void delProp(Slice slice, Replica rep, String prop) throws IOException, SolrServerException {
+    String propLC = prop.toLowerCase(Locale.ROOT); // the delete request and the wait below both use this form
+    CollectionAdminResponse resp = CollectionAdminRequest.deleteReplicaProperty(COLLECTION_NAME, slice.getName(), rep.getName(), propLC)
+        .process(cluster.getSolrClient());
+    assertEquals("Admin request failed; ", 0, resp.getStatus());
+    waitForState("Expecting property '" + prop + "' to be removed from replica " + rep.getName(), COLLECTION_NAME,
+        (n, c) -> c.getReplica(rep.getName()).getProperty(propLC) == null);
+  }
+
+  // Intentionally un-balance the property to insure that BALANCESHARDUNIQUE does its job. There was an odd case
+  // where rebalancing didn't work very well if the Solr nodes were stopped and restarted that worked perfectly
+  // when the nodes were _not_ restarted in the test. So we have to test that too.
+  private void concentratePropByRestartingJettys() throws Exception {
+
+    List<JettySolrRunner> jettys = new ArrayList<>(cluster.getJettySolrRunners());
+    Collections.shuffle(jettys, random());
+    jettys.remove(random().nextInt(jettys.size()));
+    // Now we have a list of jettys, and there is one missing. Stop all of the remaining jettys, then start them again
+    // to concentrate the leaders. It's not necessary that all shards have a leader.
+
+    for (JettySolrRunner jetty : jettys) {
+      cluster.stopJettySolrRunner(jetty);
+      cluster.waitForJettyToStop(jetty);
+    }
+    checkReplicasInactive(jettys);
+
+    for (int idx = 0; idx < jettys.size(); ++idx) {
+      cluster.startJettySolrRunner(jettys.get(idx));
+    }
+    cluster.waitForAllNodes(60);
+    // the nodes are present, but are all replica active?
+    checkAllReplicasActive();
+  }
+
+  // While banging my head against a wall, I put a lot of force-refresh calls in. The call sites are left in place,
+  // but this method is deliberately a no-op; if failures start to appear, re-enable the commented-out line below.
+  private void forceUpdateCollectionStatus() throws KeeperException, InterruptedException {
+    // cluster.getSolrClient().getZkStateReader().forceUpdateCollection(COLLECTION_NAME);
   }
 
-  boolean waitForAllPreferreds() throws KeeperException, InterruptedException {
-    boolean goAgain = true;
+  // Called after jettys have been stopped: polls the cluster state until no replica hosted on one of the downJettys
+  // nodes is reported active any more. Fails the test if that has not happened by the time the timeout expires.
+  private void checkReplicasInactive(List<JettySolrRunner> downJettys) throws KeeperException, InterruptedException {
     TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
-    while (! timeout.hasTimedOut()) {
-      goAgain = false;
-      Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).getSlicesMap();
-
-      for (Map.Entry<String, Replica> ent : expected.entrySet()) {
-        Replica me = slices.get(ent.getKey()).getReplica(ent.getValue().getName());
-        if (me.getBool("property.preferredleader", false) == false) {
-          goAgain = true;
-          break;
+    DocCollection docCollection = null;
+    Set<String> liveNodes = null;
+
+    Set<String> downJettyNodes = new TreeSet<>();
+    for (JettySolrRunner jetty : downJettys) {
+      downJettyNodes.add(jetty.getBaseUrl().getHost() + ":" + jetty.getBaseUrl().getPort() + "_solr"); // compared with Replica.getNodeName() below
+    }
+    while (timeout.hasTimedOut() == false) {
+      forceUpdateCollectionStatus();
+      docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+      liveNodes = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+      boolean expectedInactive = true;
+
+      for (Slice slice : docCollection.getSlices()) {
+        for (Replica rep : slice.getReplicas()) {
+          if (downJettyNodes.contains(rep.getNodeName()) == false) {
+            continue; // Not hosted on one of the stopped jettys, so its state does not matter here
+          }
+          // A replica on an allegedly down node is still reported as active, so another polling pass is needed.
+          if (rep.isActive(liveNodes)) {
+            expectedInactive = false;
+          }
+        }
+      }
+      if (expectedInactive) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(100); // short pause before re-reading the cluster state
+    }
+    fail("timed out waiting for all replicas to become inactive: livenodes: " + liveNodes +
+        " Collection state: " + docCollection.toString());
+  }
+
+  // Rebalancing and distributing shard-unique properties are only expected to work once every replica is active,
+  // so poll the cluster state every 100ms until that is the case and fail the test if the timeout expires first.
+  private void checkAllReplicasActive() throws KeeperException, InterruptedException {
+    TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      forceUpdateCollectionStatus();
+      DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+      Set<String> live = cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes();
+
+      // Non-short-circuit accumulation: every replica of every slice is inspected on each pass.
+      boolean everyReplicaActive = true;
+      for (Slice shard : coll.getSlices()) {
+        for (Replica replica : shard.getReplicas()) {
+          everyReplicaActive &= replica.isActive(live);
+        }
+      }
+      if (everyReplicaActive) {
+        return;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+    }
+    fail("timed out waiting for all replicas to become active");
+  }
+
+  // Use a simple heuristic to put as many replicas with the property on as few nodes as possible. The point is that
+  // we can then execute BALANCESHARDUNIQUE and be sure it actually redistributed the property.
+  private void concentrateProp(String prop) throws KeeperException, InterruptedException, IOException, SolrServerException {
+    // Take the live nodes in a random (shuffled) order.
+    // For each slice, set the property on the replica whose node comes earliest in that shuffled list.
+    List<String> liveNodes = new ArrayList<>(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes());
+    Collections.shuffle(liveNodes, random());
+
+    Map<String, String> uniquePropMap = new TreeMap<>();
+    forceUpdateCollectionStatus();
+    DocCollection docCollection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+    for (Slice slice : docCollection.getSlices()) {
+      Replica changedRep = null;
+      int livePos = Integer.MAX_VALUE;
+      for (Replica rep : slice.getReplicas()) {
+        int pos = liveNodes.indexOf(rep.getNodeName());
+        if (pos >= 0 && pos < livePos) {
+          livePos = pos;
+          changedRep = rep;
         }
       }
-      if (goAgain) {
-        Thread.sleep(250);
-      } else {
-        return true;
+      if (livePos == Integer.MAX_VALUE) {
+        fail("Invalid state! We should have a replica to add the property to! " + docCollection.toString());
       }
+
+      uniquePropMap.put(slice.getName(), changedRep.getName());
+      // Now set the property on the replica found on the "lowest" node in the shuffled live-node list.
+      setProp(slice, changedRep, prop);
     }
-    return false;
+    verifyPropDistributedAsExpected(uniquePropMap, prop);
   }
 
-}
+  // Polls until checkdUniquePropPerShard reports that the property is unique per shard and returns the
+  // shard name -> replica name map it filled in. Fails the test if that does not happen before the timeout.
+  private Map<String, String> verifyPropUniquePerShard(String prop) throws InterruptedException, KeeperException {
+    final Map<String, String> replicaByShard = new TreeMap<>();
+
+    final TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+    while (!timeout.hasTimedOut()) {
+      // Start each attempt from an empty map so entries from a failed pass do not linger.
+      replicaByShard.clear();
+      boolean uniquePerShard = checkdUniquePropPerShard(replicaByShard, prop);
+      if (uniquePerShard) {
+        return replicaByShard;
+      }
+      TimeUnit.MILLISECONDS.sleep(100);
+    }
+
+    String collectionState =
+        cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME).toString();
+    fail("There should be exactly one replica with value " + prop + " set to true per shard: " + collectionState);
+    return null; // never reached, fail() throws; satisfies the compiler.
+  }
 
+  // Returns true only if each shard has exactly one replica for which the property reads as true. The
+  // shard name -> replica name pairs seen along the way are recorded in uniques, even when the result is false.
+  private boolean checkdUniquePropPerShard(Map<String, String> uniques, String prop) throws KeeperException, InterruptedException {
+    forceUpdateCollectionStatus();
+    DocCollection coll = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION_NAME);
+
+    for (Slice shard : coll.getSlices()) {
+      int holders = 0;
+      for (Replica replica : shard.getReplicas()) {
+        String propKey = "property." + prop.toLowerCase(Locale.ROOT);
+        if (!replica.getBool(propKey, false)) {
+          continue;
+        }
+        holders++;
+        uniques.put(shard.getName(), replica.getName());
+      }
+      if (holders != 1) {
+        return false;
+      }
+    }
+    return true;
+  }
+}
\ No newline at end of file
diff --git a/solr/solr-ref-guide/src/collections-api.adoc b/solr/solr-ref-guide/src/collections-api.adoc
index f650cdc..ddaaeb8 100644
--- a/solr/solr-ref-guide/src/collections-api.adoc
+++ b/solr/solr-ref-guide/src/collections-api.adoc
@@ -2163,6 +2163,8 @@ http://localhost:8983/solr/admin/collections?action=REBALANCELEADERS&collection=
 
 In this example, two replicas in the "alreadyLeaders" section already had the leader assigned to the same node as the `preferredLeader` property so no action was taken.
 
+The "Success" tag indicates that the command rebalanced all leaders. If, for any reason, some replicas with preferredLeader=true are not leaders, this will be "Failure" rather than "Success". If a replica cannot be made leader due to not being "Active", it's also considered a failure.
+
 The replica in the "inactivePreferreds" section had the `preferredLeader` property set but the node was down and no action was taken. The three nodes in the "successes" section were made leaders because they had the `preferredLeader` property set but were not leaders and they were active.
 
 [source,xml]
@@ -2172,6 +2174,9 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
     <int name="status">0</int>
     <int name="QTime">123</int>
   </lst>
+  <lst>
+    <str name="Success">All replicas with the preferredLeader property set are leaders</str>
+  </lst>
   <lst name="alreadyLeaders">
     <lst name="core_node1">
       <str name="status">success</str>
@@ -2219,6 +2224,10 @@ The replica in the "inactivePreferreds" section had the `preferredLeader` proper
 
 Examining the clusterstate after issuing this call should show that every live node that has the `preferredLeader` property should also have the "leader" property set to _true_.
 
+NOTE: The added work done by an NRT leader during indexing is quite small. The primary use-case is to redistribute the leader role if there are a large number of leaders concentrated on a small number of nodes. Rebalancing will likely not improve performance unless the imbalance of leadership roles is measured in multiples of 10.
+
+NOTE: The BALANCESHARDUNIQUE command that distributes the preferredLeader property does not guarantee perfect distribution and in some collection topologies it is impossible to make that guarantee.
+
 [[forceleader]]
 == FORCELEADER: Force Shard Leader