You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/16 17:09:32 UTC

[lucene-solr] 02/02: @1243 Minor fixes.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 794972d721e03dc87b6caad7c1432df99e2287fe
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Dec 16 11:05:23 2020 -0600

    @1243 Minor fixes.
---
 .../solr/cloud/ShardLeaderElectionContext.java     |  10 +-
 .../org/apache/solr/cloud/ZkCollectionTerms.java   |   6 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 139 ++++++++++-----------
 .../java/org/apache/solr/core/CoreContainer.java   |  42 +++----
 .../solr/handler/admin/CollectionsHandler.java     |   2 +-
 .../solr/handler/admin/CoreAdminHandler.java       |   2 +-
 .../org/apache/solr/cloud/ReplaceNodeTest.java     |   2 +
 .../CreateCollectionsIndexAndRestartTest.java      |   2 +-
 .../org/apache/solr/core/TestCodecSupport.java     |   3 +-
 .../apache/solr/common/cloud/ZkStateReader.java    |  47 ++++---
 10 files changed, 133 insertions(+), 122 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 60acb70..e2af971 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -21,7 +21,6 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.AlreadyClosedException;
 import org.apache.solr.common.ParWork;
@@ -37,10 +36,8 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.search.SolrIndexSearcher;
 import org.apache.solr.update.PeerSync;
 import org.apache.solr.update.UpdateLog;
-import org.apache.solr.util.RefCounted;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.slf4j.Logger;
@@ -102,6 +99,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
     String coreName = leaderProps.getName();
 
     log.info("Run leader process for shard [{}] election, first step is to try and sync with the shard core={}", context.leaderProps.getSlice(), coreName);
+    cc.waitForLoadingCore(coreName, 15000);
     try (SolrCore core = cc.getCore(coreName)) {
       if (core == null) {
         log.error("No SolrCore found, cannot become leader {}", coreName);
@@ -186,9 +184,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
               success = true;
             }
           }
+        } else {
+          log.info("Our sync attempt succeeded");
         }
-        log.info("Our sync attempt succeeded");
-
         // solrcloud_debug
 //        if (log.isDebugEnabled()) {
 //          try {
@@ -228,6 +226,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         // in case of leaderVoteWait timeout, a replica with lower term can win the election
         if (setTermToMax) {
           log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) " + "without being up-to-date with the previous leader", coreName);
+          zkController.createCollectionTerms(collection);
           zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreName);
         }
 
