You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/12 17:28:32 UTC

[lucene-solr] 05/06: @1242 WIP

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 3f42f8f48032e12614273edc141a0b1a00d38fab
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Dec 12 10:37:41 2020 -0600

    @1242 WIP
---
 .../java/org/apache/solr/cloud/LeaderElector.java  |  30 +--
 .../src/java/org/apache/solr/cloud/Overseer.java   |  11 +-
 .../apache/solr/cloud/OverseerElectionContext.java |   2 +-
 .../org/apache/solr/cloud/RecoveryStrategy.java    | 203 +++++++++------------
 .../org/apache/solr/cloud/ReplicateFromLeader.java |  18 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java |   2 +-
 .../java/org/apache/solr/cloud/ZkController.java   |  74 +++-----
 .../src/java/org/apache/solr/core/SolrCore.java    |   4 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |  52 +++---
 .../apache/solr/common/cloud/ZkStateReader.java    |  24 +--
 ...startup-debug.xml => log4j2-election-debug.xml} |  36 +---
 .../src/resources/logconf/log4j2-startup-debug.xml |   4 +-
 12 files changed, 198 insertions(+), 262 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index a9b0d9f..3bc02c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.KeeperException.ConnectionLossException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +114,7 @@ public class LeaderElector implements Closeable {
    *
    * @param replacement has someone else been the leader already?
    */
-  private boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
+  private synchronized boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
           InterruptedException, IOException {
     //if (checkClosed(context)) return false;
 
@@ -209,7 +210,8 @@ public class LeaderElector implements Closeable {
           if (context.leaderSeqPath == null) {
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Election has been cancelled");
           }
-          zkClient.exists(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context));
+          watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
+          zkClient.exists(watchedNode, watcher);
           state = WAITING_IN_ELECTION;
           if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath);
           try (SolrCore core = zkController.getCoreContainer().getCore(context.leaderProps.getName())) {
@@ -547,16 +549,22 @@ public class LeaderElector implements Closeable {
         return;
       }
       try {
-        // am I the next leader?
-        boolean tryagain = checkIfIamLeader(context, true);
-        if (tryagain) {
-          Thread.sleep(50);
-          tryagain = checkIfIamLeader(context, true);
-        }
+        if (event.getType() == EventType.NodeDeleted) {
+          // am I the next leader?
+          boolean tryagain = true;
+          while (tryagain) {
+            tryagain = checkIfIamLeader(context, true);
+          }
+        } else {
+          Stat exists = zkClient.exists(watchedNode, this);
+          if (exists == null) {
+            close();
+            boolean tryagain = true;
 
-        if (tryagain) {
-          Thread.sleep(50);
-          checkIfIamLeader(context, true);
+            while (tryagain) {
+              tryagain = checkIfIamLeader(context, true);
+            }
+          }
         }
       } catch (AlreadyClosedException | InterruptedException e) {
         log.info("Already shutting down");
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ef0a8f2..f56f6c3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -337,7 +337,7 @@ public class Overseer implements SolrCloseable {
     ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
 
 
-    this.zkStateWriter = new ZkStateWriter( zkController.getZkStateReader(), stats);
+    this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), stats);
     //systemCollectionCompatCheck(new StringBiConsumer());
 
     queueWatcher = new WorkQueueWatcher(getCoreContainer());
@@ -538,15 +538,8 @@ public class Overseer implements SolrCloseable {
         overseerOnlyClient = null;
       }
 
-      if (taskExecutor != null && taskExecutor.isShutdown() && !taskExecutor.isTerminated()) {
-        try {
-          taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException e) {
-
-        }
-
+      if (taskExecutor != null) {
         taskExecutor.shutdownNow();
-       // ExecutorUtil.shutdownAndAwaitTermination(taskExecutor);
       }
 
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index a56e2ea..db607f0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -37,7 +37,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
   private final Overseer overseer;
 
   public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, Overseer overseer) {
-    super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica(overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), null, null, overseer.getZkStateReader()), zkClient);
+    super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("overseer:" + overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), null, null, overseer.getZkStateReader()), zkClient);
     this.overseer = overseer;
     this.zkClient = zkClient;
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index f8db3eb..74e1439 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -78,8 +78,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class RecoveryStrategy implements Runnable, Closeable {
 
   private volatile CountDownLatch latch;
-  private final ReplicationHandler replicationHandler;
-  private final Http2SolrClient recoveryOnlyClient;
+  private volatile ReplicationHandler replicationHandler;
+  private volatile Http2SolrClient recoveryOnlyClient;
 
   public static class Builder implements NamedListInitializedPlugin {
     private NamedList args;
@@ -126,8 +126,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
   private final AtomicInteger retries = new AtomicInteger(0);
   private boolean recoveringAfterStartup;
   private volatile Cancellable prevSendPreRecoveryHttpUriRequest;
-  private final Replica.Type replicaType;
-  private final CoreDescriptor coreDescriptor;
+  private volatile Replica.Type replicaType;
+  private volatile CoreDescriptor coreDescriptor;
 
   private final CoreContainer cc;
 
@@ -136,23 +136,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
     this.cc = cc;
     this.coreName = cd.getName();
 
-    try (SolrCore core = cc.getCore(coreName)) {
-      if (core == null) {
-        log.warn("SolrCore is null, won't do recovery");
-        throw new AlreadyClosedException();
-      }
-      recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
-      SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
-      replicationHandler = (ReplicationHandler) handler;
-
-    }
-
     this.recoveryListener = recoveryListener;
     zkController = cc.getZkController();
     zkStateReader = zkController.getZkStateReader();
     baseUrl = zkController.getBaseUrl();
-    replicaType = cd.getCloudDescriptor().getReplicaType();
-    this.coreDescriptor = cd;
   }
 
   final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -321,12 +308,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
     // set request info for logging
     log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
     try {
-      doRecovery();
+      try (SolrCore core = cc.getCore(coreName)) {
+        if (core == null) {
+          log.warn("SolrCore is null, won't do recovery");
+          throw new AlreadyClosedException("SolrCore is null, won't do recovery");
+        }
+
+        coreDescriptor = core.getCoreDescriptor();
+        replicaType = coreDescriptor.getCloudDescriptor().getReplicaType();
+
+        recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
+        SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+        replicationHandler = (ReplicationHandler) handler;
+
+        doRecovery(core);
+        }
     } catch (InterruptedException e) {
       log.info("InterruptedException, won't do recovery", e);
       return;
     } catch (AlreadyClosedException e) {
-      log.info("AlreadyClosedException, won't do recovery");
+      log.info("AlreadyClosedException, won't do recovery", e);
       return;
     } catch (Exception e) {
       ParWork.propagateInterrupt(e);
@@ -335,7 +336,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     }
   }
 
