You are viewing a plain text version of this content; the canonical link was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/01 03:58:01 UTC

[lucene-solr] 02/02: @1417 Continue a bit on the state updates, some tweaks and play around the fallout of that. HEY MARK, COME BACK AND REVIEW A BIT SOON.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 2b53d6727f951ec012473eb99b9393522df29c58
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Feb 28 21:31:00 2021 -0600

    @1417 Continue a bit on the state updates, some tweaks and play around the fallout of that. HEY MARK, COME BACK AND REVIEW A BIT SOON.
    
    Took 3 hours 7 minutes
---
 .../client/solrj/embedded/JettySolrRunner.java     |  47 ++++----
 .../src/java/org/apache/solr/cloud/Overseer.java   |  21 +---
 .../apache/solr/cloud/OverseerElectionContext.java |   5 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   4 +-
 .../java/org/apache/solr/cloud/StatePublisher.java |  18 ++--
 .../java/org/apache/solr/cloud/ZkController.java   | 119 +++++++--------------
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |  22 +---
 .../java/org/apache/solr/core/CoreContainer.java   |  68 ++++++------
 .../src/java/org/apache/solr/core/SolrCore.java    |  10 +-
 .../apache/solr/update/DirectUpdateHandler2.java   |  11 +-
 .../org/apache/solr/cloud/TestCloudRecovery2.java  |   2 -
 .../org/apache/solr/common/cloud/ClusterState.java |  11 ++
 .../java/org/apache/solr/common/cloud/Replica.java |   9 +-
 13 files changed, 143 insertions(+), 204 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 21f5607..0784133 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,6 +84,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -706,29 +707,29 @@ public class JettySolrRunner implements Closeable {
         throw new RuntimeException(e);
       }
 
