You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/11/11 06:30:49 UTC

[lucene-solr] branch reference_impl_dev updated: @1159 More efficient state publish from zkcontroller.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new c29a883  @1159 More efficient state publish from zkcontroller.
c29a883 is described below

commit c29a88394ec905c356b7f00e1584aa469bfe62e8
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Nov 11 00:13:50 2020 -0600

    @1159 More efficient state publish from zkcontroller.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    |   3 -
 .../solr/cloud/ShardLeaderElectionContext.java     |   6 +-
 .../java/org/apache/solr/cloud/StatePublisher.java | 114 +++++++++++++
 .../java/org/apache/solr/cloud/ZkController.java   |  18 +-
 .../org/apache/solr/cloud/ZkDistributedQueue.java  |   4 -
 .../apache/solr/cloud/overseer/ZkStateWriter.java  | 181 ++++++++++++---------
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  78 +++------
 .../client/solrj/request/CoreAdminRequest.java     |   2 +-
 8 files changed, 265 insertions(+), 141 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 1f88d0f..3890957 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -894,10 +894,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     WaitForState prepCmd = new WaitForState();
     prepCmd.setCoreName(coreName);
-    prepCmd.setNodeName(zkController.getNodeName());
     prepCmd.setState(Replica.State.RECOVERING);
-    prepCmd.setCheckLive(true);
-    prepCmd.setOnlyIfLeader(true);
     prepCmd.setCollection(coreDescriptor.getCollectionName());
     prepCmd.setShardId(coreDescriptor.getCloudDescriptor().getShardId());
     final Slice.State state = slice.getState();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index f90c1495..b8b2ca5 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -264,13 +264,13 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
 
         ZkNodeProps zkNodes = ZkNodeProps
-            .fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(), ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.NODE_NAME_PROP, leaderProps.get(ZkStateReader.NODE_NAME_PROP), ZkStateReader.CORE_NAME_PROP,
-                leaderProps.getName(), ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
+            .fromKeyVals(Overseer.QUEUE_OPERATION, OverseerAction.STATE.toLower(), ZkStateReader.COLLECTION_PROP, collection, ZkStateReader.CORE_NAME_PROP,
+                leaderProps.getName(), ZkStateReader.STATE_PROP, "leader"); // "leader" pseudo-state: ZkStateWriter makes this replica the slice leader and ACTIVE
         assert zkController != null;
         assert zkController.getOverseer() != null;
 
         log.info("Publish leader state");