-  final public void doRecovery() throws Exception {
+  final public void doRecovery(SolrCore core) throws Exception {
     // we can lose our core descriptor, so store it now
 //    try {
 //      Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 15000);
@@ -353,14 +354,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
     if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
       log.info("Sync or replica recovery");
-      doSyncOrReplicateRecovery();
+      doSyncOrReplicateRecovery(core);
     } else {
       log.info("Replicate only recovery");
-      doReplicateOnlyRecovery();
+      doReplicateOnlyRecovery(core);
     }
   }
 
-  final private void doReplicateOnlyRecovery() throws Exception {
+  final private void doReplicateOnlyRecovery(SolrCore core) throws Exception {
     boolean successfulRecovery = false;
 
     // if (core.getUpdateHandler().getUpdateLog() != null) {
@@ -370,7 +371,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     // return;
     // }
 
-    log.info("Publishing state of core [{}] as recovering", coreName);
+    log.info("Publishing state of core [{}] as recovering {}", coreName, "doReplicateOnlyRecovery");
 
     zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
 
@@ -462,30 +463,23 @@ public class RecoveryStrategy implements Runnable, Closeable {
   }
 
   // TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
-  public final void doSyncOrReplicateRecovery() throws Exception {
+  public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
     log.info("Do peersync or replication recovery core={} collection={}", coreName, coreDescriptor.getCollectionName());
 
-    log.info("Publishing state of core [{}] as recovering", coreName);
+    log.info("Publishing state of core [{}] as recovering {}", coreName, "doSyncOrReplicateRecovery");
 
     zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
 
     boolean successfulRecovery = false;
     boolean publishedActive = false;
     UpdateLog ulog;
-    try (SolrCore core = cc.getCore(coreName)) {
-      if (core == null) {
-        log.warn("SolrCore is null, won't do recovery");
-        close = true;
-        throw new AlreadyClosedException();
-      }
 
-      ulog = core.getUpdateHandler().getUpdateLog();
-      if (ulog == null) {
-        SolrException.log(log, "No UpdateLog found - cannot recover.");
-        close = true;
-        recoveryFailed(zkController, baseUrl, this.coreDescriptor);
-        return;
-      }
+    ulog = core.getUpdateHandler().getUpdateLog();
+    if (ulog == null) {
+      SolrException.log(log, "No UpdateLog found - cannot recover.");
+      close = true;
+      recoveryFailed(zkController, baseUrl, this.coreDescriptor);
+      return;
     }
 
     // we temporary ignore peersync for tlog replicas
@@ -595,37 +589,31 @@ public class RecoveryStrategy implements Runnable, Closeable {
           if (log.isInfoEnabled()) {
             log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
           }
-          try (SolrCore core = cc.getCore(coreName)) {
-            if (core == null) {
-              log.warn("SolrCore is null, won't do recovery");
-              close = true;
-              successfulRecovery = false;
-            }
 
-            // System.out.println("Attempting to PeerSync from " + leaderUrl
-            // + " i am:" + zkController.getNodeName());
-            boolean syncSuccess;
-            try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
-              syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
-            }
-            if (syncSuccess) {
-              SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
-              log.info("PeerSync was successful, commit to force open a new searcher");
-              // force open a new searcher
-              core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
-              req.close();
-              log.info("PeerSync stage of recovery was successful.");
-
-              // solrcloud_debug
-              // cloudDebugLog(core, "synced");
-
-              log.info("Replaying updates buffered during PeerSync.");
-              replay();
-
-              // sync success
-              successfulRecovery = true;
-            }
+          // System.out.println("Attempting to PeerSync from " + leaderUrl
+          // + " i am:" + zkController.getNodeName());
+          boolean syncSuccess;
+          try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
+            syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
+          }
+          if (syncSuccess) {
+            SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+            log.info("PeerSync was successful, commit to force open a new searcher");
+            // force open a new searcher
+            core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+            req.close();
+            log.info("PeerSync stage of recovery was successful.");
+
+            // solrcloud_debug
+            // cloudDebugLog(core, "synced");
+
+            log.info("Replaying updates buffered during PeerSync.");
+            replay(core);
+
+            // sync success
+            successfulRecovery = true;
           }
+
           if (!successfulRecovery) {
             log.info("PeerSync Recovery was not successful - trying replication.");
           }
@@ -645,7 +633,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
               throw new SolrException(ErrorCode.SERVER_ERROR, "Replication fetch reported as failed");
             }
 
-            replay();
+            replay(core);
 
             log.info("Replication Recovery was successful.");
             successfulRecovery = true;
@@ -673,14 +661,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
             // then we still need to update version bucket seeds after recovery
             if (successfulRecovery && replayFuture == null) {
               log.info("Updating version bucket highest from index after successful recovery.");
-              try (SolrCore core = cc.getCore(coreName)) {
-                if (core == null) {
-                  log.warn("SolrCore is null, won't do recovery");
-                  successfulRecovery = false;
-                } else {
-                  core.seedVersionBuckets();
-                }
-              }
+
+              core.seedVersionBuckets();
             }
 
             zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -780,51 +762,46 @@ public class RecoveryStrategy implements Runnable, Closeable {
 
   public static Runnable testing_beforeReplayBufferingUpdates;
 
-  final private void replay()
+  final private void replay(SolrCore core)
       throws InterruptedException, ExecutionException {
     if (testing_beforeReplayBufferingUpdates != null) {
       testing_beforeReplayBufferingUpdates.run();
     }
-    try (SolrCore core = cc.getCore(coreName)) {
-      if (core == null) {
-        log.warn("SolrCore is null, won't do recovery");
-        close = true;
-        throw new AlreadyClosedException();
+
+    if (replicaType == Replica.Type.TLOG) {
+      // roll over all updates during buffering to new tlog, make RTG available
+      try (SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams())) {
+        core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
       }
-      if (replicaType == Replica.Type.TLOG) {
-        // roll over all updates during buffering to new tlog, make RTG available
-        try (SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams())) {
-          core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
-        }
+    }
+    Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
+    if (future == null) {
+      // no replay needed\
+      log.info("No replay needed.");
+      return;
+    } else {
+      log.info("Replaying buffered documents.");
+      // wait for replay
+      RecoveryInfo report;
+      try {
+        report = future.get(10, TimeUnit.MINUTES); // nocommit - how long? make configurable too
+      } catch (InterruptedException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+      } catch (TimeoutException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, e);
       }
-      Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
-      if (future == null) {
-        // no replay needed\
-        log.info("No replay needed.");
-        return;
-      } else {
-        log.info("Replaying buffered documents.");
-        // wait for replay
-        RecoveryInfo report;
-        try {
-          report = future.get(10, TimeUnit.MINUTES); // nocommit - how long? make configurable too
-        } catch (InterruptedException e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
-        } catch (TimeoutException e) {
-          throw new SolrException(ErrorCode.SERVER_ERROR, e);
-        }
-        if (report.failed) {
-          SolrException.log(log, "Replay failed");
-          throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
-        }
+      if (report.failed) {
+        SolrException.log(log, "Replay failed");
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
       }
+    }
 
-      // the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      if (ulog != null) {
-        ulog.openRealtimeSearcher();
-      }
+    // the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
+    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+    if (ulog != null) {
+      ulog.openRealtimeSearcher();
     }
+
     // solrcloud_debug
     // cloudDebugLog(core, "replayed");
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index adb0e23..eb6c062 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -24,6 +24,7 @@ import java.lang.invoke.MethodHandles;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -96,7 +97,7 @@ public class ReplicateFromLeader implements Closeable {
       replicationProcess = new ReplicationHandler();
       if (switchTransactionLog) {
         replicationProcess.setPollListener((solrCore, fetchResult) -> {
-          if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
+          if (fetchResult.getSuccessful()) {
             String commitVersion = getCommitVersion(core);
             if (commitVersion == null) return;
             if (Long.parseLong(commitVersion) == lastVersion) return;
@@ -107,6 +108,21 @@ public class ReplicateFromLeader implements Closeable {
             cuc.setVersion(Long.parseLong(commitVersion));
             updateLog.commitAndSwitchToNewTlog(cuc);
             lastVersion = Long.parseLong(commitVersion);
+            try {
+              cc.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+            } catch (Exception e) {
+              log.warn("Failed publishing as ACTIVE", e);
+            }
+          }
+        });
+      } else {
+        replicationProcess.setPollListener((solrCore, fetchResult) -> {
+          if (fetchResult.getSuccessful()) {
+            try {
+              cc.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+            } catch (Exception e) {
+              log.warn("Failed publishing as ACTIVE", e);
+            }
           }
         });
       }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index cbb5e3d..5947da4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -124,7 +124,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
           } catch (NoNodeException e) {
             // fine
           }
-          if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {}", leaderSeqPath);
+          if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
         }
         leaderSeqPath = null;
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 375bea0..e2c094d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -542,17 +542,11 @@ public class ZkController implements Closeable, Runnable {
     });
     zkClient.setDisconnectListener(() -> {
       try (ParWork worker = new ParWork("disconnected", true, true)) {
-        worker.collect(ZkController.this.overseerElector);
         worker.collect(ZkController.this.overseer);
-
+        worker.collect(leaderElectors.values());
         worker.collect("clearZkCollectionTerms", () -> {
           clearZkCollectionTerms();
         });
-        if (zkClient.isAlive()) {
-          synchronized (leaderElectors) {
-            worker.collect(leaderElectors.values());
-          }
-        }
       }
 
     });
@@ -652,9 +646,7 @@ public class ZkController implements Closeable, Runnable {
         }
       }
 
-      synchronized (leaderElectors) {
-        closer.collect(leaderElectors);
-      }
+      closer.collect(leaderElectors);
 
       closer.collect(overseerElector);
 
@@ -678,9 +670,7 @@ public class ZkController implements Closeable, Runnable {
       });
 
     } finally {
-      synchronized (leaderElectors) {
-        leaderElectors.clear();
-      }
+      leaderElectors.clear();
     }
   }
 
