Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/12 17:28:33 UTC

[lucene-solr] 06/06: @1242 WIP

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit d860f539896096498e76420544df041297db0570
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Dec 12 11:28:00 2020 -0600

    @1242 WIP
---
 .../org/apache/solr/cloud/ZkCollectionTerms.java   |  7 ++-
 .../java/org/apache/solr/cloud/ZkController.java   |  7 +--
 .../java/org/apache/solr/cloud/ZkShardTerms.java   | 56 +++++++++---------
 .../java/org/apache/solr/core/CoreContainer.java   |  8 ++-
 .../solr/handler/admin/CollectionsHandler.java     |  9 ++-
 .../processor/DistributedZkUpdateProcessor.java    | 67 +++++++++++++---------
 .../org/apache/solr/cloud/ZkShardTermsTest.java    | 23 +++++---
 7 files changed, 104 insertions(+), 73 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index 54d762d..42311fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
 import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.KeeperException;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +44,7 @@ class ZkCollectionTerms implements AutoCloseable {
     assert ObjectReleaseTracker.track(this);
   }
 
-  ZkShardTerms getShard(String shardId) {
+  ZkShardTerms getShard(String shardId) throws Exception {
     collectionToTermsLock.lock();
     try {
       if (!terms.containsKey(shardId)) {
@@ -65,11 +66,11 @@ class ZkCollectionTerms implements AutoCloseable {
     }
   }
 
-  public void register(String shardId, String coreNodeName) {
+  public void register(String shardId, String coreNodeName) throws Exception {
     getShard(shardId).registerTerm(coreNodeName);
   }
 
-  public void remove(String shardId, CoreDescriptor coreDescriptor) {
+  public void remove(String shardId, CoreDescriptor coreDescriptor) throws KeeperException, InterruptedException {
     collectionToTermsLock.lock();
     try {
       ZkShardTerms zterms = getShardOrNull(shardId);
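
Note: with ZkCollectionTerms no longer handling ZooKeeper failures internally, getShard, register, and remove now surface checked exceptions to their callers. A minimal caller-side sketch of what that implies, assuming a ZkCollectionTerms instance named collectionTerms and placeholder shard/core names (none of these identifiers appear in this commit):

    // Hypothetical call site, not part of this commit: register(String, String)
    // is now declared "throws Exception", so callers either propagate it or
    // translate it into Solr's unchecked SolrException themselves.
    try {
      collectionTerms.register("shard1", "core_node1");
    } catch (Exception e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Could not register shard term", e);
    }
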
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e2c094d..f5ce3a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -28,7 +28,6 @@ import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
 import org.apache.solr.common.cloud.ClusterState;
 import org.apache.solr.common.cloud.ConnectionManager;
 import org.apache.solr.common.cloud.DefaultZkACLProvider;
@@ -1674,7 +1673,7 @@ public class ZkController implements Closeable, Runnable {
    */
   private boolean checkRecovery(final boolean isLeader,
                                 final String collection, String coreZkNodeName, String shardId,
-                                SolrCore core, CoreContainer cc) {
+                                SolrCore core, CoreContainer cc) throws Exception {
     boolean doRecovery = true;
     if (!isLeader) {
 
@@ -1800,7 +1799,7 @@ public class ZkController implements Closeable, Runnable {
     statePublisher.submitState(message);
   }
 
-  public ZkShardTerms getShardTerms(String collection, String shardId) {
+  public ZkShardTerms getShardTerms(String collection, String shardId) throws Exception {
     ZkCollectionTerms ct = getCollectionTerms(collection);
     if (ct == null) {
       throw new AlreadyClosedException();
@@ -1854,7 +1853,7 @@ public class ZkController implements Closeable, Runnable {
     }
   }
 
-  public void unregister(String coreName, CoreDescriptor cd) {
+  public void unregister(String coreName, CoreDescriptor cd) throws KeeperException, InterruptedException {
     log.info("Unregister core from zookeeper {}", coreName);
     final String collection = cd.getCloudDescriptor().getCollectionName();
     try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 40eba00..ff16516 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -17,6 +17,8 @@
 
 package org.apache.solr.cloud;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.solr.client.solrj.cloud.ShardTerms;
-import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
@@ -63,7 +64,7 @@ import org.slf4j.LoggerFactory;
  * </ul>
  * This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
  */
-public class ZkShardTerms implements AutoCloseable{
+public class ZkShardTerms implements Closeable {
 
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
@@ -98,12 +99,16 @@ public class ZkShardTerms implements AutoCloseable{
     void close();
   }
 
-  public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
+  public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) throws IOException {
     this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
     this.collection = collection;
     this.shard = shard;
     this.zkClient = zkClient;
-    refreshTerms();
+    try {
+      refreshTerms();
+    } catch (KeeperException e) {
+      throw new IOException(e);
+    }
     retryRegisterWatcher();
     assert ObjectReleaseTracker.track(this);
   }
@@ -113,7 +118,7 @@ public class ZkShardTerms implements AutoCloseable{
    * @param leader coreNodeName of leader
    * @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
    */
-  public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+  public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) throws KeeperException, InterruptedException {
     if (log.isDebugEnabled()) log.debug("ensureTermsIsHigher leader={} replicasNeedingRecvoery={}", leader, replicasNeedingRecovery);
     if (replicasNeedingRecovery.isEmpty()) return;
 
@@ -185,7 +190,7 @@ public class ZkShardTerms implements AutoCloseable{
    * Remove the coreNodeName from terms map and also remove any expired listeners
    * @return Return true if this object should not be reused
    */
-  boolean removeTerm(CoreDescriptor cd) {
+  boolean removeTerm(CoreDescriptor cd) throws KeeperException, InterruptedException {
     int numListeners;
       // solrcore already closed
     listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms.get()));
@@ -196,7 +201,7 @@ public class ZkShardTerms implements AutoCloseable{
 
   // package private for testing, only used by tests
   // return true if this object should not be reused
-  boolean removeTerm(String coreNodeName) {
+  boolean removeTerm(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     int tries = 0;
     while ( (newTerms = terms.get().removeTerm(coreNodeName)) != null) {
@@ -219,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{
    * If a term is already associate with this replica do nothing
    * @param coreNodeName of the replica
    */
-  void registerTerm(String coreNodeName) {
+  void registerTerm(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -231,14 +236,14 @@ public class ZkShardTerms implements AutoCloseable{
    * This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
    * @param coreNodeName of the replica
    */
-  public void setTermEqualsToLeader(String coreNodeName) {
+  public void setTermEqualsToLeader(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
     }
   }
 
-  public void setTermToZero(String coreNodeName) {
+  public void setTermToZero(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -248,7 +253,7 @@ public class ZkShardTerms implements AutoCloseable{
   /**
    * Mark {@code coreNodeName} as recovering
    */
-  public void startRecovering(String coreNodeName) {
+  public void startRecovering(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -258,7 +263,7 @@ public class ZkShardTerms implements AutoCloseable{
   /**
    * Mark {@code coreNodeName} as finished recovering
    */
-  public void doneRecovering(String coreNodeName) {
+  public void doneRecovering(String coreNodeName) throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -273,7 +278,7 @@ public class ZkShardTerms implements AutoCloseable{
    * When first updates come in, all replicas have some data now,
    * so we must switch from term 0 (registered) to 1 (have some data)
    */
-  public void ensureHighestTermsAreNotZero() {
+  public void ensureHighestTermsAreNotZero() throws KeeperException, InterruptedException {
     ShardTerms newTerms;
     while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) {
       if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -300,7 +305,7 @@ public class ZkShardTerms implements AutoCloseable{
    * @param newTerms to be set
    * @return true if terms is saved successfully to ZK, false if otherwise
    */
-  private boolean forceSaveTerms(ShardTerms newTerms) {
+  private boolean forceSaveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
     try {
       return saveTerms(newTerms);
     } catch (KeeperException.NoNodeException e) {
@@ -315,7 +320,7 @@ public class ZkShardTerms implements AutoCloseable{
    * @return true if terms is saved successfully to ZK, false if otherwise
    * @throws KeeperException.NoNodeException correspond ZK term node is not created
    */
-  private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException {
+  private boolean saveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
     byte[] znodeData = Utils.toJSON(newTerms);
     try {
       Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
@@ -325,11 +330,6 @@ public class ZkShardTerms implements AutoCloseable{
     } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not a match, retrying version={}", newTerms.getVersion());
       refreshTerms();
-    } catch (KeeperException.NoNodeException e) {
-      return true;
-    } catch (Exception e) {
-      ParWork.propagateInterrupt(e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error while saving shard term for collection: " + collection, e);
     }
     return false;
   }
@@ -337,7 +337,7 @@ public class ZkShardTerms implements AutoCloseable{
   /**
    * Fetch latest terms from ZK
    */
-  public void refreshTerms() {
+  public void refreshTerms() throws KeeperException {
     ShardTerms newTerms;
     try {
       Stat stat = new Stat();
@@ -352,8 +352,6 @@ public class ZkShardTerms implements AutoCloseable{
     } catch (InterruptedException e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
-    } catch (KeeperException e) {
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
     }
 
     setNewTerms(newTerms);
@@ -393,9 +391,15 @@ public class ZkShardTerms implements AutoCloseable{
       if (Watcher.Event.EventType.None == event.getType()) {
         return;
       }
-      retryRegisterWatcher();
-      // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
-      refreshTerms();
+      if (event.getType() == Watcher.Event.EventType.NodeCreated || event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+        retryRegisterWatcher();
+        // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+        try {
+          refreshTerms();
+        } catch (KeeperException e) {
+          log.warn("Could not refresh terms", e);
+        }
+      }
     };
     try {
       // exists operation is faster than getData operation
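
Note: ZkShardTerms now implements java.io.Closeable instead of AutoCloseable, its constructor wraps the initial refreshTerms() KeeperException in an IOException, and the watcher only re-registers and refreshes terms on NodeCreated/NodeDataChanged events rather than on every non-None event. A usage sketch under those assumptions; the variable names and collection/shard strings below are placeholders, not code from this commit:

    // Hypothetical usage, for illustration only: construction can now fail with
    // IOException (wrapping the underlying KeeperException), and close() comes
    // from java.io.Closeable, so try-with-resources continues to work.
    try (ZkShardTerms terms = new ZkShardTerms("collection1", "shard1", zkClient)) {
      terms.registerTerm("core_node1");   // now throws KeeperException, InterruptedException
    } catch (IOException | KeeperException e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Shard terms operation failed", e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status before rethrowing
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Interrupted", e);
    }
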
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 42fd09d..c2e492a 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1546,9 +1546,13 @@ public class CoreContainer implements Closeable {
               getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(dcore.getName());
               return new SolrCore(this, dcore, coreConfig);
             }
-          } catch (SolrException se) {
+          } catch (Exception se) {
             se.addSuppressed(original);
-            throw se;
+            if (se instanceof  SolrException) {
+              throw (SolrException) se;
+            } else {
+              throw new SolrException(ErrorCode.SERVER_ERROR, se);
+            }
           }
         }
         if (original instanceof RuntimeException) {
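
Note: the CoreContainer hunk widens the catch so the new checked exceptions from getShardTerms(...).setTermToZero(...) are handled while SolrExceptions are still rethrown unchanged. The same rethrow-or-wrap decision could be factored into a small helper; this is only an illustrative sketch, not code from the commit:

    // Illustrative helper, not part of this commit: return SolrExceptions
    // unchanged, wrap anything else as a SERVER_ERROR.
    static SolrException wrapIfNeeded(Exception e) {
      if (e instanceof SolrException) {
        return (SolrException) e;
      }
      return new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
    }
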
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 7bff7f8..3c64ffd 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -76,7 +76,6 @@ import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.security.AuthorizationContext;
 import org.apache.solr.security.PermissionNameProvider;
 import org.apache.zookeeper.KeeperException;
-import org.eclipse.jetty.rewrite.handler.Rule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1380,7 +1379,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
         //TODO only increase terms of replicas less out-of-sync
         liveReplicas.stream()
             .filter(rep -> zkShardTerms.registered(rep.getName()))
-            .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
+            .forEach(rep -> {
+              try {
+                zkShardTerms.setTermEqualsToLeader(rep.getName());
+              } catch (Exception e) {
+                log.error("Exception in shard terms", e);
+              }
+            });
       }
 
       // Wait till we have an active leader
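
Note: setTermEqualsToLeader now declares checked exceptions, and Stream.forEach takes a java.util.function.Consumer whose accept method cannot throw them, which is why the lambda above catches and logs. Where the exception should propagate to the caller instead, a plain loop is the usual alternative; a sketch assuming the same liveReplicas and zkShardTerms variables, with the checked exceptions left to the enclosing method:

    // Hypothetical alternative, not part of this commit: a plain loop lets the
    // checked exceptions propagate instead of being logged inside the lambda.
    for (Replica rep : liveReplicas) {
      if (zkShardTerms.registered(rep.getName())) {
        zkShardTerms.setTermEqualsToLeader(rep.getName()); // throws KeeperException, InterruptedException
      }
    }
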
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 28b1111..1a02637 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -762,7 +762,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
         List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
         skippedCoreNodeNames = new HashSet<>();
-        ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+        ZkShardTerms zkShardTerms = null;
+        try {
+          zkShardTerms = zkController.getShardTerms(collection, shardId);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
         for (Replica replica: replicas) {
           String coreNodeName = replica.getName();
           if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
@@ -926,7 +931,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
     List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
     skippedCoreNodeNames = new HashSet<>();
-    ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+    ZkShardTerms zkShardTerms = null;
+    try {
+      zkShardTerms = zkController.getShardTerms(collection, shardId);
+    } catch (Exception e) {
+      throw new SolrException(ErrorCode.SERVER_ERROR, e);
+    }
     for (Replica replica : replicas) {
       String coreNodeName = replica.getName();
       if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
@@ -1128,11 +1138,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
     boolean shouldUpdateTerms = isLeader && isIndexChanged;
     if (shouldUpdateTerms) {
-      ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
-      if (skippedCoreNodeNames != null) {
-        zkShardTerms.ensureTermsIsHigher(desc.getName(), skippedCoreNodeNames);
+      ZkShardTerms zkShardTerms = null;
+      try {
+        zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+        if (skippedCoreNodeNames != null) {
+          zkShardTerms.ensureTermsIsHigher(desc.getName(), skippedCoreNodeNames);
+        }
+        zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
       }
-      zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
     }
     // TODO: if not a forward and replication req is not specified, we could
     // send in a background thread
@@ -1168,22 +1184,19 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       // legit
 
       DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
-      if (phase != DistribPhase.FROMLEADER)
-        continue; // don't have non-leaders try to recovery other nodes
+      if (phase != DistribPhase.FROMLEADER) continue; // don't have non-leaders try to recovery other nodes
 
       // commits are special -- they can run on any node irrespective of whether it is a leader or not
       // we don't want to run recovery on a node which missed a commit command
-      if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null)
-        continue;
+      if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) continue;
 
       final String replicaUrl = error.req.node.getUrl();
 
       // if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
-      String cause = (error.t instanceof SolrException) ? ((SolrException)error.t).getMetadata("cause") : null;
+      String cause = (error.t instanceof SolrException) ? ((SolrException) error.t).getMetadata("cause") : null;
       if ("LeaderChanged".equals(cause)) {
         // let's just fail this request and let the client retry? or just call processAdd again?
-        log.error("On {}, replica {} now thinks it is the leader! Failing the request to let the client retry!"
-            , desc.getName(), replicaUrl, error.t);
+        log.error("On {}, replica {} now thinks it is the leader! Failing the request to let the client retry!", desc.getName(), replicaUrl, error.t);
         errorsForClient.add(error);
         continue;
       }
@@ -1192,7 +1205,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       String shardId = null;
 
       if (error.req.node instanceof SolrCmdDistributor.StdNode) {
-        SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode)error.req.node;
+        SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode) error.req.node;
         collection = stdNode.getCollection();
         shardId = stdNode.getShardId();
 
@@ -1209,16 +1222,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           getLeaderExc = exc;
         }
         if (leaderCoreNodeName == null) {
-          log.warn("Failed to determine if {} is still the leader for collection={} shardId={} before putting {} into leader-initiated recovery",
-              desc.getName(), collection, shardId, replicaUrl, getLeaderExc);
+          log.warn("Failed to determine if {} is still the leader for collection={} shardId={} before putting {} into leader-initiated recovery", desc.getName(), collection, shardId, replicaUrl,
+              getLeaderExc);
         }
 
-        List<Replica> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
-            cloudDesc.getShardId(), desc.getName());
+        List<Replica> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, cloudDesc.getShardId(), desc.getName());
         boolean foundErrorNodeInReplicaList = false;
         if (myReplicas != null) {
           for (Replica replicaProp : myReplicas) {
-            if (((Replica) replicaProp).getName().equals(((Replica)stdNode.getNodeProps()).getName()))  {
+            if (((Replica) replicaProp).getName().equals(((Replica) stdNode.getNodeProps()).getName())) {
               foundErrorNodeInReplicaList = true;
               break;
             }
@@ -1238,29 +1250,30 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           } catch (Exception exc) {
             SolrZkClient.checkInterrupted(exc);
             Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
-            log.error("Leader failed to set replica {} state to DOWN due to: {}"
-                , error.req.node.getUrl(), setLirZnodeFailedCause, setLirZnodeFailedCause);
+            log.error("Leader failed to set replica {} state to DOWN due to: {}", error.req.node.getUrl(), setLirZnodeFailedCause, setLirZnodeFailedCause);
           }
         } else {
           // not the leader anymore maybe or the error'd node is not my replica?
           if (!foundErrorNodeInReplicaList) {
-            log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent!"
-                , desc.getName(), collection, cloudDesc.getShardId(), stdNode.getNodeProps().getCoreUrl());
+            log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent!", desc.getName(), collection, cloudDesc.getShardId(),
+                stdNode.getNodeProps().getCoreUrl());
             if (!shardId.equals(cloudDesc.getShardId())) {
               // some replicas on other shard did not receive the updates (ex: during splitshard),
               // exception must be notified to clients
               errorsForClient.add(error);
             }
           } else {
-            log.warn("Core {} is no longer the leader for {} {}  or we tried to put ourself into LIR, no request recovery command will be sent!"
-                , desc.getName(), collection, shardId);
+            log.warn("Core {} is no longer the leader for {} {}  or we tried to put ourself into LIR, no request recovery command will be sent!", desc.getName(), collection, shardId);
           }
         }
       }
     }
     if (!replicasShouldBeInLowerTerms.isEmpty()) {
-      zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
-          .ensureTermsIsHigher(desc.getName(), replicasShouldBeInLowerTerms);
+      try {
+        zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()).ensureTermsIsHigher(desc.getName(), replicasShouldBeInLowerTerms);
+      } catch (Exception e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      }
     }
     handleReplicationFactor();
     if (0 < errorsForClient.size()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index 9ed585d..c8cf1be 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -75,7 +75,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
   }
 
   @Test
-  public void testRecoveringFlag() throws KeeperException, InterruptedException {
+  public void testRecoveringFlag() throws Exception {
     cluster.getZkClient().makePath("/collections/recoveringFlag/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "recoveringFlag";
     try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
@@ -129,7 +129,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
   }
 
   @Test
-  public void testCoreRemovalWhileRecovering() throws KeeperException, InterruptedException {
+  public void testCoreRemovalWhileRecovering() throws Exception {
     cluster.getZkClient().makePath("/collections/recoveringFlagRemoval/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "recoveringFlagRemoval";
     try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
@@ -151,7 +151,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     }
   }
 
-  public void testRegisterTerm() throws InterruptedException, KeeperException {
+  public void testRegisterTerm() throws Exception {
     cluster.getZkClient().makePath("/collections/registerTerm/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "registerTerm";
     ZkShardTerms rep1Terms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -195,7 +195,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
 
   @Test
   @Nightly
-  public void testRaceConditionOnUpdates() throws InterruptedException {
+  public void testRaceConditionOnUpdates() throws Exception {
     String collection = "raceConditionOnUpdates";
     List<String> replicas = Arrays.asList("rep1", "rep2", "rep3", "rep4");
     for (String replica : replicas) {
@@ -214,17 +214,22 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     for (int i = 0; i < failedReplicas.size(); i++) {
       String replica = failedReplicas.get(i);
       threads[i] = new Thread(() -> {
+
         try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
           while (!stop.get()) {
             try {
               Thread.sleep(LuceneTestCase.random().nextInt(TEST_NIGHTLY ? 200 : 50));
               zkShardTerms.setTermEqualsToLeader(replica);
-            } catch (InterruptedException e) {
+            } catch (InterruptedException | KeeperException e) {
               ParWork.propagateInterrupt(e);
               log.error("", e);
             }
           }
+        } catch (Exception e) {
+          ParWork.propagateInterrupt(e);
+          log.error("", e);
         }
+
       });
       threads[i].start();
     }
@@ -246,7 +251,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     }
   }
 
-  public void testCoreTermWatcher() throws InterruptedException, KeeperException {
+  public void testCoreTermWatcher() throws Exception {
     cluster.getZkClient().makePath("/collections/coreTermWatcher/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "coreTermWatcher";
     ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -287,7 +292,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     assertEquals(1L, terms.getTerm("leader").longValue());
   }
 
-  public void testSetTermToZero() throws KeeperException, InterruptedException {
+  public void testSetTermToZero() throws Exception {
     cluster.getZkClient().makePath("/collections/setTermToZero/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "setTermToZero";
     ZkShardTerms terms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -300,7 +305,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     terms.close();
   }
 
-  public void testReplicaCanBecomeLeader() throws InterruptedException, KeeperException {
+  public void testReplicaCanBecomeLeader() throws Exception {
     cluster.getZkClient().makePath("/collections/replicaCanBecomeLeader/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "replicaCanBecomeLeader";
     ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -324,7 +329,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
     replicaTerms.close();
   }
 
-  public void testSetTermEqualsToLeader() throws InterruptedException, KeeperException {
+  public void testSetTermEqualsToLeader() throws Exception {
     cluster.getZkClient().makePath("/collections/setTermEqualsToLeader/terms/s1", ZkStateReader.emptyJson, false);
     String collection = "setTermEqualsToLeader";
     ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());