@@ -245,6 +244,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         ParWork.propagateInterrupt("Already closed or interrupted, bailing..", e);
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
       } catch (SessionExpiredException e) {
+        SolrException.log(log, "SessionExpired", e);
         throw e;
       } catch (Exception e) {
         SolrException.log(log, "There was a problem trying to register as the leader", e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index 42311fc..b5b4ef4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -18,6 +18,7 @@
 package org.apache.solr.cloud;
 
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.KeeperException;
@@ -47,8 +48,11 @@ class ZkCollectionTerms implements AutoCloseable {
   ZkShardTerms getShard(String shardId) throws Exception {
     collectionToTermsLock.lock();
     try {
+      ZkShardTerms zkterms = null;
       if (!terms.containsKey(shardId)) {
-        terms.put(shardId, new ZkShardTerms(collection, shardId, zkClient));
+        zkterms = new ZkShardTerms(collection, shardId, zkClient);
+        IOUtils.closeQuietly(terms.put(shardId, zkterms));
+        return zkterms;
       }
       return terms.get(shardId);
     } finally {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 242bd51..c1009a6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -311,28 +311,29 @@ public class ZkController implements Closeable, Runnable {
     }
   }
 
-  private class RegisterCoreAsync implements Callable<Object> {
+  public static class RegisterCoreAsync implements Callable<Object> {
 
-    CoreDescriptor descriptor;
-    boolean afterExpiration;
+    private final ZkController zkController;
+    final CoreDescriptor descriptor;
+    final boolean afterExpiration;
 
-    RegisterCoreAsync(CoreDescriptor descriptor, boolean afterExpiration) {
+    public RegisterCoreAsync(ZkController zkController, CoreDescriptor descriptor, boolean afterExpiration) {
       this.descriptor = descriptor;
       this.afterExpiration = afterExpiration;
+      this.zkController = zkController;
     }
 
     public Object call() throws Exception {
       if (log.isInfoEnabled()) {
         log.info("Registering core {} afterExpiration? {}", descriptor.getName(), afterExpiration);
       }
-      if (cc.getLoadedCoreNames().contains(descriptor.getName())) {
-        register(descriptor.getName(), descriptor);
-        return descriptor;
-      }
-      return null;
+
+      zkController.register(descriptor.getName(), descriptor, afterExpiration);
+      return descriptor;
     }
   }
 
+
   // notifies registered listeners after the ZK reconnect in the background
   private static class OnReconnectNotifyAsync implements Callable<Object> {
 
@@ -490,7 +491,7 @@ public class ZkController implements Closeable, Runnable {
                     // unload solrcores that have been 'failed over'
                     // throwErrorIfReplicaReplaced(descriptor);
 
-                    ParWork.getRootSharedExecutor().submit(new RegisterCoreAsync(descriptor, true));
+                    ParWork.getRootSharedExecutor().submit(new RegisterCoreAsync(ZkController.this, descriptor, true));
 
                   } catch (Exception e) {
                     SolrException.log(log, "Error registering SolrCore", e);
@@ -1314,12 +1315,7 @@ public class ZkController implements Closeable, Runnable {
   }
 
   public String register(String coreName, final CoreDescriptor desc) throws Exception {
-    try (SolrCore core = cc.getCore(coreName)) {
-      if (core == null || core.isClosing() || getCoreContainer().isShutDown()) {
-        throw new AlreadyClosedException();
-      }
-     return register(core, desc, false);
-    }
+     return register(coreName, desc, false);
   }
 
   /**
@@ -1327,27 +1323,21 @@ public class ZkController implements Closeable, Runnable {
    *
    * @return the shardId for the SolrCore
    */
-  private String register(SolrCore core, final CoreDescriptor desc, boolean afterExpiration) throws Exception {
-    if (getCoreContainer().isShutDown()) {
+  private String register(String coreName, final CoreDescriptor desc, boolean afterExpiration) throws Exception {
+    if (getCoreContainer().isShutDown() || isDcCalled()) {
       throw new AlreadyClosedException();
     }
     MDCLoggingContext.setCoreDescriptor(cc, desc);
-    String coreName = core.getName();
-
-    if (core.isClosing() || cc.isShutDown()) {
-      throw new AlreadyClosedException();
-    }
-
+    ZkShardTerms shardTerms;
     try {
       final String baseUrl = getBaseUrl();
       final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
       final String collection = cloudDesc.getCollectionName();
       final String shardId = cloudDesc.getShardId();
 
-
       log.info("Register terms for replica {}", coreName);
       createCollectionTerms(collection);
-      ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+      shardTerms = getShardTerms(collection, cloudDesc.getShardId());
 
       // the watcher is added to a set so multiple calls of this method will left only one watcher
       getZkStateReader().registerCore(cloudDesc.getCollectionName());
@@ -1444,53 +1434,61 @@ public class ZkController implements Closeable, Runnable {
       // TODO: should this be moved to another thread? To recoveryStrat?
       // TODO: should this actually be done earlier, before (or as part of)
       // leader election perhaps?
+      cc.waitForLoadingCore(coreName, 15000);
+      try (SolrCore core = cc.getCore(coreName)) {
+        if (core == null || core.isClosing() || getCoreContainer().isShutDown()) {
+          throw new AlreadyClosedException();
+        }
 
-      UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-      boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
-      if (isTlogReplicaAndNotLeader) {
-        String commitVersion = ReplicateFromLeader.getCommitVersion(core);
-        if (commitVersion != null) {
-          ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
+        UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+        boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
+        if (isTlogReplicaAndNotLeader) {
+          String commitVersion = ReplicateFromLeader.getCommitVersion(core);
+          if (commitVersion != null) {
+            ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
+          }
         }
-      }
-      // we will call register again after zk expiration and on reload
-      if (!afterExpiration &&  ulog != null && !isTlogReplicaAndNotLeader) {
-        // disable recovery in case shard is in construction state (for shard splits)
-        Slice slice = getClusterState().getCollection(collection).getSlice(shardId);
-        if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
-          Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
-          if (recoveryFuture != null) {
-            log.info("Replaying tlog for {} during startup... NOTE: This can take a while.", core);
-            recoveryFuture.get(); // NOTE: this could potentially block for
-            // minutes or more!
-            // TODO: public as recovering in the mean time?
-            // TODO: in the future we could do peersync in parallel with recoverFromLog
-          } else {
-            if (log.isDebugEnabled()) {
-              log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl);
+        // we will call register again after zk expiration and on reload
+        if (!afterExpiration && ulog != null && !isTlogReplicaAndNotLeader) {
+          // disable recovery in case shard is in construction state (for shard splits)
+          Slice slice = getClusterState().getCollection(collection).getSlice(shardId);
+          if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
+            Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
+            if (recoveryFuture != null) {
+              log.info("Replaying tlog for {} during startup... NOTE: This can take a while.", core);
+              recoveryFuture.get(); // NOTE: this could potentially block for
+              // minutes or more!
+              // TODO: public as recovering in the mean time?
+              // TODO: in the future we could do peersync in parallel with recoverFromLog
+            } else {
+              if (log.isDebugEnabled()) {
+                log.debug("No LogReplay needed for core={} baseURL={}", core.getName(), baseUrl);
+              }
             }
           }
         }
-      }
 
-    //  boolean didRecovery = checkRecovery(isLeader, collection, coreName, shardId, core, cc);
+        if (replica.getType() != Type.PULL) {
+          checkRecovery(isLeader, collection, coreName, shardId, core, cc);
+        }
 
-      if (isTlogReplicaAndNotLeader) {
-        startReplicationFromLeader(coreName, true);
-      }
+        if (isTlogReplicaAndNotLeader) {
+          startReplicationFromLeader(coreName, true);
+        }
 
-      if (replica.getType() == Type.PULL) {
-        startReplicationFromLeader(coreName, false);
-      }
+        if (replica.getType() == Type.PULL) {
+          startReplicationFromLeader(coreName, false);
+        }
 
-      //        if (!isLeader) {
-      //          publish(desc, Replica.State.ACTIVE, true);
-      //        }
+        //        if (!isLeader) {
+        //          publish(desc, Replica.State.ACTIVE, true);
+        //        }
 
-      if (replica.getType() != Type.PULL) {
-        // the watcher is added to a set so multiple calls of this method will left only one watcher
-        if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
-        shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
+        if (replica.getType() != Type.PULL && shardTerms != null) {
+          // the watcher is added to a set so multiple calls of this method will left only one watcher
+          if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
+          shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
+        }
       }
 
       desc.getCloudDescriptor().setHasRegistered(true);
@@ -1668,28 +1666,17 @@ public class ZkController implements Closeable, Runnable {
   private boolean checkRecovery(final boolean isLeader,
                                 final String collection, String coreZkNodeName, String shardId,
                                 SolrCore core, CoreContainer cc) throws Exception {
-    boolean doRecovery = true;
+
     if (!isLeader) {
 
-      if (doRecovery && !core.getUpdateHandler().getSolrCoreState().isRecoverying()) {
+      if (!core.getUpdateHandler().getSolrCoreState().isRecoverying()) {
         if (log.isInfoEnabled()) {
           log.info("Core needs to recover:{}", core.getName());
         }
         core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
         return true;
       }
-      log.info("get shard terms {} {} {}", core.getName(), collection, shardId);
-      ZkShardTerms zkShardTerms = getShardTerms(collection, shardId);
-      if (zkShardTerms.registered(coreZkNodeName) && !zkShardTerms.canBecomeLeader(coreZkNodeName)) {
-        if (log.isInfoEnabled()) {
-          log.info("Leader's term larger than core {}; starting recovery process", core.getName());
-        }
-        core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
-        return true;
-      } else {
-        log.info("Leaders term did not force us into recovery");
 
-      }
     } else {
       log.info("I am the leader, no recovery necessary");
     }
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 8fa3a68..093cb2d 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -149,7 +149,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
@@ -901,7 +900,9 @@ public class CoreContainer implements Closeable {
           if (isZooKeeperAware() && !CloudUtil.checkIfValidCloudCore(this, cd)) {
             continue;
           }
-
+          if (isZooKeeperAware()) {
+            ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, cd, false));
+          }
           coreLoadFutures.add(solrCoreLoadExecutor.submit(() -> {
             SolrCore core;
             try {
@@ -1219,9 +1220,9 @@ public class CoreContainer implements Closeable {
     return coresLocator;
   }
 
-  protected SolrCore registerCore(CoreDescriptor cd, SolrCore core, boolean registerInZk, boolean closeOld) {
+  protected SolrCore registerCore(CoreDescriptor cd, SolrCore core, boolean closeOld) {
 
-    log.info("registerCore name={}, registerInZk={}, skipRecovery={}", cd.getName(), registerInZk);
+    log.info("registerCore name={}, skipRecovery={}", cd.getName());
 
     if (core == null) {
       throw new SolrException(ErrorCode.SERVER_ERROR, "Can not register a null core.");
@@ -1247,9 +1248,6 @@ public class CoreContainer implements Closeable {
 
     if (old == null || old == core) {
       log.info("registering core: " + cd.getName());
-      if (registerInZk) {
-        zkSys.registerInZk(core);
-      }
       return null;
     } else {
       log.info("replacing core: " + cd.getName());
@@ -1258,9 +1256,6 @@ public class CoreContainer implements Closeable {
           old.close();
         }
       }
-      if (registerInZk) {
-        zkSys.registerInZk(core);
-      }
       return old;
     }
   }
@@ -1411,6 +1406,10 @@ public class CoreContainer implements Closeable {
           if (isShutDown) {
             throw new AlreadyClosedException("Solr has been shutdown.");
           }
+          solrCores.markCoreAsLoading(dcore);
+          if (isZooKeeperAware()) {
+            ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
+          }
           core = new SolrCore(this, dcore, coreConfig);
         } catch (Exception e) {
           core = processCoreCreateException(e, dcore, coreConfig);
@@ -1418,11 +1417,13 @@ public class CoreContainer implements Closeable {
 
         core.start();
 
-        old = registerCore(dcore, core, isZooKeeperAware(), true);
+        old = registerCore(dcore, core, true);
         registered = true;
       } catch (Exception e){
 
         throw new SolrException(ErrorCode.SERVER_ERROR, e);
+      } finally {
+        solrCores.markCoreAsNotLoading(dcore);
       }
 
 
@@ -1471,15 +1472,14 @@ public class CoreContainer implements Closeable {
             });
           }
         }
+        if (isShutDown) {
+          SolrCore finalCore1 = core;
+          ParWork.getRootSharedExecutor().submit(() -> {
 
-        //        SolrCore finalCore1 = core;
-        //        ParWork.getRootSharedExecutor().submit(() -> {
-        //          try {
-        //            finalCore1.closeAndWait(false);
-        //          } catch (TimeoutException timeoutException) {
-        //            throw new SolrException(ErrorCode.SERVER_ERROR, timeoutException);
-        //          }
-        //        });
+            finalCore1.closeAndWait();
+
+          });
+        }
       }
       MDCLoggingContext.clear();
     }
@@ -1757,7 +1757,7 @@ public class CoreContainer implements Closeable {
               }
             }
 
-            oldCore = registerCore(cd, newCore, false, true);
+            oldCore = registerCore(cd, newCore, true);
 
             success = true;
           } catch (Exception e) {
@@ -1984,7 +1984,7 @@ public class CoreContainer implements Closeable {
         CoreDescriptor cd = core.getCoreDescriptor();
         cd.setProperty("name", toName);
         core.setName(toName);
-        registerCore(cd, core, isZooKeeperAware(), false);
+        registerCore(cd, core, false);
         SolrCore old = solrCores.remove(name);
 
         coresLocator.rename(this, old.getCoreDescriptor(), core.getCoreDescriptor());
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index bd26627..17f9d2c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -1390,7 +1390,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
       // Wait till we have an active leader
       try {
-        zkController.getZkStateReader().getLeaderRetry(collectionName, sliceId, 30);
+        zkController.getZkStateReader().getLeaderRetry(collectionName, sliceId, 30000);
       } catch (Exception e) {
         ParWork.propagateInterrupt(e);
         log.info("Couldn't successfully force leader, collection: {}, shard: {}. Cluster state: {}", collectionName, sliceId, clusterState);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index 94fe6ae..24c576a 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -175,7 +175,7 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa
         try {
           MDC.put("CoreAdminHandler.asyncId", taskId);
           MDC.put("CoreAdminHandler.action", op.action.toString());
-          ParWork.getMyPerThreadExecutor().submit(() -> { // ### SUPER DUPER EXPERT USAGE
+          ParWork.getRootSharedExecutor().submit(() -> { // ### SUPER DUPER EXPERT USAGE
             boolean exceptionCaught = false;
             try {
               if (!cores.isShutDown()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
index fb1fda0..62e6e8e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ReplaceNodeTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.solr.cloud;
 
+import org.apache.lucene.util.LuceneTestCase;
 import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
 import org.apache.solr.client.solrj.impl.Http2SolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -40,6 +41,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+@LuceneTestCase.AwaitsFix(bugUrl = "nocommit - can leak a Solr a reasonable percent - cmd needs polish/finish anyway")
 public class ReplaceNodeTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
index 562c979..62bd7b5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CreateCollectionsIndexAndRestartTest.java
@@ -36,7 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 @Slow
-//@LuceneTestCase.AwaitsFix(bugUrl = "This an experimental test class")
+@LuceneTestCase.AwaitsFix(bugUrl = "This an experimental test class")
 public class CreateCollectionsIndexAndRestartTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
diff --git a/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java b/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
index d3a3ec6..bccc506 100644
--- a/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
+++ b/solr/core/src/test/org/apache/solr/core/TestCodecSupport.java
@@ -36,7 +36,6 @@ import org.apache.solr.schema.SchemaField;
 import org.apache.solr.util.TestHarness;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Ignore;
 
 import javax.xml.xpath.XPathExpressionException;
@@ -235,7 +234,7 @@ public class TestCodecSupport extends SolrTestCaseJ4 {
       CoreDescriptor cd = new CoreDescriptor(newCoreName, testSolrHome.resolve(newCoreName), coreContainer);
       c = new SolrCore(coreContainer, cd,
           new ConfigSet("fakeConfigset", config, schema, null, true));
-      assertNull(coreContainer.registerCore(cd, c, false, false));
+      assertNull(coreContainer.registerCore(cd, c, false));
       h.coreName = newCoreName;
       assertEquals("We are not using the correct core", "solrconfig_codec2.xml", h.getCore().getConfigResource());
       assertU(add(doc("string_f", "foo")));
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 0ce651b..1ef3cc5 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -967,7 +967,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 //  }
 
   public boolean isNodeLive(String node) {
-    return liveNodes.contains(node);
+    return getLiveNodes().contains(node);
 
   }
 
@@ -989,9 +989,15 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       Slice slice = coll.getSlice(shard);
       if (slice != null) {
         Replica leader = slice.getLeader();
-        if (leader != null && leader.getState() == Replica.State.ACTIVE) {
+        if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
           return leader;
         }
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
+            return replica;
+          }
+        }
       }
     }
 
@@ -1003,10 +1009,17 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
         Slice slice = c.getSlice(shard);
         if (slice == null) return false;
         Replica leader = slice.getLeader();
-        if (leader != null && leader.getState() == Replica.State.ACTIVE) {
+        if (leader != null && leader.getState() == Replica.State.ACTIVE && isNodeLive(leader.getNodeName())) {
           returnLeader.set(leader);
           return true;
         }
+        Collection<Replica> replicas = slice.getReplicas();
+        for (Replica replica : replicas) {
+          if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && isNodeLive(replica.getNodeName())) {
+            returnLeader.set(replica);
+            return true;
+          }
+        }
 
         return false;
       });
@@ -1478,8 +1491,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
             Replica.State state = null;
             if (!entry.getValue().equals("l")) {
               state = Replica.State.shortStateToState((String) entry.getValue());
-            } else {
-              state = Replica.State.ACTIVE;
             }
             if (log.isDebugEnabled()) log.debug("Got additional state update {} {}", core, state == null ? "leader" : state);
             Replica replica = docCollection.getReplica(core);
@@ -1492,12 +1503,26 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
               Map properties = new HashMap(replica.getProperties());
               if (entry.getValue().equals("l")) {
                 if (log.isDebugEnabled()) log.debug("state is leader, set to active and leader prop");
-                setLeader = true;
-                properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE);
+                properties.put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
                 properties.put("leader", "true");
-              } else {
+
+                for (Replica r : replicasMap.values()) {
+                  if (r == replica) {
+                    continue;
+                  }
+                  if ("true".equals(r.getProperty(LEADER_PROP))) {
+                    Map<String,Object> props = new HashMap<>(r.getProperties());
+                    props.remove(LEADER_PROP);
+                    Replica newReplica = new Replica(r.getName(), props, coll, r.getSlice(), ZkStateReader.this);
+                    replicasMap.put(r.getName(), newReplica);
+                  }
+                }
+              } else if (state != null) {
                 if (log.isDebugEnabled()) log.debug("std state, set to {}", state);
                 properties.put(ZkStateReader.STATE_PROP, state.toString());
+                if (state != Replica.State.ACTIVE && "true".equals(properties.get(LEADER_PROP))) {
+                  properties.remove(LEADER_PROP);
+                }
               }
 
               Replica newReplica = new Replica(core, properties, coll, replica.getSlice(), ZkStateReader.this);
@@ -1508,12 +1533,6 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
 
               Slice newSlice = new Slice(slice.getName(), replicasMap, slice.getProperties(), coll, ZkStateReader.this);
 
-              if (setLeader) {
-                newSlice.setLeader(newReplica);
-              } else {
-                newSlice.setLeader(slice.getLeader());
-              }
-
               Map<String,Slice> newSlices = new HashMap<>(docCollection.getSlicesMap());
               newSlices.put(slice.getName(), newSlice);