@@ -695,9 +685,7 @@ public class ZkController implements Closeable, Runnable {
 
     this.isClosed = true;
     try (ParWork closer = new ParWork(this, true, true)) {
-      synchronized (leaderElectors) {
-        closer.collect(leaderElectors);
-      }
+      closer.collect(leaderElectors);
       collectionToTerms.forEach((s, zkCollectionTerms) -> closer.collect(zkCollectionTerms));
     }
 
@@ -1357,7 +1345,6 @@ public class ZkController implements Closeable, Runnable {
       throw new AlreadyClosedException();
     }
 
-    boolean success = false;
     try {
       final String baseUrl = getBaseUrl();
       final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
@@ -1406,7 +1393,7 @@ public class ZkController implements Closeable, Runnable {
       }
 
       log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
-      synchronized (leaderElectors) {
+
         LeaderElector leaderElector = leaderElectors.get(replica.getName());
         if (leaderElector == null) {
           ContextKey contextKey = new ContextKey(collection, coreName);
@@ -1418,7 +1405,7 @@ public class ZkController implements Closeable, Runnable {
           LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
           IOUtils.closeQuietly(oldElector);
         }
-      }
+
 
       //
       try {
@@ -1427,12 +1414,6 @@ public class ZkController implements Closeable, Runnable {
         if (replica.getType() != Type.PULL) {
           // nocommit review
           joinElection(desc, joinAtHead);
-        } else if (replica.getType() == Type.PULL) {
-          if (joinAtHead) {
-            log.warn("Replica {} was designated as preferred leader but it's type is {}, It won't join election", coreName, Type.PULL);
-          }
-          log.debug("Replica {} skipping election because it's type is {}", coreName, Type.PULL);
-          startReplicationFromLeader(coreName, false);
         }
       } catch (InterruptedException e) {
         ParWork.propagateInterrupt(e);
@@ -1502,18 +1483,20 @@ public class ZkController implements Closeable, Runnable {
         }
       }
 
-      boolean didRecovery = checkRecovery(isLeader, collection, coreName, shardId, core, cc);
+    //  boolean didRecovery = checkRecovery(isLeader, collection, coreName, shardId, core, cc);
 
-      if (!didRecovery) {
-        if (isTlogReplicaAndNotLeader) {
-          startReplicationFromLeader(coreName, true);
-        }
+      if (isTlogReplicaAndNotLeader) {
+        startReplicationFromLeader(coreName, true);
+      }
 
-        if (!isLeader) {
-          publish(desc, Replica.State.ACTIVE, true);
-        }
+      if (replica.getType() == Type.PULL) {
+        startReplicationFromLeader(coreName, false);
       }
 
+      //        if (!isLeader) {
+      //          publish(desc, Replica.State.ACTIVE, true);
+      //        }
+
       if (replica.getType() != Type.PULL) {
         // the watcher is added to a set so multiple calls of this method will left only one watcher
         if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
@@ -1527,7 +1510,6 @@ public class ZkController implements Closeable, Runnable {
       registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
 
       log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
-      success = true;
       return shardId;
     } finally {
       MDCLoggingContext.clear();
@@ -1667,16 +1649,14 @@ public class ZkController implements Closeable, Runnable {
     Replica replica = new Replica(cd.getName(), props, collection, shardId, zkStateReader);
     LeaderElector leaderElector;
 
-    synchronized (leaderElectors) {
-       leaderElector = leaderElectors.get(replica.getName());
-      if (leaderElector == null) {
-        ContextKey contextKey = new ContextKey(collection, replica.getName());
-        leaderElector = new LeaderElector(this, contextKey);
-        LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
-        IOUtils.closeQuietly(oldElector);
-      } else {
-        leaderElector.cancel();
-      }
+    leaderElector = leaderElectors.get(replica.getName());
+    if (leaderElector == null) {
+      ContextKey contextKey = new ContextKey(collection, replica.getName());
+      leaderElector = new LeaderElector(this, contextKey);
+      LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
+      IOUtils.closeQuietly(oldElector);
+    } else {
+      leaderElector.cancel();
     }
 
     ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
@@ -1798,10 +1778,7 @@ public class ZkController implements Closeable, Runnable {
       if (state == Replica.State.RECOVERING && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
         // state is used by client, state of replica can change from RECOVERING to DOWN without needed to finish recovery
         // by calling this we will know that a replica actually finished recovery or not
-        ZkShardTerms shardTerms = getShardTermsOrNull(collection, shardId);
-        if (shardTerms == null) {
-          throw new AlreadyClosedException();
-        }
+        ZkShardTerms shardTerms = getShardTerms(collection, shardId);
         shardTerms.startRecovering(cd.getName());
       }
       if (state == Replica.State.ACTIVE && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
@@ -1886,7 +1863,6 @@ public class ZkController implements Closeable, Runnable {
         ZkCollectionTerms ct = collectionToTerms.get(collection);
         if (ct != null) {
           ct.remove(cd.getCloudDescriptor().getShardId(), cd);
-          if (ct.cleanUp()) IOUtils.closeQuietly(collectionToTerms.remove(collection));
         }
 
       } finally {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 56a38db..0911be1 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1928,7 +1928,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
       }
     } finally {
-      log.info("close done refcount {} {}", refCount == null ? null : refCount.get(), name);
+      if (log.isDebugEnabled()) log.debug("close done refcount {} {}", refCount == null ? null : refCount.get(), name);
       refCount.set(-1);
       if (reloadLock != null && reloadLock.isHeldByCurrentThread()) reloadLock.unlock();
       assert ObjectReleaseTracker.release(this);
@@ -1936,8 +1936,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
 
       //areAllSearcherReferencesEmpty();
 
-
-
       synchronized (closeAndWait) {
         closeAndWait.notifyAll();
       }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index c9cbd02..590cbf1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -435,8 +435,6 @@ public class Http2SolrClient extends SolrClient {
 
   public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener<NamedList<Object>> asyncListener) {
     Integer idleTimeout = solrRequest.getParams().getInt("idleTimeout");
-
-
     Request req;
     try {
       req = makeRequest(solrRequest, collection);
@@ -1451,43 +1449,39 @@ public class Http2SolrClient extends SolrClient {
         try {
           asyncListener.onSuccess(stream);
         } catch (Exception e) {
+          log.error("Exception in async stream listener",e);
+        }
+      });
+    }
+
+    public void onComplete(Result result) {
+      try {
+        super.onComplete(result);
+      } finally {
+        try {
           if (stream != null) {
             try {
               while (stream.read() != -1) {
               }
-            } catch (IOException e1) {
+            } catch (IOException e) {
               // quietly
             }
           }
-          if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
-            asyncListener.onFailure(e);
-          }
-        }
-      });
-    }
-
-    public void onComplete(Result result) {
-
-      super.onComplete(result);
-
-      if (stream != null) {
-        try {
-          while (stream.read() != -1) {
+        } finally {
+          if (result.isFailed()) {
+            Throwable failure = result.getFailure();
+
+            if (failure != CANCELLED_EXCEPTION) { // avoid retrying on load balanced search requests - keep in mind this
+              // means cancelled requests won't notify the caller of fail or complete
+              try {
+                asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+              } catch (Exception e) {
+                log.error("Exception in async failure listener",e);
+              }
+            }
           }
-        } catch (IOException e) {
-          // quietly
         }
       }
-
-      if (result.isFailed()) {
-        Throwable failure = result.getFailure();
-
-        if (failure != CANCELLED_EXCEPTION) { // avoid retrying on load balanced search requests - keep in mind this
-          // means cancelled requests won't notify the caller of fail or complete
-          asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
-        }
-      }
-
     }
   }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7732d36..7b3ec54 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -895,6 +895,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     if (log.isDebugEnabled()) log.debug("Closing ZkStateReader");
     if (closeTracker != null) closeTracker.close();
     this.closed = true;
+
+    synchronized (this) {
+      if (collectionPropsCacheCleaner != null) {
+        collectionPropsCacheCleaner.cancel(true);
+      }
+    }
+
     if (notifications != null) {
       notifications.shutdown();
     }
@@ -902,29 +909,24 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     stateWatchersMap.forEach((s, stateWatcher) -> IOUtils.closeQuietly(stateWatcher));
     stateWatchersMap.clear();
 
-    waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
-    waitLatches.clear();
-
     try {
       if (closeClient) {
         IOUtils.closeQuietly(zkClient);
       }
       try {
         if (collectionPropsCacheCleaner != null) {
-          collectionPropsCacheCleaner.cancel(true);
+          collectionPropsCacheCleaner.cancel(false);
         }
       } catch (NullPointerException e) {
         // okay
       }
       if (notifications != null) {
-        try {
-          boolean success = notifications.awaitTermination(1, TimeUnit.SECONDS);
-          if (!success) notifications.shutdownNow();
-        } catch (InterruptedException e) {
-          ParWork.propagateInterrupt(e);
-        }
+        notifications.shutdownNow();
       }
 
+      waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
+      waitLatches.clear();
+
     } finally {
       assert ObjectReleaseTracker.release(this);
     }
@@ -2028,7 +2030,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
     try {
 
       // wait for the watcher predicate to return true, or time out
-      if (!latch.await(wait, unit)) {
+      if (!latch.await(wait, unit) || isClosed()) {
         throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + "live=" + liveNodes
                 + docCollection.get());
       }
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
similarity index 58%
copy from solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
copy to solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
index bcc8267..c1bea34 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
@@ -20,11 +20,11 @@
     <Appenders>
 
         <Console name="STDERR_COLOR" target="SYSTEM_ERR">
-            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
         </Console>
 
         <File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
-            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
         </File>
 
         <File name="FILE2" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
@@ -39,43 +39,15 @@
 
     </Appenders>
     <Loggers>
-
-
-        <AsyncLogger name="org.apache.solr.servlet.HttpSolrCall" level="DEBUG"/>
         <AsyncLogger name="org.apache.zookeeper" level="WARN"/>
         <AsyncLogger name="org.apache.hadoop" level="WARN"/>
         <AsyncLogger name="org.apache.directory" level="WARN"/>
         <AsyncLogger name="org.apache.solr.hadoop" level="INFO"/>
         <AsyncLogger name="org.eclipse.jetty" level="INFO"/>
-        <AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.handler.admin.CollectionsHandler" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.handler.IndexFetcher" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.cloud.api.collections.CreateCollectionCmd" level="DEBUG"/>
-        <!--  <AsyncLogger name="org.apache.solr.common.patterns.DW" level="DEBUG"/> -->
-        <AsyncLogger name="org.apache.solr.cloud.overseer.ZkStateWriter" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.cloud.Overseer" level="DEBUG"/>
-        <!--  <AsyncLogger name="org.apache.solr.cloud.OverseerTaskProcessor" level="DEBUG"/>
-           <AsyncLogger name="org.apache.solr.cloud.ZkDistributedQueue" level="DEBUG"/>
-         <AsyncLogger name="org.apache.solr.cloud.OverseerTaskQueue" level="DEBUG"/>
-         <AsyncLogger name="org.apache.solr.cloud.OverseerTaskExecutorTask" level="DEBUG"/>-->
+
         <AsyncLogger name="org.apache.solr.cloud.LeaderElector" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.cloud.ShardLeaderElectionContextBase" level="DEBUG"/>
-
-        <!-- <AsyncLogger name="org.apache.solr.common.cloud.SolrZkClient" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.cloud.overseer.SliceMutator" level="DEBUG"/>-->
-        <AsyncLogger name="org.apache.solr.client.solrj.impl.LBSolrClient" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.cloud.ZkController" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.common.cloud.ZkStateReader" level="DEBUG"/>
-
-        <AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.common.cloud.ZkMaintenanceUtils" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="DEBUG"/>
-        <AsyncLogger name="org.apache.solr.update.processor.DistributedUpdateProcessor" level="DEBUG"/>
-
-        <AsyncLogger name="org.apache.solr.client.solrj.impl.Http2SolrClient" level="TRACE"/>
-
-        <AsyncLogger name="com.google.inject.servlet" level="DEBUG"/>
+        <AsyncLogger name="org.apache.solr.cloud.OverseerElectionContext" level="DEBUG"/>
 
         <AsyncRoot level="INFO">
             <AppenderRef ref="STDERR_COLOR"/>
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
index bcc8267..22e1955 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
@@ -20,11 +20,11 @@
     <Appenders>
 
         <Console name="STDERR_COLOR" target="SYSTEM_ERR">
-            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
         </Console>
 
         <File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
-            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+            <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
         </File>
 
         <File name="FILE2" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">