-//      if (wait && coreContainer != null && coreContainer
-//          .isZooKeeperAware()) {
-//        log.info("waitForJettyToStop: {}", getLocalPort());
-//        String nodeName = getNodeName();
-//        if (nodeName == null) {
-//          log.info("Cannot wait for Jetty with null node name");
-//        } else {
-//
-//          log.info("waitForNode: {}", getNodeName());
-//
-//          ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
-//
-//          try {
-//            if (!reader.isClosed() && reader.getZkClient().isConnected()) {
-//              reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
-//            }
-//          } catch (InterruptedException e) {
-//            ParWork.propagateInterrupt(e);
-//          } catch (TimeoutException e) {
-//            log.error("Timeout waiting for live node");
-//          }
-//        }
-//      }
+      if (wait && coreContainer != null && coreContainer
+          .isZooKeeperAware()) {
+        log.info("waitForJettyToStop: {}", getLocalPort());
+        String nodeName = getNodeName();
+        if (nodeName == null) {
+          log.info("Cannot wait for Jetty with null node name");
+        } else {
+
+          log.info("waitForNode: {}", getNodeName());
+
+          ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
+
+          try {
+            if (!reader.isClosed() && reader.getZkClient().isConnected()) {
+              reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
+            }
+          } catch (InterruptedException e) {
+            ParWork.propagateInterrupt(e);
+          } catch (TimeoutException e) {
+            log.error("Timeout waiting for live node");
+          }
+        }
+      }
 
     } catch (Exception e) {
       SolrZkClient.checkInterrupted(e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 2d42625..1a45047 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -823,18 +823,13 @@ public class Overseer implements SolrCloseable {
 
     @Override
     public void close() {
-      ourLock.lock();
-      try {
-        this.closed = true;
-        closeWatcher();
-      } finally {
-        ourLock.unlock();
-      }
+      this.closed = true;
+      closeWatcher();
     }
 
     private void closeWatcher() {
       try {
-        zkController.getZkClient().removeWatches(path, this, WatcherType.Data, true);
+        zkController.getZkClient().removeWatches(path, this, WatcherType.Any, true);
       } catch (KeeperException.NoWatcherException e) {
 
       } catch (Exception e) {
@@ -883,16 +878,6 @@ public class Overseer implements SolrCloseable {
           if (zkController.getZkClient().isAlive()) {
             try {
               zkController.getZkClient().delete(fullPaths, true);
-            } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
-              if (zkController.getZkClient().isAlive()) {
-                try {
-                  zkController.getZkClient().delete(fullPaths, true);
-                } catch (KeeperException keeperException) {
-                  log.warn("Failed deleting processed items", e);
-                }
-              } else {
-                log.warn("Failed deleting processed items", e);
-              }
             } catch (Exception e) {
               log.warn("Failed deleting processed items", e);
             }
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 5657efe..d294266 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -38,15 +38,14 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
   private final Overseer overseer;
 
   public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, Overseer overseer) {
-    super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("-1", getIDMap(zkNodeName, overseer), "overseer", "overseer", overseer.getZkStateReader()), null, zkClient);
+    super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("overseer:" + overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), "overseer", "overseer", overseer.getZkStateReader()), null, zkClient);
     this.overseer = overseer;
     this.zkClient = zkClient;
   }
 
   private static Map<String,Object> getIDMap(String zkNodeName, Overseer overseer) {
     Map<String,Object> idMap = new HashMap<>(2);
-    idMap.put("id", "-1");
-    idMap.put("zknode", zkNodeName);
+    idMap.put("id", zkNodeName);
     idMap.put(ZkStateReader.NODE_NAME_PROP, overseer.getZkController().getNodeName());
     return idMap;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index ab1515a..f8c5752 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -66,7 +66,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
     //      return;
     //    }
     super.cancelElection();
-    if (zkClient.isAlive()) {
+  //  if (zkClient.isAlive()) {
       try {
         if (leaderZkNodeParentVersion != null) {
           try {
@@ -165,7 +165,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
         log.info("Exception trying to cancel election {} {}", e.getClass().getName(), e.getMessage());
       }
       leaderZkNodeParentVersion = null;
-    }
+ //   }
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 22a1e1e..845b1aa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -156,25 +156,25 @@ public class StatePublisher implements Closeable {
     }
 
     private void clearStatesForNode(ZkNodeProps bulkMessage, String nodeName) {
-      Set<String> removeCores = new HashSet<>();
-      Set<String> cores = bulkMessage.getProperties().keySet();
-      for (String core : cores) {
-        if (core.equals(OverseerAction.DOWNNODE.toLower()) || core.equals(OverseerAction.RECOVERYNODE.toLower())) {
+      Set<String> removeIds = new HashSet<>();
+      Set<String> ids = bulkMessage.getProperties().keySet();
+      for (String id : ids) {
+        if (id.equals(OverseerAction.DOWNNODE.toLower()) || id.equals(OverseerAction.RECOVERYNODE.toLower())) {
           continue;
         }
         Collection<DocCollection> collections = zkStateReader.getClusterState().getCollectionsMap().values();
         for (DocCollection collection : collections) {
-          Replica replica = collection.getReplica(core);
+          Replica replica = collection.getReplicaById(id);
           if (replica != null) {
             if (replica.getNodeName().equals(nodeName)) {
-              removeCores.add(core);
+              removeIds.add(id);
             }
           }
         }
 
       }
-      for (String core : removeCores) {
-        bulkMessage.getProperties().remove(core);
+      for (String id : removeIds) {
+        bulkMessage.getProperties().remove(id);
       }
     }
 
@@ -206,7 +206,7 @@ public class StatePublisher implements Closeable {
           if (coll != null) {
             Replica replica = coll.getReplica(core);
             id = replica.getId();
-            String lastState = stateCache.get(core);
+            String lastState = stateCache.get(id);
             if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
               log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
               return;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index fbf8a73..f74c2f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -23,7 +23,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
 import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.overseer.SliceMutator;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
@@ -362,6 +361,7 @@ public class ZkController implements Closeable, Runnable {
             zkController.register(descriptor.getName(), descriptor, afterExpiration);
           } catch (Exception e) {
             log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration);
+            throw new SolrException(ErrorCode.SERVER_ERROR, e);
           }
         }
         return descriptor;
@@ -487,8 +487,6 @@ public class ZkController implements Closeable, Runnable {
 
               removeEphemeralLiveNode();
 
-              publishNodeAs(getNodeName(), OverseerAction.RECOVERYNODE);
-
               // recreate our watchers first so that they exist even on any problems below
               zkStateReader.createClusterStateWatchersAndUpdate();
 
@@ -1304,72 +1302,50 @@ public class ZkController implements Closeable, Runnable {
         getZkStateReader().registerCore(cloudDesc.getCollectionName());
       }
 
-      try {
-        log.info("Waiting to see our entry in state.json {}", desc.getName());
-        zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 30000), TimeUnit.MILLISECONDS, (l, c) -> { // MRM TODO: timeout
-          if (c == null) {
-            return false;
-          }
-          coll.set(c);
-          Replica r = c.getReplica(coreName);
-          if (r != null) {
-            replicaRef.set(r);
-            return true;
-          }
-          return false;
-        });
-      } catch (TimeoutException e) {
-        log.warn("Timeout waiting to see core " + coreName + " \ncollection=" + collection + " " + coll.get());
-      }
-
-      Replica replica = replicaRef.get();
-
-      if (replica == null) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica=" + coreName + " is removed from clusterstate \n"
-              + coll.get());
-      }
-
-      log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
+      log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, cloudDesc.getReplicaType());
 
       log.info("Register terms for replica {}", coreName);
 
       registerShardTerms(collection, cloudDesc.getShardId(), coreName);
 
       log.info("Create leader elector for replica {}", coreName);
-      leaderElector = leaderElectors.get(replica.getName());
+      leaderElector = leaderElectors.get(coreName);
       if (leaderElector == null) {
         leaderElector = new LeaderElector(this);
-        LeaderElector oldElector = leaderElectors.putIfAbsent(replica.getName(), leaderElector);
+        LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
 
         if (oldElector != null) {
           IOUtils.closeQuietly(leaderElector);
         }
 
-        if (cc.isShutDown()) {
-          IOUtils.closeQuietly(leaderElector);
-          IOUtils.closeQuietly(oldElector);
-          IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
-          throw new AlreadyClosedException();
-        }
+//        if (cc.isShutDown()) {
+//          IOUtils.closeQuietly(leaderElector);
+//          IOUtils.closeQuietly(oldElector);
+//          IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
+//          throw new AlreadyClosedException();
+//        }
       }
 
       // If we're a preferred leader, insert ourselves at the head of the queue
-      boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
-      if (replica.getType() != Type.PULL) {
+      boolean joinAtHead = false; //replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+
+      if (cloudDesc.getReplicaType() != Type.PULL) {
         //getCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
         // MRM TODO: review joinAtHead
         joinElection(desc, joinAtHead);
       }
 
       log.info("Wait to see leader for {}, {}", collection, shardId);
-      Replica leader = null;
+      String leaderName = null;
+
       for (int i = 0; i < 60; i++) {
         if (leaderElector.isLeader()) {
-          leader = replica;
+          leaderName = coreName;
           break;
         }
         try {
-          leader = zkStateReader.getLeaderRetry(collection, shardId, 3000, true);
+          Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 3000, true);
+          leaderName = leader.getName();
 
         } catch (TimeoutException timeoutException) {
           if (isClosed() || isDcCalled() || cc.isShutDown()) {
@@ -1380,15 +1356,15 @@ public class ZkController implements Closeable, Runnable {
         }
       }
 
-      if (leader == null) {
+      if (leaderName == null) {
         log.error("No leader found while trying to register " + coreName + " with zookeeper");
         throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
       }
 
-      String ourUrl = replica.getCoreUrl();
-      boolean isLeader = leader.getName().equals(coreName);
 
-      log.info("We are {} and leader is {} isLeader={}", ourUrl, leader.getCoreUrl(), isLeader);
+      boolean isLeader = leaderName.equals(coreName);
+
+      log.info("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
 
       log.info("Check if we should recover isLeader={}", isLeader);
       //assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
@@ -1404,7 +1380,7 @@ public class ZkController implements Closeable, Runnable {
         }
 
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
+        boolean isTlogReplicaAndNotLeader = cloudDesc.getReplicaType() == Replica.Type.TLOG && !isLeader;
         if (isTlogReplicaAndNotLeader) {
           String commitVersion = ReplicateFromLeader.getCommitVersion(core);
           if (commitVersion != null) {
@@ -1415,7 +1391,7 @@ public class ZkController implements Closeable, Runnable {
         if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) {
           // disable recovery in case shard is in construction state (for shard splits)
           Slice slice = getClusterState().getCollection(collection).getSlice(shardId);
-          if (slice.getState() != Slice.State.CONSTRUCTION || (slice.getState() == Slice.State.CONSTRUCTION  && !isLeader)) {
+          if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
             Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
             if (recoveryFuture != null) {
               log.info("Replaying tlog for {} during startup... NOTE: This can take a while.", core);
@@ -1431,20 +1407,17 @@ public class ZkController implements Closeable, Runnable {
           }
         }
 
-        if (replica.getType() != Type.PULL) {
-          checkRecovery(isLeader, core, cc);
-        }
-
-        if (isTlogReplicaAndNotLeader) {
+        if (cloudDesc.getReplicaType() != Type.PULL && !isLeader) {
+          checkRecovery(core, cc);
+        } else if (isTlogReplicaAndNotLeader) {
           startReplicationFromLeader(coreName, true);
         }
 
-        if (replica.getType() == Type.PULL) {
+        if (cloudDesc.getReplicaType() == Type.PULL) {
           startReplicationFromLeader(coreName, false);
-          publish(desc, Replica.State.ACTIVE);
         }
 
-        if (replica.getType() != Type.PULL) {
+        if (cloudDesc.getReplicaType() != Type.PULL) {
           shardTerms = getShardTerms(collection, cloudDesc.getShardId());
           // the watcher is added to a set so multiple calls of this method will left only one watcher
           if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
@@ -1590,19 +1563,11 @@ public class ZkController implements Closeable, Runnable {
   /**
    * Returns whether or not a recovery was started
    */
-  private void checkRecovery(final boolean isLeader, SolrCore core, CoreContainer cc) {
-
-    if (!isLeader) {
-
+  private void checkRecovery(SolrCore core, CoreContainer cc) {
       if (log.isInfoEnabled()) {
         log.info("Core needs to recover:{}", core.getName());
       }
       core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
-
-    } else {
-      log.info("I am the leader, no recovery necessary");
-    }
-
   }
 
 
@@ -1958,8 +1923,6 @@ public class ZkController implements Closeable, Runnable {
     }
     try {
       overseerElector.retryElection(joinAtHead);
-    } catch (AlreadyClosedException e) {
-      return;
     } catch (Exception e) {
       ParWork.propagateInterrupt(e);
       throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
@@ -2119,14 +2082,13 @@ public class ZkController implements Closeable, Runnable {
     if (confListeners.confDirListeners.isEmpty()) {
       // no more listeners for this confDir, remove it from the map
       if (log.isDebugEnabled()) log.debug("No more listeners for config directory [{}]", confDir);
-      zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true, (rc, path, ctx) -> {
-        if (rc != 0) {
-          KeeperException ex = KeeperException.create(KeeperException.Code.get(rc), path);
-          if (!(ex instanceof KeeperException.NoWatcherException)) {
-            log.error("Exception removing watch for " + path, ex);
-          }
-        }
-      }, "confWatcher");
+      try {
+        zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true);
+      } catch (KeeperException.NoWatcherException e) {
+
+      } catch (Exception e) {
+        log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+      }
       confDirectoryListeners.remove(confDir);
     }
   }
@@ -2239,12 +2201,7 @@ public class ZkController implements Closeable, Runnable {
 
   private static void setConfWatcher(String zkDir, Watcher watcher, Stat stat, CoreContainer cc, Map<String, ConfListeners> confDirectoryListeners, SolrZkClient zkClient) {
     try {
-      zkClient.addWatch(zkDir, watcher, AddWatchMode.PERSISTENT, (rc, path, ctx) -> {
-        if (rc != 0) {
-          KeeperException ex = KeeperException.create(KeeperException.Code.get(rc), path);
-          log.error("Exception creating watch for " + path, ex);
-        }
-      }, "confWatcher");
+      zkClient.addWatch(zkDir, watcher, AddWatchMode.PERSISTENT);
       Stat newStat = zkClient.exists(zkDir, null);
       if (stat != null && newStat.getVersion() > stat.getVersion()) {
         //a race condition where a we missed an event fired
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index de321da..475ab1a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -142,7 +142,6 @@ public class ZkStateWriter {
             if (latestColl == null) {
               //log.info("no node exists, using version 0");
               trackVersions.remove(collection.getName());
-              idToCollection.remove(collection.getId());
             } else {
               cs.getCollectionStates().put(latestColl.getName(), new ClusterState.CollectionRef(latestColl));
               //log.info("got version from zk {}", existsStat.getVersion());
@@ -152,13 +151,8 @@ public class ZkStateWriter {
             }
           }
 
-          DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
-          if (currentCollection != null) {
-            idToCollection.put(currentCollection.getId(), currentCollection.getName());
-          } else {
-            idToCollection.put(collection.getId(), collection.getName());
-          }
 
+          DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
           collection.getProperties().remove("pullReplicas");
           collection.getProperties().remove("replicationFactor");
           collection.getProperties().remove("maxShardsPerNode");
@@ -234,18 +228,10 @@ public class ZkStateWriter {
                 }
 
                 long collectionId = Long.parseLong(id.split("-")[0]);
-                String collection = idToCollection.get(collectionId);
+                String collection = reader.getClusterState().getCollection(collectionId);
 
                 if (collection == null) {
-                  String[] cname = new String[1];
-                  this.cs.forEachCollection( col -> {if (col.getId() == collectionId) cname[0]=col.getName();});
-
-                  if (cname[0] != null) {
-                    collection = cname[0];
-                  }
-                  if (collection == null) {
-                    continue;
-                  }
+                  continue;
                 }
 
                 String setState = Replica.State.shortStateToState(stateString).toString();
@@ -381,8 +367,6 @@ public class ZkStateWriter {
         //   ver = docColl.getZNodeVersion();
         if (ver == null) {
           ver = 0;
-        } else {
-
         }
       }
       updates.getProperties().put("_cs_ver_", ver.toString());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 725e1f1..ddd0bd2 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -883,40 +883,40 @@ public class CoreContainer implements Closeable {
     status |= CORE_DISCOVERY_COMPLETE;
     startedLoadingCores = true;
 
-    if (isZooKeeperAware()) {
-
-      log.info("Waiting to see RECOVERY states for node on startup ...");
-      for (final CoreDescriptor cd : cds) {
-        String collection = cd.getCollectionName();
-        try {
-          getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
-            if (c == null) {
-              if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
-              return false;
-            }
-            String nodeName = getZkController().getNodeName();
-            List<Replica> replicas = c.getReplicas();
-            for (Replica replica : replicas) {
-              if (replica.getNodeName().equals(nodeName)) {
-                if (!replica.getState().equals(Replica.State.RECOVERING)) {
-                  if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
-                  return false;
-                }
-              } else {
-                if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
-              }
-            }
-
-            return true;
-          });
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-          return;
-        } catch (TimeoutException e) {
-          log.error("Timeout", e);
-        }
-      }
-    }
+//    if (isZooKeeperAware()) {
+//
+//      log.info("Waiting to see RECOVERY states for node on startup ...");
+//      for (final CoreDescriptor cd : cds) {
+//        String collection = cd.getCollectionName();
+//        try {
+//          getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+//            if (c == null) {
+//              if (log.isDebugEnabled()) log.debug("Found  incorrect state c={}", c);
+//              return false;
+//            }
+//            String nodeName = getZkController().getNodeName();
+//            List<Replica> replicas = c.getReplicas();
+//            for (Replica replica : replicas) {
+//              if (replica.getNodeName().equals(nodeName)) {
+//                if (!replica.getState().equals(Replica.State.RECOVERING)) {
+//                  if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+//                  return false;
+//                }
+//              } else {
+//                if (log.isDebugEnabled()) log.debug("Found  incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+//              }
+//            }
+//
+//            return true;
+//          });
+//        } catch (InterruptedException e) {
+//          ParWork.propagateInterrupt(e);
+//          return;
+//        } catch (TimeoutException e) {
+//          log.error("Timeout", e);
+//        }
+//      }
+//    }
 
     for (final CoreDescriptor cd : cds) {
       if (!cd.isTransient() && cd.isLoadOnStartup()) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 407af18..faab326 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -734,7 +734,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       CoreDescriptor cd = getCoreDescriptor();
       cd.loadExtraProperties(); //Reload the extra properties
       //  coreMetricManager.close();
-      if (coreContainer.isShutDown()) {
+      if (coreContainer.isShutDown() || closing) {
         throw new AlreadyClosedException();
       }
       core = new SolrCore(coreContainer, getName(), coreConfig, cd, getDataDir(), updateHandler, solrDelPolicy, currentCore, true);
@@ -2387,7 +2387,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
       SolrIndexSearcher tmp = null;
       RefCounted<SolrIndexSearcher> newestSearcher = null;
       boolean success = false;
-      if (coreContainer.isShutDown()) {
+      if (coreContainer.isShutDown() || closing) {
         throw new AlreadyClosedException();
       }
       try {
@@ -2410,7 +2410,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
 
         searcherLock.lock();
         try {
-          if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+          if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
             throw new SolrCoreState.CoreIsClosedException();
           }
 
@@ -2472,7 +2472,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
           // (caches take a little while to instantiate)
           final boolean useCaches = !realtime;
           final String newName = realtime ? "realtime" : "main";
-          if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+          if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
             throw new SolrCoreState.CoreIsClosedException();
           }
 
@@ -2586,7 +2586,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
      */
     public RefCounted<SolrIndexSearcher> getSearcher ( boolean forceNew, boolean returnSearcher, @SuppressWarnings({"rawtypes"}) final Future[] waitSearcher,
     boolean updateHandlerReopens){
-      if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+      if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
         throw new SolrCoreState.CoreIsClosedException();
       }
 
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 6a5cd7d..3094740 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -50,13 +50,12 @@ import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrInfoBean;
-import org.apache.solr.metrics.SolrMetricProducer;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -835,10 +834,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
     if (log.isDebugEnabled()) {
       log.debug("closing {}", this);
     }
-    try (ParWork closer = new ParWork(this, true, false)) {
-      closer.collect(commitTracker);
-      closer.collect(softCommitTracker);
-    }
+
+    IOUtils.closeQuietly(commitTracker);
+    IOUtils.closeQuietly(softCommitTracker);
+
     super.close();
     numDocsPending.reset();
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index 8c5ae74..8da3dc9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -69,8 +69,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
 
       node2.stop();
 
-      cluster.waitForActiveCollection(COLLECTION, 1, 1, true);
-
       cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
 
       UpdateRequest req = new UpdateRequest();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index eae1d10..a29e497 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
@@ -339,6 +340,16 @@ public class ClusterState implements JSONWriter.Writable {
     return highest[0];
   }
 
+  public String getCollection(long id) {
+    Set<Entry<String,CollectionRef>> entries = collectionStates.entrySet();
+    for (Entry<String,CollectionRef> entry : entries) {
+      if (entry.getValue().get().getId() == id) {
+        return entry.getKey();
+      }
+    }
+    return null;
+  }
+
   public static class CollectionRef {
     protected final AtomicInteger gets = new AtomicInteger();
     private final DocCollection coll;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index c3d37fd..43aeb9f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -25,7 +25,7 @@ import org.apache.solr.common.util.Utils;
 
 public class Replica extends ZkNodeProps {
 
-  final Long id;
+  Long id;
   final Long collId;
 
   public String getId() {
@@ -163,7 +163,12 @@ public class Replica extends ZkNodeProps {
     this.name = name;
 
     this.nodeName = (String) propMap.get(ZkStateReader.NODE_NAME_PROP);
-    this.id = propMap.containsKey("id") ? Long.parseLong((String) propMap.get("id")) : null;
+
+    String rawId = (String) propMap.get("id");
+    if (rawId != null && !rawId.contains(":")) {
+      this.id = Long.parseLong(rawId);
+    }
+
     this.collId = propMap.containsKey("collId") ? Long.parseLong((String) propMap.get("collId")) : null;
     this.baseUrl = nodeNameToBaseUrl.getBaseUrlForNodeName(this.nodeName);
     type = Type.get((String) propMap.get(ZkStateReader.REPLICA_TYPE));