You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/03/16 23:56:21 UTC

lucene-solr:branch_7_3: SOLR-12110: Replica which failed to register in Zk can become leader

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7_3 f5a6e2ea4 -> 8a3742d2e


SOLR-12110: Replica which failed to register in Zk can become leader


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/8a3742d2
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/8a3742d2
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/8a3742d2

Branch: refs/heads/branch_7_3
Commit: 8a3742d2ee342ee60a6ed822e36fbdf66e0b5b97
Parents: f5a6e2e
Author: Cao Manh Dat <da...@apache.org>
Authored: Sat Mar 17 06:54:55 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Sat Mar 17 06:56:08 2018 +0700

----------------------------------------------------------------------
 solr/CHANGES.txt                                |   2 +
 .../org/apache/solr/cloud/ZkController.java     |  26 ++--
 .../java/org/apache/solr/core/ZkContainer.java  |   6 +
 .../apache/solr/cloud/DeleteReplicaTest.java    | 143 +++++++++++++++++++
 4 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8a3742d2/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 35302fc..cf2f2a3 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -259,6 +259,8 @@ Bug Fixes
   missing=true caused an NPE or AIOOBE. (Karthik Ramachandran via yonik)
 
 
+* SOLR-12110: Replica which failed to register in Zk can become leader (Cao Manh Dat)
+
 Optimizations
 ----------------------
 

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8a3742d2/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index b7b0b62..af1d401 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1146,6 +1146,9 @@ public class ZkController {
       // make sure we have an update cluster state right away
       zkStateReader.forceUpdateCollection(collection);
       return shardId;
+    } catch (Exception e) {
+      try { unregister(coreName, desc, false); } catch (Exception cleanupFailure) { e.addSuppressed(cleanupFailure); }
+      throw e;
     } finally {
       MDCLoggingContext.clear();
     }
@@ -1493,6 +1496,10 @@ public class ZkController {
   }
 
   public void unregister(String coreName, CoreDescriptor cd) throws Exception {
+    unregister(coreName, cd, true);
+  }
+
+  public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFromZk) throws Exception {
     final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
     final String collection = cd.getCloudDescriptor().getCollectionName();
     getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
@@ -1504,7 +1511,7 @@ public class ZkController {
     }
     final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
     Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreNodeName);
