You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/03/16 23:55:49 UTC
lucene-solr:branch_7x: SOLR-12110: Replica which failed to register
in Zk can become leader
Repository: lucene-solr
Updated Branches:
refs/heads/branch_7x 43ad71eaa -> 911fda2ef
SOLR-12110: Replica which failed to register in Zk can become leader
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/911fda2e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/911fda2e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/911fda2e
Branch: refs/heads/branch_7x
Commit: 911fda2efd4d71a604c1815e7e0545bc66986eee
Parents: 43ad71e
Author: Cao Manh Dat <da...@apache.org>
Authored: Sat Mar 17 06:54:55 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Sat Mar 17 06:55:36 2018 +0700
----------------------------------------------------------------------
solr/CHANGES.txt | 2 +
.../org/apache/solr/cloud/ZkController.java | 26 ++--
.../java/org/apache/solr/core/ZkContainer.java | 6 +
.../apache/solr/cloud/DeleteReplicaTest.java | 143 +++++++++++++++++++
4 files changed, 168 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/911fda2e/solr/CHANGES.txt
----------------------------------------------------------------------
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2be1adc..c5f71f6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -310,6 +310,8 @@ Bug Fixes
* SOLR-12063: Fix PeerSync, Leader Election failures and CDCR checkpoint inconsistencies on a cluster running CDCR
(Amrit Sarkar, Varun Thacker)
+* SOLR-12110: Replica which failed to register in Zk can become leader (Cao Manh Dat)
+
Optimizations
----------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/911fda2e/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index a94bfbc..7a908b2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1146,6 +1146,9 @@ public class ZkController {
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
return shardId;
+ } catch (Exception e) {
+ unregister(coreName, desc, false);
+ throw e;
} finally {
MDCLoggingContext.clear();
}
@@ -1493,6 +1496,10 @@ public class ZkController {
}
public void unregister(String coreName, CoreDescriptor cd) throws Exception {
+ unregister(coreName, cd, true); // true: the DELETECORE message for this core is also offered to the overseer queue
+ }
+
+ public void unregister(String coreName, CoreDescriptor cd, boolean removeCoreFromZk) throws Exception {
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
final String collection = cd.getCloudDescriptor().getCollectionName();
getCollectionTerms(collection).remove(cd.getCloudDescriptor().getShardId(), cd);
@@ -1504,7 +1511,7 @@ public class ZkController {
}
final DocCollection docCollection = zkStateReader.getClusterState().getCollectionOrNull(collection);
Replica replica = (docCollection == null) ? null : docCollection.getReplica(coreNodeName);
-
+
if (replica == null || replica.getType() != Type.PULL) {
ElectionContext context = electionContexts.remove(new ContextKey(collection, coreNodeName));
@@ -1514,14 +1521,15 @@ public class ZkController {
}
CloudDescriptor cloudDescriptor = cd.getCloudDescriptor();
zkStateReader.unregisterCore(cloudDescriptor.getCollectionName());
-
- ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
- OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
- ZkStateReader.NODE_NAME_PROP, getNodeName(),
- ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
- ZkStateReader.BASE_URL_PROP, getBaseUrl(),
- ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
- overseerJobQueue.offer(Utils.toJSON(m));
+ if (removeCoreFromZk) {
+ ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION,
+ OverseerAction.DELETECORE.toLower(), ZkStateReader.CORE_NAME_PROP, coreName,
+ ZkStateReader.NODE_NAME_PROP, getNodeName(),
+ ZkStateReader.COLLECTION_PROP, cloudDescriptor.getCollectionName(),
+ ZkStateReader.BASE_URL_PROP, getBaseUrl(),
+ ZkStateReader.CORE_NODE_NAME_PROP, coreNodeName);
+ overseerJobQueue.offer(Utils.toJSON(m));
+ }
}
public void createCollection(String collection) throws Exception {
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/911fda2e/solr/core/src/java/org/apache/solr/core/ZkContainer.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
index 37155ca..f89367f 100644
--- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
import org.apache.solr.cloud.CurrentCoreDescriptorProvider;
import org.apache.solr.cloud.SolrZkServer;
@@ -173,11 +174,16 @@ public class ZkContainer {
return zkRun.substring(0, zkRun.lastIndexOf('/'));
}
+  /**
+   * Test-only hook. When non-null, {@code registerInZk} calls it with the core's descriptor
+   * immediately before {@code zkController.register(...)}; the returned boolean is ignored.
+   * <p>
+   * Declared {@code volatile} because the hook is installed and cleared by test code while
+   * it is read from inside the {@code Runnable} built by {@code registerInZk}, which is not
+   * guaranteed to run on the thread that set the field.
+   */
+  public static volatile Predicate<CoreDescriptor> testing_beforeRegisterInZk;
+
public void registerInZk(final SolrCore core, boolean background, boolean skipRecovery) {
Runnable r = () -> {
MDCLoggingContext.setCore(core);
try {
try {
+ if (testing_beforeRegisterInZk != null) {
+ testing_beforeRegisterInZk.test(core.getCoreDescriptor());
+ }
zkController.register(core.getName(), core.getCoreDescriptor(), skipRecovery);
} catch (InterruptedException e) {
// Restore the interrupted status
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/911fda2e/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
index 4c6253e..3eafdb5 100644
--- a/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/DeleteReplicaTest.java
@@ -16,23 +16,41 @@
*/
package org.apache.solr.cloud;
+import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CoreStatus;
+import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkNodeProps;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.util.TimeSource;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.ZkContainer;
+import org.apache.solr.util.TimeOut;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.solr.common.cloud.Replica.State.DOWN;
public class DeleteReplicaTest extends SolrCloudTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(4)
@@ -141,5 +159,130 @@ public class DeleteReplicaTest extends SolrCloudTestCase {
}
+  /**
+   * Regression test for SOLR-12110: a replica that is deleted from the cluster state while it
+   * is registering in ZooKeeper must not end up as leader.
+   * <p>
+   * Steps: create a 1x2 collection; install a {@code ZkContainer.testing_beforeRegisterInZk}
+   * hook that, right before the non-leader replica registers, enqueues a DELETECORE message
+   * for it and waits until shard1 reports a single replica; restart that replica's Jetty;
+   * expect the replica to show up again as DOWN; add a fresh replica; stop the old leader and
+   * expect a different, ACTIVE leader to be elected.
+   */
+  @Test
+  public void raceConditionOnDeleteAndRegisterReplica() throws Exception {
+    final String collectionName = "raceDeleteReplica";
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 2)
+        .process(cluster.getSolrClient());
+    waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
+
+    Slice shard1 = getCollectionState(collectionName).getSlice("shard1");
+    Replica leader = shard1.getLeader();
+    JettySolrRunner leaderJetty = getJettyForReplica(leader);
+    Replica replica1 = shard1.getReplicas(replica -> !replica.getName().equals(leader.getName())).get(0);
+    assertFalse(replica1.getName().equals(leader.getName()));
+
+    JettySolrRunner replica1Jetty = getJettyForReplica(replica1);
+
+    String replica1JettyNodeName = replica1Jetty.getNodeName();
+
+    Semaphore waitingForReplicaGetDeleted = new Semaphore(0);
+    // For safety, we only want this hook to be triggered once.
+    AtomicInteger times = new AtomicInteger(0);
+    // Problems detected inside the hook are counted here and reported from the test method
+    // below. The hook is invoked from registerInZk rather than from this method, so an
+    // AssertionError thrown there is not guaranteed to reach the test runner.
+    AtomicInteger hookFailures = new AtomicInteger(0);
+    ZkContainer.testing_beforeRegisterInZk = cd -> {
+      if (cd.getCloudDescriptor() == null) return false;
+      if (replica1.getName().equals(cd.getCloudDescriptor().getCoreNodeName())
+          && collectionName.equals(cd.getCloudDescriptor().getCollectionName())) {
+        if (times.incrementAndGet() > 1) {
+          return false;
+        }
+        LOG.info("Running delete core {}",cd);
+        try {
+          // Ask the overseer to drop replica1 from the cluster state while it is registering.
+          ZkNodeProps m = new ZkNodeProps(
+              Overseer.QUEUE_OPERATION, OverseerAction.DELETECORE.toLower(),
+              ZkStateReader.CORE_NAME_PROP, replica1.getCoreName(),
+              ZkStateReader.NODE_NAME_PROP, replica1.getNodeName(),
+              ZkStateReader.COLLECTION_PROP, collectionName,
+              ZkStateReader.CORE_NODE_NAME_PROP, replica1.getName(),
+              ZkStateReader.BASE_URL_PROP, replica1.getBaseUrl());
+          Overseer.getStateUpdateQueue(cluster.getZkClient()).offer(Utils.toJSON(m));
+
+          // Poll (up to 20 seconds) until shard1 is down to a single replica.
+          boolean replicaDeleted = false;
+          TimeOut timeOut = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+          while (!timeOut.hasTimedOut()) {
+            try {
+              ZkStateReader stateReader = replica1Jetty.getCoreContainer().getZkController().getZkStateReader();
+              stateReader.forceUpdateCollection(collectionName);
+              Slice shard = stateReader.getClusterState().getCollection(collectionName).getSlice("shard1");
+              LOG.info("Waiting for replica deletion, current slice: {}", shard);
+              if (shard.getReplicas().size() == 1) {
+                replicaDeleted = true;
+                break;
+              }
+              Thread.sleep(500);
+            } catch (NullPointerException | SolrException e) {
+              // The state may not be readable yet while the node is starting; retry.
+              LOG.warn("Could not read the state of shard1 yet, retrying", e);
+              Thread.sleep(500);
+            }
+          }
+          if (!replicaDeleted) {
+            hookFailures.incrementAndGet();
+            LOG.error("Timeout for waiting replica get deleted");
+          }
+        } catch (Exception e) {
+          hookFailures.incrementAndGet();
+          LOG.error("Failed to delete replica", e);
+        } finally {
+          // Always wake up the test thread, even on failure, to avoid a deadlock.
+          waitingForReplicaGetDeleted.release();
+        }
+        return true;
+      }
+      return false;
+    };
+
+    try {
+      replica1Jetty.stop();
+      waitForNodeLeave(replica1JettyNodeName);
+      waitForState("Expected replica:"+replica1+" get down", collectionName, (liveNodes, collectionState)
+          -> collectionState.getSlice("shard1").getReplica(replica1.getName()).getState() == DOWN);
+      replica1Jetty.start();
+      waitingForReplicaGetDeleted.acquire();
+      if (hookFailures.get() > 0) {
+        fail("The register hook failed to delete replica " + replica1.getName() + "; see the test log");
+      }
+    } finally {
+      ZkContainer.testing_beforeRegisterInZk = null;
+    }
+
+
+    waitForState("Timeout for replica:"+replica1.getName()+" register itself as DOWN after failed to register", collectionName, (liveNodes, collectionState) -> {
+      Slice shard = collectionState.getSlice("shard1");
+      Replica replica = shard.getReplica(replica1.getName());
+      return replica != null && replica.getState() == DOWN;
+    });
+
+    CollectionAdminRequest.addReplicaToShard(collectionName, "shard1")
+        .process(cluster.getSolrClient());
+    waitForState("Expected 1x2 collections", collectionName, clusterShape(1, 2));
+
+    String leaderJettyNodeName = leaderJetty.getNodeName();
+    leaderJetty.stop();
+    waitForNodeLeave(leaderJettyNodeName);
+
+    // The new leader must be ACTIVE and must not be the node we just stopped.
+    waitForState("Expected new active leader", collectionName, (liveNodes, collectionState) -> {
+      Slice shard = collectionState.getSlice("shard1");
+      Replica newLeader = shard.getLeader();
+      return newLeader != null && newLeader.getState() == Replica.State.ACTIVE && !newLeader.getName().equals(leader.getName());
+    });
+
+    leaderJetty.start();
+
+    CollectionAdminRequest.deleteCollection(collectionName).process(cluster.getSolrClient());
+  }
+
+  /**
+   * Finds the Jetty runner of the test cluster whose node name equals the node name of the
+   * given replica.
+   *
+   * @throws IllegalArgumentException if no runner in the cluster matches
+   */
+  private JettySolrRunner getJettyForReplica(Replica replica) {
+    JettySolrRunner found = null;
+    for (JettySolrRunner candidate : cluster.getJettySolrRunners()) {
+      boolean sameNode = candidate.getNodeName().equals(replica.getNodeName());
+      if (sameNode) {
+        found = candidate;
+        break;
+      }
+    }
+    if (found == null) {
+      throw new IllegalArgumentException("Can not find jetty for replica "+ replica);
+    }
+    return found;
+  }
+
+
+  /**
+   * Blocks until {@code lostNodeName} is no longer among the live nodes seen by the cluster
+   * client's state reader, re-checking every 100 ms. The 20-second timeout is evaluated after
+   * each sleep; once it has elapsed the test is failed.
+   */
+  private void waitForNodeLeave(String lostNodeName) throws InterruptedException {
+    ZkStateReader zkReader = cluster.getSolrClient().getZkStateReader();
+    TimeOut deadline = new TimeOut(20, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    for (;;) {
+      boolean stillLive = zkReader.getClusterState().getLiveNodes().contains(lostNodeName);
+      if (!stillLive) {
+        return;
+      }
+      Thread.sleep(100);
+      if (deadline.hasTimedOut()) {
+        fail("Wait for " + lostNodeName + " to leave failed!");
+      }
+    }
+  }
}