Posted to commits@lucene.apache.org by ab...@apache.org on 2018/04/23 17:35:06 UTC

[02/40] lucene-solr:jira/solr-11833: SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed

SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/09db13f4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/09db13f4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/09db13f4

Branch: refs/heads/jira/solr-11833
Commit: 09db13f4f459a391896db2a90b2830f9b1fd898d
Parents: f7f12a5
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Apr 17 20:16:31 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Apr 17 20:16:31 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/cloud/ZkController.java     | 136 ++++++++++++++-----
 .../java/org/apache/solr/core/ZkContainer.java  |  16 ---
 .../solr/handler/admin/CollectionsHandler.java  |  41 +-----
 .../apache/solr/cloud/DeleteReplicaTest.java    |  84 ++++++++++--
 .../org/apache/solr/cloud/ForceLeaderTest.java  |  75 ----------
 .../org/apache/solr/cloud/MoveReplicaTest.java  |  17 ---
 .../apache/solr/common/cloud/ZkStateReader.java |   8 +-
 8 files changed, 186 insertions(+), 193 deletions(-)
----------------------------------------------------------------------
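
In short, each registered core now installs a CollectionStateWatcher on its collection; once the replica's entry disappears from clusterstate, the core unloads itself and the watcher is dropped. A minimal sketch of that pattern, with variable names assumed rather than taken verbatim from the patch (getReplicaOrNull is the helper added to ZkController below):

    // Minimal sketch (not the exact patch code): a per-core watcher that unloads the
    // core once its replica entry disappears from the collection's state.
    CollectionStateWatcher watcher = (liveNodes, collectionState) -> {
      boolean removed = getReplicaOrNull(collectionState, shardId, coreNodeName) == null;
      if (removed) {
        coreContainer.unload(coreName, true, true, true); // also deletes index/data/instance dirs
      }
      return removed; // returning true asks ZkStateReader to deregister this watcher
    };
    zkStateReader.registerCollectionStateWatcher(collection, watcher);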


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index e010366..1107c56 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -164,6 +164,8 @@ Bug Fixes
 
 * SOLR-10169: PeerSync will hit an NPE on no response errors when looking for fingerprint. (Erick Erickson)
 
+* SOLR-12187: Replica should watch clusterstate and unload itself if its entry is removed (Cao Manh Dat)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 872a8b9..8cd02b6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -38,6 +38,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -65,6 +66,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.BeforeReconnect;
 import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DefaultConnectionStrategy;
 import org.apache.solr.common.cloud.DefaultZkACLProvider;
 import org.apache.solr.common.cloud.DefaultZkCredentialsProvider;
@@ -1033,42 +1035,39 @@ public class ZkController {
     try {
       // pre register has published our down state
       final String baseUrl = getBaseUrl();
-      
       final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
       final String collection = cloudDesc.getCollectionName();
-      
-      final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
+      final String shardId = cloudDesc.getShardId();
+      final String coreZkNodeName = cloudDesc.getCoreNodeName();
       assert coreZkNodeName != null : "we should have a coreNodeName by now";
 
+      // check replica's existence in clusterstate first
+      try {
+        zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
+            TimeUnit.MILLISECONDS, (liveNodes, collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+      } catch (TimeoutException e) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
+      }
+      Replica replica = getReplicaOrNull(zkStateReader.getClusterState().getCollectionOrNull(collection), shardId, coreZkNodeName);
+      if (replica == null) {
+        throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate");
+      }
+
       ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
 
       // This flag is used for testing rolling updates and should be removed in SOLR-11812
       boolean isRunningInNewLIR = "new".equals(desc.getCoreProperty("lirVersion", "new"));
-      if (isRunningInNewLIR && cloudDesc.getReplicaType() != Type.PULL) {
+      if (isRunningInNewLIR && replica.getType() != Type.PULL) {
         shardTerms.registerTerm(coreZkNodeName);
       }
-      String shardId = cloudDesc.getShardId();
-      Map<String,Object> props = new HashMap<>();
-      // we only put a subset of props into the leader node
-      props.put(ZkStateReader.BASE_URL_PROP, baseUrl);
-      props.put(ZkStateReader.CORE_NAME_PROP, coreName);
-      props.put(ZkStateReader.NODE_NAME_PROP, getNodeName());
-      
+
       log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
-          coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
-      
-      ZkNodeProps leaderProps = new ZkNodeProps(props);
+          coreName, baseUrl, collection, shardId);
 
       try {
         // If we're a preferred leader, insert ourselves at the head of the queue
-        boolean joinAtHead = false;
-        final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
-        Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreZkNodeName);
-        if (replica != null) {
-          joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
-        }
-        //TODO WHy would replica be null?
-        if (replica == null || replica.getType() != Type.PULL) {
+        boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+        if (replica.getType() != Type.PULL) {
           joinElection(desc, afterExpiration, joinAtHead);
         } else if (replica.getType() == Type.PULL) {
           if (joinAtHead) {
@@ -1093,9 +1092,8 @@ public class ZkController {
       String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
       log.debug("We are " + ourUrl + " and leader is " + leaderUrl);
       boolean isLeader = leaderUrl.equals(ourUrl);
-      Replica.Type replicaType =  zkStateReader.getClusterState().getCollection(collection).getReplica(coreZkNodeName).getType();
-      assert !(isLeader && replicaType == Type.PULL): "Pull replica became leader!";
-      
+      assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
+
       try (SolrCore core = cc.getCore(desc.getName())) {
         
         // recover from local transaction log and wait for it to complete before
@@ -1105,7 +1103,7 @@ public class ZkController {
         // leader election perhaps?
         
         UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
-        boolean isTlogReplicaAndNotLeader = replicaType == Replica.Type.TLOG && !isLeader;
+        boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
         if (isTlogReplicaAndNotLeader) {
           String commitVersion = ReplicateFromLeader.getCommitVersion(core);
           if (commitVersion != null) {
@@ -1138,23 +1136,40 @@ public class ZkController {
           publish(desc, Replica.State.ACTIVE);
         }
 
-        if (isRunningInNewLIR && replicaType != Type.PULL) {
+        if (isRunningInNewLIR && replica.getType() != Type.PULL) {
+          // the watcher is added to a set, so multiple calls of this method leave only one watcher
           shardTerms.addListener(new RecoveringCoreTermWatcher(core.getCoreDescriptor(), getCoreContainer()));
         }
         core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
+      } catch (Exception e) {
+        unregister(coreName, desc, false);
+        throw e;
       }
       
       // make sure we have an update cluster state right away
       zkStateReader.forceUpdateCollection(collection);
+      // the watcher is added to a set, so multiple calls of this method leave only one watcher
+      zkStateReader.registerCollectionStateWatcher(cloudDesc.getCollectionName(),
+          new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
       return shardId;
-    } catch (Exception e) {
-      unregister(coreName, desc, false);
-      throw e;
     } finally {
       MDCLoggingContext.clear();
     }
   }
 
+  private Replica getReplicaOrNull(DocCollection docCollection, String shard, String coreNodeName) {
+    if (docCollection == null) return null;
+
+    Slice slice = docCollection.getSlice(shard);
+    if (slice == null) return null;
+
+    Replica replica = slice.getReplica(coreNodeName);
+    if (replica == null) return null;
+    if (!getNodeName().equals(replica.getNodeName())) return null;
+
+    return replica;
+  }
+
   public void startReplicationFromLeader(String coreName, boolean switchTransactionLog) throws InterruptedException {
     log.info("{} starting background replication from leader", coreName);
     ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
@@ -1359,11 +1374,7 @@ public class ZkController {
   }
 
   public void publish(final CoreDescriptor cd, final Replica.State state) throws Exception {
-    publish(cd, state, true);
-  }
-
-  public void publish(final CoreDescriptor cd, final Replica.State state, boolean updateLastState) throws Exception {
-    publish(cd, state, updateLastState, false);
+    publish(cd, state, true, false);
   }
 
   /**
@@ -1430,6 +1441,9 @@ public class ZkController {
       props.put(ZkStateReader.SHARD_ID_PROP, cd.getCloudDescriptor().getShardId());
       props.put(ZkStateReader.COLLECTION_PROP, collection);
       props.put(ZkStateReader.REPLICA_TYPE, cd.getCloudDescriptor().getReplicaType().toString());
+      if (!Overseer.isLegacy(zkStateReader)) {
+        props.put(ZkStateReader.FORCE_SET_STATE_PROP, "false");
+      }
       if (numShards != null) {
         props.put(ZkStateReader.NUM_SHARDS_PROP, numShards.toString());
       }
@@ -1521,7 +1535,6 @@ public class ZkController {
       }
     }
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
-    zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
     if (removeCoreFromZk) {
       ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
           OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
@@ -1653,7 +1666,6 @@ public class ZkController {
               "Collection {} not visible yet, but flagging it so a watch is registered when it becomes visible" :
               "Registering watch for collection {}",
           collectionName);
-      zkStateReader.registerCore(collectionName);
     } catch (KeeperException e) {
       log.error("", e);
       throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
@@ -2707,6 +2719,56 @@ public class ZkController {
     };
   }
 
+  private class UnloadCoreOnDeletedWatcher implements CollectionStateWatcher {
+    String coreNodeName;
+    String shard;
+    String coreName;
+
+    public UnloadCoreOnDeletedWatcher(String coreNodeName, String shard, String coreName) {
+      this.coreNodeName = coreNodeName;
+      this.shard = shard;
+      this.coreName = coreName;
+    }
+
+    @Override
+    // synchronized due to SOLR-11535
+    public synchronized boolean onStateChanged(Set<String> liveNodes, DocCollection collectionState) {
+      if (getCoreContainer().getCoreDescriptor(coreName) == null) return true;
+
+      boolean replicaRemoved = getReplicaOrNull(collectionState, shard, coreNodeName) == null;
+      if (replicaRemoved) {
+        try {
+          log.info("Replica {} removed from clusterstate, remove it.", coreName);
+          getCoreContainer().unload(coreName, true, true, true);
+        } catch (SolrException e) {
+          if (!e.getMessage().contains("Cannot unload non-existent core")) {
+            // no need to log if the core was already unloaded
+            log.warn("Failed to unregister core:{}", coreName, e);
+          }
+        } catch (Exception e) {
+          log.warn("Failed to unregister core:{}", coreName, e);
+        }
+      }
+      return replicaRemoved;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      UnloadCoreOnDeletedWatcher that = (UnloadCoreOnDeletedWatcher) o;
+      return Objects.equals(coreNodeName, that.coreNodeName) &&
+          Objects.equals(shard, that.shard) &&
+          Objects.equals(coreName, that.coreName);
+    }
+
+    @Override
+    public int hashCode() {
+
+      return Objects.hash(coreNodeName, shard, coreName);
+    }
+  }
+
   /**
    * Thrown during leader initiated recovery process if current node is not leader
    */

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/java/org/apache/solr/core/ZkContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index f89367f..34e5764 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -222,22 +222,6 @@ public class ZkContainer {
   public ZkController getZkController() {
     return zkController;
   }
-  
-  public void publishCoresAsDown(List<SolrCore> cores) {
-    
-    for (SolrCore core : cores) {
-      try {
-        zkController.publish(core.getCoreDescriptor(), Replica.State.DOWN);
-      } catch (KeeperException e) {
-        ZkContainer.log.error("", e);
-      } catch (InterruptedException e) {
-        Thread.interrupted();
-        ZkContainer.log.error("", e);
-      } catch (Exception e) {
-        ZkContainer.log.error("", e);
-      }
-    }
-  }
 
   public void close() {
     

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 5f4bc01..c02271e 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -40,7 +40,6 @@ import org.apache.solr.api.Api;
 import org.apache.solr.client.solrj.SolrResponse;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
-import org.apache.solr.client.solrj.request.CoreAdminRequest;
 import org.apache.solr.client.solrj.request.CoreAdminRequest.RequestSyncShard;
 import org.apache.solr.client.solrj.response.RequestStatusState;
 import org.apache.solr.client.solrj.util.SolrIdentifierValidator;
@@ -282,7 +281,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
    * In SOLR-11739 we change the way the async IDs are checked to decide if one has
    * already been used or not. For backward compatibility, we continue to check in the
    * old way (meaning, in all the queues) for now. This extra check should be removed
-   * in Solr 9 
+   * in Solr 9
    */
   private static final boolean CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS = true;
 
@@ -306,7 +305,7 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
        }
 
        NamedList<String> r = new NamedList<>();
-       
+
        if (CHECK_ASYNC_ID_BACK_COMPAT_LOCATIONS && (
            coreContainer.getZkController().getOverseerCompletedMap().contains(asyncId) ||
            coreContainer.getZkController().getOverseerFailureMap().contains(asyncId) ||
@@ -1162,26 +1161,15 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
 
       // Wait till we have an active leader
       boolean success = false;
-      for (int i = 0; i < 10; i++) {
-        ZkCoreNodeProps zombieLeaderProps = getZombieLeader(zkController, collectionName, sliceId);
-        if (zombieLeaderProps != null) {
-          log.warn("A replica {} on node {} won the leader election, but not exist in clusterstate, " +
-                  "remove it and waiting for another round of election",
-              zombieLeaderProps.getCoreName(), zombieLeaderProps.getNodeName());
-          try (HttpSolrClient solrClient = new HttpSolrClient.Builder(zombieLeaderProps.getBaseUrl()).build()) {
-            CoreAdminRequest.unloadCore(zombieLeaderProps.getCoreName(), solrClient);
-          }
-          // waiting for another election round
-          i = 0;
-        }
-        clusterState = zkController.getClusterState();
+      for (int i = 0; i < 9; i++) {
+        Thread.sleep(5000);
+        clusterState = handler.coreContainer.getZkController().getClusterState();
         collection = clusterState.getCollection(collectionName);
         slice = collection.getSlice(sliceId);
         if (slice.getLeader() != null && slice.getLeader().getState() == State.ACTIVE) {
           success = true;
           break;
         }
-        Thread.sleep(5000);
         log.warn("Force leader attempt {}. Waiting 5 secs for an active leader. State of the slice: {}", (i + 1), slice);
       }
 
@@ -1198,25 +1186,6 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
     }
   }
 
-  /**
-   * Zombie leader is a replica won the election but does not exist in clusterstate
-   * @return null if the zombie leader does not exist
-   */
-  private static ZkCoreNodeProps getZombieLeader(ZkController zkController, String collection, String shardId) {
-    try {
-      ZkCoreNodeProps leaderProps = zkController.getLeaderProps(collection, shardId, 1000);
-      DocCollection docCollection = zkController.getClusterState().getCollection(collection);
-      Replica replica = docCollection.getReplica(leaderProps.getNodeProps().getStr(ZkStateReader.CORE_NODE_NAME_PROP));
-      if (replica == null) return leaderProps;
-      if (!replica.getNodeName().equals(leaderProps.getNodeName())) {
-        return leaderProps;
-      }
-      return null;
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
   public static void waitForActiveCollection(String collectionName, CoreContainer cc, SolrResponse createCollResponse)
       throws KeeperException, InterruptedException {
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index d9dbba0..8c11713 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -34,11 +35,13 @@ import org.apache.solr.client.solrj.request.CoreStatus;
 import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.CollectionStateWatcher;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
 import org.apache.solr.common.cloud.ZkNodeProps;
 import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.cloud.ZkStateReaderAccessor;
 import org.apache.solr.common.util.TimeSource;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.ZkContainer;
@@ -86,12 +89,17 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
     assertTrue("Unexpected error message: " + e.getMessage(), e.getMessage().contains("state is 'active'"));
     assertTrue("Data directory for " + replica.getName() + " should not have been deleted", Files.exists(dataDir));
 
+    JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
+    ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+    Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
     CollectionAdminRequest.deleteReplica(collectionName, shard.getName(), replica.getName())
         .process(cluster.getSolrClient());
     waitForState("Expected replica " + replica.getName() + " to have been removed", collectionName, (n, c) -> {
       Slice testShard = c.getSlice(shard.getName());
       return testShard.getReplica(replica.getName()) == null;
     });
+    // the core no longer watches collection state since it was removed
+    assertEquals(watchers.size() - 1, accessor.getStateWatchers(collectionName).size());
 
     assertFalse("Data directory for " + replica.getName() + " should have been removed", Files.exists(dataDir));
 
@@ -165,8 +173,63 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
   }
 
   @Test
+  public void deleteReplicaFromClusterState() throws Exception {
+    deleteReplicaFromClusterState("true");
+    deleteReplicaFromClusterState("false");
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
+  }
+
+  public void deleteReplicaFromClusterState(String legacyCloud) throws Exception {
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
+    final String collectionName = "deleteFromClusterState_"+legacyCloud;
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 3)
+        .process(cluster.getSolrClient());
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1"));
+    cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2"));
+    cluster.getSolrClient().commit(collectionName);
+
+    Slice shard = getCollectionState(collectionName).getSlice("shard1");
+    Replica replica = getRandomReplica(shard);
+    JettySolrRunner replicaJetty = cluster.getReplicaJetty(replica);
+    ZkStateReaderAccessor accessor = new ZkStateReaderAccessor(replicaJetty.getCoreContainer().getZkController().getZkStateReader());
+    Set<CollectionStateWatcher> watchers = accessor.getStateWatchers(collectionName);
+
+    ZkNodeProps m = new ZkNodeProps(
+        Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+        ZkStateReader.CORE_NAME_PROP, replica.getCoreName(),
+        ZkStateReader.NODE_NAME_PROP, replica.getNodeName(),
+        ZkStateReader.COLLECTION_PROP, collectionName,
+        ZkStateReader.CORE_NODE_NAME_PROP, replica.getName(),
+        ZkStateReader.BASE_URL_PROP, replica.getBaseUrl());
+    Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m));
+
+    waitForState("Timeout waiting for replica get deleted", collectionName,
+        (liveNodes, collectionState) -> collectionState.getSlice("shard1").getReplicas().size() == 2);
+
+    TimeOut timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    timeOut.waitFor("Waiting for replica get unloaded", () ->
+        replicaJetty.getCoreContainer().getCoreDescriptor(replica.getCoreName()) == null
+    );
+    // the core no longer watches collection state since it was removed
+    timeOut = new TimeOut(60, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    timeOut.waitFor("Waiting for watcher get removed", () ->
+        watchers.size() - 1 == accessor.getStateWatchers(collectionName).size()
+    );
+
+    CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+  }
+
+  @Test
+  @Slow
   public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
-    final String collectionName = "raceDeleteReplica";
+    raceConditionOnDeleteAndRegisterReplica("true");
+    raceConditionOnDeleteAndRegisterReplica("false");
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, null).process(cluster.getSolrClient());
+  }
+
+  public void raceConditionOnDeleteAndRegisterReplica(String legacyCloud) throws Exception {
+    CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, legacyCloud).process(cluster.getSolrClient());
+    final String collectionName = "raceDeleteReplica_"+legacyCloud;
     CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
         .process(cluster.getSolrClient());
     waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
@@ -246,15 +309,16 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
       ZkContainer.testing_beforeRegisterInZk = null;
     }
 
-
-    waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> {
-      Slice shard = collectionState.getSlice("shard1");
-      Replica replica = shard.getReplica(replica1.getName());
-      return replica != null && replica.getState() == DOWN;
-    });
-
-    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
-        .process(cluster.getSolrClient());
+    while (true) {
+      try {
+        CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+            .process(cluster.getSolrClient());
+        break;
+      } catch (Exception e) {
+        // expected, when the node is not fully started
+        Thread.sleep(500);
+      }
+    }
     waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
 
     String leaderJettyNodeName = leaderJetty.getNodeName();

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
index beaeb24..013434c 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java
@@ -63,81 +63,6 @@ public class ForceLeaderTest extends HttpPartitionTest {
   }
 
   /**
-   * Tests that FORCELEADER can get an active leader even in the case there are a replica won the election but not present in clusterstate
-   */
-  @Test
-  @Slow
-  public void testZombieLeader() throws Exception {
-    String testCollectionName = "forceleader_zombie_leader_collection";
-    createCollection(testCollectionName, "conf1", 1, 3, 1);
-    cloudClient.setDefaultCollection(testCollectionName);
-    try {
-      List<Replica> notLeaders = ensureAllReplicasAreActive(testCollectionName, SHARD1, 1, 3, maxWaitSecsToSeeAllActive);
-      assertEquals("Expected 2 replicas for collection " + testCollectionName
-          + " but found " + notLeaders.size() + "; clusterState: "
-          + printClusterStateInfo(testCollectionName), 2, notLeaders.size());
-      List<JettySolrRunner> notLeaderJetties = notLeaders.stream().map(rep -> getJettyOnPort(getReplicaPort(rep)))
-          .collect(Collectors.toList());
-
-      Replica leader = cloudClient.getZkStateReader().getLeaderRetry(testCollectionName, SHARD1);
-      JettySolrRunner leaderJetty = getJettyOnPort(getReplicaPort(leader));
-
-      // remove leader from clusterstate
-      ZkNodeProps m = new ZkNodeProps(
-          Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
-          ZkStateReader.CORE_NAME_PROP, leader.getCoreName(),
-          ZkStateReader.NODE_NAME_PROP, leader.getNodeName(),
-          ZkStateReader.COLLECTION_PROP, testCollectionName,
-          ZkStateReader.CORE_NODE_NAME_PROP, leader.getName(),
-          ZkStateReader.BASE_URL_PROP, leader.getBaseUrl());
-      Overseer.getStateUpdateQueue(cloudClient.getZkStateReader().getZkClient()).offer(Utils.toJSON(m));
-
-      boolean restartOtherReplicas = random().nextBoolean();
-      log.info("Starting test with restartOtherReplicas:{}", restartOtherReplicas);
-      if (restartOtherReplicas) {
-        for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
-          notLeaderJetty.stop();
-        }
-      }
-      cloudClient.waitForState(testCollectionName, 30, TimeUnit.SECONDS,
-          (liveNodes, collectionState) -> collectionState.getReplicas().size() == 2);
-
-      if (restartOtherReplicas) {
-        for (JettySolrRunner notLeaderJetty : notLeaderJetties) {
-          notLeaderJetty.start();
-        }
-      }
-
-      log.info("Before forcing leader: " + cloudClient.getZkStateReader().getClusterState()
-          .getCollection(testCollectionName).getSlice(SHARD1));
-      doForceLeader(cloudClient, testCollectionName, SHARD1);
-
-      // By now we have an active leader. Wait for recoveries to begin
-      waitForRecoveriesToFinish(testCollectionName, cloudClient.getZkStateReader(), true);
-      ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
-      log.info("After forcing leader: " + clusterState.getCollection(testCollectionName).getSlice(SHARD1));
-
-      assertNull("Expected zombie leader get deleted", leaderJetty.getCoreContainer().getCore(leader.getCoreName()));
-      Replica newLeader = clusterState.getCollectionOrNull(testCollectionName).getSlice(SHARD1).getLeader();
-      assertNotNull(newLeader);
-      assertEquals(State.ACTIVE, newLeader.getState());
-
-      int numActiveReplicas = getNumberOfActiveReplicas(clusterState, testCollectionName, SHARD1);
-      assertEquals(2, numActiveReplicas);
-
-      // Assert that indexing works again
-      sendDoc(1);
-      cloudClient.commit();
-
-      assertDocsExistInAllReplicas(notLeaders, testCollectionName, 1, 1);
-    } finally {
-      log.info("Cleaning up after the test.");
-      // try to clean up
-      attemptCollectionDelete(cloudClient, testCollectionName);
-    }
-  }
-
-  /**
    * Tests that FORCELEADER can get an active leader even only replicas with term lower than leader's term are live
    */
   @Test

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
index 0879063..652a2e2 100644
--- a/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/MoveReplicaTest.java
@@ -60,9 +60,6 @@ import org.slf4j.LoggerFactory;
 public class MoveReplicaTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static ZkStateReaderAccessor accessor;
-  private static int overseerLeaderIndex;
-
   // used by MoveReplicaHDFSTest
   protected boolean inPlaceMove = true;
 
@@ -78,14 +75,12 @@ public class MoveReplicaTest extends SolrCloudTestCase {
       JettySolrRunner jetty = cluster.getJettySolrRunner(i);
       if (jetty.getNodeName().equals(overseerLeader)) {
         overseerJetty = jetty;
-        overseerLeaderIndex = i;
         break;
       }
     }
     if (overseerJetty == null) {
       fail("no overseer leader!");
     }
-    accessor = new ZkStateReaderAccessor(overseerJetty.getCoreContainer().getZkController().getZkStateReader());
   }
 
   protected String getSolrXml() {
@@ -137,8 +132,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
       }
     }
 
-    Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
-
     int sourceNumCores = getNumOfCores(cloudClient, replica.getNodeName(), coll);
     int targetNumCores = getNumOfCores(cloudClient, targetNode, coll);
 
@@ -201,9 +194,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
 
     assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
 
-    Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
-    assertEquals(watchers, newWatchers);
-
     moveReplica = createMoveReplicaRequest(coll, replica, targetNode, shardId);
     moveReplica.setInPlaceMove(inPlaceMove);
     moveReplica.process(cloudClient);
@@ -243,8 +233,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
       }
     }
     assertTrue("replica never fully recovered", recovered);
-    newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
-    assertEquals(watchers, newWatchers);
 
     assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
   }
@@ -258,8 +246,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
 
     CloudSolrClient cloudClient = cluster.getSolrClient();
 
-    Set<CollectionStateWatcher> watchers = new HashSet<>(accessor.getStateWatchers(coll));
-
     CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(coll, "conf1", 2, REPLICATION);
     create.setAutoAddReplicas(false);
     cloudClient.request(create);
@@ -303,9 +289,6 @@ public class MoveReplicaTest extends SolrCloudTestCase {
     }
     assertFalse(success);
 
-    Set<CollectionStateWatcher> newWatchers = new HashSet<>(accessor.getStateWatchers(coll));
-    assertEquals(watchers, newWatchers);
-
     log.info("--- current collection state: " + cloudClient.getZkStateReader().getClusterState().getCollection(coll));
     assertEquals(100, cluster.getSolrClient().query(coll, new SolrQuery("*:*")).getResults().getNumFound());
   }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09db13f4/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
----------------------------------------------------------------------
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index b0b591a..7d5401d 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1572,8 +1572,12 @@ public class ZkStateReader implements Closeable {
         return v;
       });
       for (CollectionStateWatcher watcher : watchers) {
-        if (watcher.onStateChanged(liveNodes, collectionState)) {
-          removeCollectionStateWatcher(collection, watcher);
+        try {
+          if (watcher.onStateChanged(liveNodes, collectionState)) {
+            removeCollectionStateWatcher(collection, watcher);
+          }
+        } catch (Throwable throwable) {
+          LOG.warn("Error on calling watcher", throwable);
         }
       }
     }
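
For reference, the CollectionStateWatcher contract the new watcher relies on: onStateChanged() is invoked on every collection state change, and returning true deregisters the watcher; with the try/catch above, a watcher that throws no longer prevents the remaining watchers from being notified. A hypothetical usage sketch (not from the patch; collection name and condition are examples only):

    zkStateReader.registerCollectionStateWatcher("gettingstarted", (liveNodes, collectionState) -> {
      if (collectionState == null) return true;          // collection deleted: stop watching
      return collectionState.getReplicas().size() >= 3;  // example condition; deregister once met
    });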