-    
+
     if (replica == null || replica.getType() != Type.PULL) {
       ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
 
@@ -1514,14 +1521,15 @@ public class ZkController {
     }
     CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
     zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
-
-    ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
-        OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
-        ZkStateReader.NODE_NAME_PROP, getNodeName(),
-        ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
-        ZkStateReader.BASE_URL_PROP, getBaseUrl(),
-        ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
-    overseerJobQueue.offer(Utils.toJSON(m));
+    if (removeCoreFromZk) {
+      ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+          OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
+          ZkStateReader.NODE_NAME_PROP, getNodeName(),
+          ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
+          ZkStateReader.BASE_URL_PROP, getBaseUrl(),
+          ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+      overseerJobQueue.offer(Utils.toJSON(m));
+    }
   }
 
   public void createCollection(String collection) throws Exception {

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8a3742d2/solr/core/src/java/org/apache/solr/core/ZkContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 37155ca..f89367f 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 
 import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
 import org.apache.solr.cloud.SolrZkServer;
@@ -173,11 +174,16 @@ public class ZkContainer {
     return zkRun.substring(0, zkRun.lastIndexOf('/'));
   }
 
+  public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk; // test-only hook; return value is ignored
+
   public void registerInZk(final SolrCore core, boolean background, boolean skipRecovery) {
     Runnable r = () -> {
       MDCLoggingContext.setCore(core);
       try {
         try {
+          // Read the test hook once: a test may reset it to null while this registration is running.
+          final Predicate<CoreDescriptor> beforeRegister = testing_beforeRegisterInZk;
+          if (beforeRegister != null) beforeRegister.test(core.getCoreDescriptor());
           zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
         } catch (InterruptedException e) {
           // Restore the interrupted status

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/8a3742d2/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 4c6253e..3eafdb5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -16,23 +16,41 @@
  */
 package org.apache.solr.cloud;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.EnumSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.cloud.overseer.OverseerAction;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.DocCollection;
 import org.apache.solr.common.cloud.Replica;
 import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.ZkContainer;
+import org.apache.solr.util.TimeOut;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
 
 
 public class DeleteReplicaTest extends SolrCloudTestCase {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   @BeforeClass
   public static void setupCluster() throws Exception {
     configureCluster(4)
@@ -141,5 +159,130 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
 
   }
 
+  @Test
+  public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
+    final String collectionName = "raceDeleteReplica";
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
+        .process(cluster.getSolrClient());
+    waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
+
+    Slice shard1 = getCollectionState(collectionName).getSlice("shard1");
+    Replica leader = shard1.getLeader();
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    Replica replica1 = shard1.getReplicas(replica -> !replica.getName().equals(leader.getName())).get(0);
+    assertFalse(replica1.getName().equals(leader.getName()));
+
+    JettySolrRunner replica1Jetty = getJettyForReplica(replica1);
+
+    String replica1JettyNodeName = replica1Jetty.getNodeName();
+
+    Semaphore waitingForReplicaGetDeleted = new Semaphore(0);
+    // for safety, we only want this hook to be triggered once
+    AtomicInteger times = new AtomicInteger(0);
+    ZkContainer.testing_beforeRegisterInZk = cd -> {
+      if (cd.getCloudDescriptor() == null) return false;
+      if (replica1.getName().equals(cd.getCloudDescriptor().getCoreNodeName())
+          && collectionName.equals(cd.getCloudDescriptor().getCollectionName())) {
+        if (times.incrementAndGet() > 1) {
+          return false;
+        }
+        LOG.info("Running delete core {}", cd);
+        try {
+          ZkNodeProps m = new ZkNodeProps(
+              Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+              ZkStateReader.CORE_NAME_PROP, replica1.getCoreName(),
+              ZkStateReader.NODE_NAME_PROP, replica1.getNodeName(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.CORE_NODE_NAME_PROP, replica1.getName(),
+              ZkStateReader.BASE_URL_PROP, replica1.getBaseUrl());
+          Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m));
+
+          boolean replicaDeleted = false;
+          TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+          while (!timeOut.hasTimedOut()) {
+            try {
+              ZkStateReader stateReader = replica1Jetty.getCoreContainer().getZkController().getZkStateReader();
+              stateReader.forceUpdateCollection(collectionName);
+              Slice shard = stateReader.getClusterState().getCollection(collectionName).getSlice("shard1");
+              LOG.info("Waiting for replica to be deleted, current state of shard1: {}", shard);
+              if (shard.getReplicas().size() == 1) {
+                replicaDeleted = true;
+                // the permit is released exactly once, in the finally block below
+                break;
+              }
+              Thread.sleep(500);
+            } catch (NullPointerException | SolrException e) {
+              LOG.warn("Cluster state of {} not readable yet, retrying", collectionName, e);
+              Thread.sleep(500);
+            }
+          }
+          if (!replicaDeleted) {
+            fail("Timeout for waiting replica get deleted");
+          }
+        } catch (Exception e) {
+          LOG.error("Failed to delete replica", e);
+          fail("Failed to delete replica");
+        } finally {
+          // always release, otherwise the test thread waiting on the semaphore would never wake up
+          waitingForReplicaGetDeleted.release();
+        }
+        return true;
+      }
+      return false;
+    };
+
+    try {
+      replica1Jetty.stop();
+      waitForNodeLeave(replica1JettyNodeName);
+      waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState)
+          -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+      replica1Jetty.start();
+      assertTrue("Timed out waiting for the register hook to run", waitingForReplicaGetDeleted.tryAcquire(60, TimeUnit.SECONDS));
+    } finally {
+      ZkContainer.testing_beforeRegisterInZk = null;
+    }
+
+
+    waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> {
+      Slice shard = collectionState.getSlice("shard1");
+      Replica replica = shard.getReplica(replica1.getName());
+      return replica != null && replica.getState() == DOWN;
+    });
+
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+        .process(cluster.getSolrClient());
+    waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
+
+    String leaderJettyNodeName = leaderJetty.getNodeName();
+    leaderJetty.stop();
+    waitForNodeLeave(leaderJettyNodeName);
+
+    waitForState("Expected new active leader", collectionName, (liveNodes, collectionState) -> {
+      Slice shard = collectionState.getSlice("shard1");
+      Replica newLeader = shard.getLeader();
+      return newLeader != null && newLeader.getState() == Replica.State.ACTIVE && !newLeader.getName().equals(leader.getName());
+    });
+
+    leaderJetty.start();
+
+    CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+  }
+
+  private JettySolrRunner getJettyForReplica(Replica replica) {
+    for (JettySolrRunner jetty : cluster.getJettySolrRunners()) {
+      if (jetty.getNodeName().equals(replica.getNodeName())) return jetty;
+    }
+    throw new IllegalArgumentException("Can not find jetty for replica "+ replica);
+  }
+
+
+  private void waitForNodeLeave(String lostNodeName) throws InterruptedException {
+    ZkStateReader reader = cluster.getSolrClient().getZkStateReader();
+    TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (reader.getClusterState().getLiveNodes().contains(lostNodeName)) {
+      Thread.sleep(100);
+      if (timeOut.hasTimedOut()) fail("Wait for " + lostNodeName + " to leave failed!");
+    }
+  }
 }