-        zkController.getOverseer().offerStateUpdate(Utils.toJSON(zkNodes));
+        zkController.publish(zkNodes); // queued; delivered asynchronously as part of a batched state message
 
         log.info("I am the new leader: " + leaderProps.getCoreUrl() + " " + shardId);
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
new file mode 100644
index 0000000..3cc1f93
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.common.ParWork;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.Utils;
+import org.apache.zookeeper.KeeperException;
+import org.eclipse.jetty.util.BlockingArrayQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Batches replica state updates: a background worker drains all queued messages into one "state"
+ * message keyed by core name (value "collection,state"; a later update for a core wins) and offers
+ * it to the Overseer job queue.
+ */
+public class StatePublisher implements Closeable {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private final BlockingArrayQueue<ZkNodeProps> workQueue = new BlockingArrayQueue<>(30, 10);
+  private final ZkDistributedQueue overseerJobQueue;
+  private volatile Future<?> workerFuture;
+  private volatile boolean terminated;
+
+  private class Worker implements Runnable {
+    @Override
+    public void run() {
+      while (!terminated) {
+        try {
+          ZkNodeProps message = workQueue.poll(5, TimeUnit.SECONDS);
+          if (message != null) {
+            ZkNodeProps bulkMessage = new ZkNodeProps();
+            bulkMessage.getProperties().put("operation", "state");
+            // drain whatever is already queued so a burst of updates becomes one overseer message
+            while (message != null) {
+              log.info("Got state message {}", message);
+              bulkMessage(message, bulkMessage);
+              message = workQueue.poll();
+            }
+            processMessage(bulkMessage);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the flag for the executor, then stop
+          return;
+        } catch (Exception e) {
+          // keep running: one failed offer must not silently stop all later state publishing
+          log.error("Exception in StatePublisher run loop", e);
+        }
+      }
+    }
+
+    /** Adds {@code zkNodeProps} to {@code bulkMessage} as {@code core -> "collection,state"}. */
+    private void bulkMessage(ZkNodeProps zkNodeProps, ZkNodeProps bulkMessage) {
+      String collection = zkNodeProps.getStr(ZkStateReader.COLLECTION_PROP);
+      String core = zkNodeProps.getStr(ZkStateReader.CORE_NAME_PROP);
+      String state = zkNodeProps.getStr(ZkStateReader.STATE_PROP);
+      if (collection == null || core == null || state == null) {
+        // consumers split each entry on ','; an incomplete entry would corrupt the batch
+        log.warn("Ignoring incomplete state message {}", zkNodeProps);
+        return;
+      }
+      bulkMessage.getProperties().put(core, collection + "," + state);
+    }
+
+    private void processMessage(ZkNodeProps message) throws KeeperException, InterruptedException {
+      overseerJobQueue.offer(Utils.toJSON(message));
+    }
+  }
+
+  public StatePublisher(ZkDistributedQueue overseerJobQueue) {
+    this.overseerJobQueue = overseerJobQueue;
+  }
+
+  /** Queues a state message (only collection, core name and state are forwarded). */
+  public void submitState(ZkNodeProps stateMessage) {
+    workQueue.offer(stateMessage);
+  }
+
+  public void start() {
+    workerFuture = ParWork.getRootSharedExecutor().submit(new Worker());
+  }
+
+  /** Stops the worker; messages still queued are not delivered. Safe to call before start(). */
+  @Override
+  public void close() {
+    this.terminated = true;
+    Future<?> future = workerFuture;
+    try {
+      if (future != null) future.cancel(true); // null if start() was never called
+    } catch (Exception e) {
+      log.error("Exception waiting for close", e);
+    }
+  }
+}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 3a8b589..749e815 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -140,6 +140,8 @@ public class ZkController implements Closeable, Runnable {
   private CloseTracker closeTracker;
   private boolean closeZkClient = false;
 
+  private volatile StatePublisher statePublisher; // created and started just before the live node is registered; closed on shutdown
+
   private volatile ZkDistributedQueue overseerJobQueue;
   private volatile OverseerTaskQueue overseerCollectionQueue;
   private volatile OverseerTaskQueue overseerConfigSetQueue;
@@ -675,6 +677,7 @@ public class ZkController implements Closeable, Runnable {
     this.shudownCalled = true;
 
     this.isClosed = true;
+    IOUtils.closeQuietly(statePublisher);
 
     try (ParWork closer = new ParWork(this, true, true)) {
       closer.collect(overseer);
@@ -1195,6 +1198,9 @@ public class ZkController implements Closeable, Runnable {
//            }
//          });
         }
+        statePublisher = new StatePublisher(overseerJobQueue);
+        statePublisher.start();
+
         // Do this last to signal we're up.
         createEphemeralLiveNode();
 
@@ -1728,10 +1734,10 @@ public class ZkController implements Closeable, Runnable {
       Map<String,Object> props = new HashMap<>();
       props.put(Overseer.QUEUE_OPERATION, "state");
       props.put(ZkStateReader.STATE_PROP, state.toString());
-      props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
+    //  props.put(ZkStateReader.ROLES_PROP, cd.getCloudDescriptor().getRoles());
       props.put(CORE_NAME_PROP, cd.getName());
-      props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-      props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
+    //  props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
+    //  props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
       props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
 
@@ -1774,12 +1780,16 @@ public class ZkController implements Closeable, Runnable {
       if (updateLastState) {
         cd.getCloudDescriptor().setLastPublished(state);
       }
-      overseerJobQueue.offer(Utils.toJSON(m));
+      statePublisher.submitState(m); // asynchronous: batched and offered to the overseer queue later
     } finally {
       MDCLoggingContext.clear();
     }
   }
 
+  public void publish(ZkNodeProps message) {
+    statePublisher.submitState(message); // NOTE(review): NPE if called before statePublisher is created — confirm callers
+  }
+
   public ZkShardTerms getShardTerms(String collection, String shardId) {
     return getCollectionTerms(collection).getShard(shardId);
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
index c53b2f4..812226c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedQueue.java
@@ -347,10 +347,6 @@ public class ZkDistributedQueue implements DistributedQueue {
     // TODO change to accept json
     Map json = (Map) Utils.fromJSON(data);
 
-    final String operation = (String) json.get(Overseer.QUEUE_OPERATION);
-//    if (!OPERATIONS.contains(operation)) {
-//      throw new IllegalArgumentException("unknown operation:" + operation + " contents:" + json);
-//    }
     Timer.Context time = stats.time(dir + "_offer");
 
     try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index da787b9..8eb5377 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -42,7 +42,6 @@ import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.util.Utils;
-import org.apache.solr.util.BoundedTreeSet;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
@@ -70,7 +69,7 @@ public class ZkStateWriter {
 
   private volatile ClusterState cs;
   private boolean dirty;
-  private Set<String> collectionsToWrite = new HashSet<>();
+  private Set<String> collectionsToWrite = ConcurrentHashMap.newKeySet();
 
   protected final ReentrantLock ourLock = new ReentrantLock(true);
   protected final ReentrantLock writeLock = new ReentrantLock(true);
@@ -88,7 +87,9 @@ public class ZkStateWriter {
   public void enqueueUpdate(ClusterState clusterState, ZkNodeProps message, boolean stateUpdate) throws Exception {
 
     if (log.isDebugEnabled()) log.debug("enqueue update stateUpdate={}", stateUpdate);
+    //log.info("Get our write lock for enq");
     ourLock.lock();
+    //log.info("Got our write lock for enq");
     try {
       AtomicBoolean changed = new AtomicBoolean();
 
@@ -143,26 +144,51 @@ public class ZkStateWriter {
         }
         switch (overseerAction) {
           case STATE:
-            log.info("state cmd");
-            String collection = message.getStr("collection");
-            DocCollection docColl = cs.getCollectionOrNull(collection);
-            if (docColl != null) {
-              Replica replica = docColl.getReplica(message.getStr(ZkStateReader.CORE_NAME_PROP));
-              if (replica != null) {
-                Replica.State state = Replica.State.getState((String) message.get(ZkStateReader.STATE_PROP));
-                log.info("set state {} {}", state, replica);
-                if (state != replica.getState()) {
-                  replica.setState(state);
-                  changed.set(true);
-                  collectionsToWrite.add(collection);
+            // each entry maps core name -> "collection,state"; the state "leader" promotes that replica
+            message.getProperties().remove("operation");
+
+            for (Map.Entry<String,Object> entry : message.getProperties().entrySet()) {
+              String core = entry.getKey();
+              String[] collectionAndState = String.valueOf(entry.getValue()).split(",");
+              if (collectionAndState.length != 2) {
+                log.warn("Skipping malformed state entry {}={}", core, entry.getValue());
+                continue;
+              }
+              String collection = collectionAndState[0];
+              String setState = collectionAndState[1];
+              DocCollection docColl = cs.getCollectionOrNull(collection);
+              if (docColl != null) {
+                Replica replica = docColl.getReplica(core);
+                if (replica != null) {
+                  if (setState.equals("leader")) {
+                    log.info("set leader {} {}", core, replica);
+                    Slice slice = docColl.getSlice(replica.getSlice());
+                    slice.setLeader(replica);
+                    replica.setState(Replica.State.ACTIVE);
+                    replica.getProperties().put("leader", "true");
+                    Collection<Replica> replicas = slice.getReplicas();
+                    for (Replica r : replicas) {
+                      if (r != replica) {
+                        r.getProperties().remove("leader");
+                      }
+                    }
+                    changed.set(true);
+                    collectionsToWrite.add(collection);
+                  } else {
+                    // any other value is parsed as a Replica.State name
+                    replica.setState(Replica.State.getState(setState));
+                    changed.set(true);
+                    collectionsToWrite.add(collection);
+                  }
                 }
               }
             }
+
             break;
           case LEADER:
-            log.info("leader cmd");
-            collection = message.getStr("collection");
-            docColl = cs.getCollectionOrNull(collection);
+           // log.info("leader cmd");
+            String collection = message.getStr("collection");
+            DocCollection docColl = cs.getCollectionOrNull(collection);
             if (docColl != null) {
               Slice slice = docColl.getSlice(message.getStr("shard"));
               if (slice != null) {
@@ -199,11 +225,9 @@ public class ZkStateWriter {
                   Slice slice = docColl.getSlice(entry.getKey());
                   if (slice != null) {
                     Slice.State state = Slice.State.getState((String) entry.getValue());
-                    if (slice.getState() != state) {
-                      slice.setState(state);
-                      changed.set(true);
-                      collectionsToWrite.add(collection);
-                    }
+                    slice.setState(state); // applied and marked changed even when the state is unchanged
+                    changed.set(true);
+                    collectionsToWrite.add(collection);
                   }
                 }
               }
@@ -253,13 +277,15 @@ public class ZkStateWriter {
    */
   public void writePendingUpdates() {
 
-    writeLock.lock();
-    try {
+   // writeLock.lock();
+   // try {
+   //   log.info("Get our write lock");
       ourLock.lock();
       try {
-        if (!dirty) {
-          return;
-        }
+   //     log.info("Got our write lock");
+//        if (!dirty) {
+//          return;
+//        }
 
         if (log.isDebugEnabled()) {
           log.debug("writePendingUpdates {}", cs);
@@ -270,61 +296,31 @@ public class ZkStateWriter {
           failedUpdates.clear();
           throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, lastFailedException.get());
         }
-      } finally {
-        ourLock.unlock();
-      }
+//      } finally {
+//        ourLock.unlock();
+//      }
 
       // wait to see our last publish version has propagated TODO don't wait on collections not hosted on overseer?
-      cs.forEachCollection(collection -> {
-        if (collectionsToWrite.contains(collection.getName())) {
-          Integer v = null;
-          try {
-            //System.out.println("waiting to see state " + prevVersion);
-            v = trackVersions.get(collection.getName());
-            if (v == null) v = 0;
-            if (v == 0) return;
-            Integer version = v;
-            try {
-              log.debug("wait to see last published version for collection {} {}", collection.getName(), v);
-              reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
-                if (col == null) {
-                  return true;
-                }
-                //                          if (col != null) {
-                //                            log.info("the version " + col.getZNodeVersion());
-                //                          }
-                if (col != null && col.getZNodeVersion() >= version) {
-                  if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
-                  // System.out.println("found the version");
-                  return true;
-                }
-                return false;
-              });
-            } catch (InterruptedException e) {
-              ParWork.propagateInterrupt(e);
-              throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-            }
-          } catch (TimeoutException e) {
-            log.warn("Timeout waiting to see written cluster state come back " + v);
-          }
-        }
-
-      });
+      // waitForStateWePublishedToComeBack();
 
-      ourLock.lock();
+   //   ourLock.lock();
       AtomicInteger lastVersion = new AtomicInteger();
-      try {
+      //log.info("writing out state, looking at collections count={} toWrite={} {} : {}", cs.getCollectionsMap().size(), collectionsToWrite.size(), cs.getCollectionsMap().keySet(), collectionsToWrite);
+      //try {
         cs.forEachCollection(collection -> {
+         // log.info("check collection {}", collection);
           if (collectionsToWrite.contains(collection.getName())) {
+          //  log.info("process collection {}", collection);
             String name = collection.getName();
             String path = ZkStateReader.getCollectionPath(collection.getName());
+           // log.info("process collection {} path {}", collection.getName(), path);
+
             if (log.isDebugEnabled()) log.debug("process {}", collection);
             Stat stat = new Stat();
-            boolean success = false;
             try {
-
+             // log.info("get data for {}", name);
               byte[] data = Utils.toJSON(singletonMap(name, collection));
-
+            //  log.info("got data for {} {}", name, data.length);
               if (log.isDebugEnabled()) log.debug("Write state.json prevVersion={} bytes={} col={}", collection.getZNodeVersion(), data.length, collection);
 
               try {
@@ -373,14 +369,15 @@ public class ZkStateWriter {
           }
         });
 
+        //log.info("Done with successful cluster write out");
         dirty = false;
         collectionsToWrite.clear();
       } finally {
         ourLock.unlock();
       }
-    } finally {
-      writeLock.unlock();
-    }
+//    } finally {
+//      writeLock.unlock();
+//    }
     // nocommit - harden against failures and exceptions
 
     //    if (log.isDebugEnabled()) {
@@ -389,6 +386,44 @@ public class ZkStateWriter {
 
   }
 
+  private void waitForStateWePublishedToComeBack() { // NOTE(review): currently unused; its call in writePendingUpdates is commented out
+    cs.forEachCollection(collection -> {
+      if (collectionsToWrite.contains(collection.getName())) {
+        Integer v = null;
+        try {
+          //System.out.println("waiting to see state " + prevVersion);
+          v = trackVersions.get(collection.getName());
+          if (v == null) v = 0;
+          if (v == 0) return;
+          Integer version = v;
+          try {
+            log.info("wait to see last published version for collection {} {}", collection.getName(), v);
+            reader.waitForState(collection.getName(), 5, TimeUnit.SECONDS, (l, col) -> {
+              if (col == null) {
+                return true;
+              }
+              //                          if (col != null) {
+              //                            log.info("the version " + col.getZNodeVersion());
+              //                          }
+              if (col != null && col.getZNodeVersion() >= version) {
+                if (log.isDebugEnabled()) log.debug("Waited for ver: {}", col.getZNodeVersion() + 1);
+                // System.out.println("found the version");
+                return true;
+              }
+              return false;
+            });
+          } catch (InterruptedException e) {
+            ParWork.propagateInterrupt(e);
+            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+          }
+        } catch (TimeoutException e) {
+          log.warn("Timeout waiting to see written cluster state come back " + v);
+        }
+      }
+
+    });
+  }
+
   public ClusterState getClusterstate(boolean stateUpdate) {
     ourLock.lock();
     try {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index ee31f8f..0dbc692 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -55,30 +55,26 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
 
     String collection = params.get("collection");
 
+    String shard = params.get(ZkStateReader.SHARD_ID_PROP); // CoreAdminRequest sends the shard id under this key
+
     if (collection == null) {
       throw new IllegalArgumentException("collection cannot be null");
     }
 
-    String shardId = params.get("shardId");
-    String nodeName = params.get("nodeName");
     Replica.State waitForState = Replica.State.getState(params.get(ZkStateReader.STATE_PROP));
-    Boolean checkLive = params.getBool("checkLive");
-    Boolean onlyIfLeader = params.getBool("onlyIfLeader");
 
     log.info(
-        "Going to wait for core: {}, state: {}, checkLive: {}, onlyIfLeader: {}: params={}",
-        cname, waitForState, checkLive, onlyIfLeader, params);
+        "Going to wait for core: {}, state: {}: params={}",
+        cname, waitForState, params);
 
     assert TestInjection.injectPrepRecoveryOpPauseForever();
 
     CoreContainer coreContainer = it.handler.coreContainer;
-    // wait long enough for the leader conflict to work itself out plus a little extra
-    int conflictWaitMs = coreContainer.getZkController().getLeaderConflictResolveWait();
-
 
     AtomicReference<String> errorMessage = new AtomicReference<>();
+
     try {
-      coreContainer.getZkController().getZkStateReader().waitForState(collection, conflictWaitMs, TimeUnit.MILLISECONDS, (n, c) -> {
+      coreContainer.getZkController().getZkStateReader().waitForState(collection, 10, TimeUnit.SECONDS, (n, c) -> { // NOTE(review): fixed 10s wait replaces the configurable leader-conflict wait
         if (c == null) {
           log.info("collection not found {}", collection);
           return false;
@@ -86,54 +82,15 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
 
         // wait until we are sure the recovering node is ready
         // to accept updates
-        Replica.State state = null;
-        boolean live = false;
         final Replica replica = c.getReplica(cname);
         if (replica != null) {
           if (replica != null) {
-            state = replica.getState();
-            live = n.contains(nodeName);
-
-            try {
-              ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, c.getSlice(replica).getName());
-              // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
-              if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
-                // The replica changed its term, then published itself as RECOVERING.
-                // This core already see replica as RECOVERING
-                // so it is guarantees that a live-fetch will be enough for this core to see max term published
-                log.info("refresh shard terms for core {}", cname);
-                shardTerms.refreshTerms();
-              }
-            } catch (NullPointerException e) {
-              if (log.isDebugEnabled()) log.debug("No shards found", e);
-              // likely deleted shard/collection
-            }
-            if (log.isInfoEnabled()) {
-              log.info(
-                  "In WaitForState(" + waitForState + "): collection=" + collection +
-                      ", thisCore=" + cname +
-                      ", live=" + live + ", checkLive=" + checkLive + ", currentState=" + state
-                      + ", nodeName=" + nodeName +
-                      ", core=" + cname
-                      + ", nodeProps: " + replica); //LOGOK
-            }
-
-            log.info("replica={} state={} waitForState={}", replica, state, waitForState);
-            if (replica != null && (state == waitForState)) {
-              if (checkLive == null) {
-                log.info("checkLive=false, return true");
-                return true;
-              } else if (checkLive && live) {
-                log.info("checkLive=true live={}, return true", live);
-                return true;
-              } else if (!checkLive && !live) {
-                log.info("checkLive=false live={}, return true", live);
-                return true;
-              }
+            if (replica.getState() == waitForState) {
+              log.info("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
+              return true;
             }
           }
         }
-
         return false;
       });
 
@@ -141,9 +98,24 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
       SolrZkClient.checkInterrupted(e);
       String error = errorMessage.get();
       if (error == null)
-        error = "Timeout waiting for collection state.";
+        error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
       throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
     }
 
+    try {
+      ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collection, shard); // NOTE(review): shard is null if the request omitted it — confirm getShardTerms tolerates that
+      // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+      if (waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
+        // The replica changed its term, then published itself as RECOVERING.
+        // This core already sees the replica as RECOVERING,
+        // so it is guaranteed that a live-fetch will be enough for this core to see the max term published
+        log.info("refresh shard terms for core {}", cname);
+        shardTerms.refreshTerms();
+      }
+    } catch (NullPointerException e) { // NOTE(review): catching NPE masks real bugs; explicit null checks would be safer
+      if (log.isDebugEnabled()) log.debug("No shards found", e);
+      // likely deleted shard/collection
+    }
+
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
index a18af01..b294e5d 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CoreAdminRequest.java
@@ -272,7 +272,7 @@ public class CoreAdminRequest extends SolrRequest<CoreAdminResponse> {
         params.set( "collection", collectionName);
       }
       if (shardId != null) {
-        params.set( "shardId", shardId);
+        params.set( ZkStateReader.SHARD_ID_PROP, shardId); // PrepRecoveryOp reads the shard from this key
       }
 
       return params;