You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2018/01/09 08:10:59 UTC
[1/2] lucene-solr:jira/solr-11702: SOLR-11702: Fix bug in case of
reconnect to ZK
Repository: lucene-solr
Updated Branches:
refs/heads/jira/solr-11702 9fd8c4494 -> 030fcae48
SOLR-11702: Fix bug in case of reconnect to ZK
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/09b70276
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/09b70276
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/09b70276
Branch: refs/heads/jira/solr-11702
Commit: 09b7027610b5111aee407e396dcb309301d14c15
Parents: 9fd8c44
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Jan 9 15:10:00 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Jan 9 15:10:00 2018 +0700
----------------------------------------------------------------------
.../org/apache/solr/cloud/ZkController.java | 9 ++--
.../org/apache/solr/cloud/ZkShardTerms.java | 57 ++++++++++++--------
.../org/apache/solr/cloud/ZkShardTermsTest.java | 57 +++++++++++++++++++-
3 files changed, 97 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/java/org/apache/solr/cloud/ZkController.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 2e495d3..b3d6bb1 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1036,8 +1036,11 @@ public class ZkController {
final String coreZkNodeName = desc.getCloudDescriptor().getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
+ ZkShardTerms shardTerms = getShardTerms(collection, cloudDesc.getShardId());
if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
- getShardTerms(collection, cloudDesc.getShardId()).registerTerm(coreZkNodeName);
+ // no-op if the term node is already watched; otherwise (e.g. after reconnecting to ZK) re-read the terms and re-register the watcher
+ shardTerms.refreshTerms(true);
+ shardTerms.registerTerm(coreZkNodeName);
}
String shardId = cloudDesc.getShardId();
Map<String,Object> props = new HashMap<>();
@@ -1131,7 +1134,7 @@ public class ZkController {
}
if ("new".equals(desc.getCoreProperty("lirVersion", "new"))) {
- getShardTerms(collection, shardId).addListener(new RecoveringCoreTermWatcher(core));
+ shardTerms.addListener(new RecoveringCoreTermWatcher(core));
}
core.getCoreDescriptor().getCloudDescriptor().setHasRegistered(true);
}
@@ -1464,7 +1467,7 @@ public class ZkController {
return getCollectionTerms(collection).getShard(shardId);
}
- public ZkCollectionTerms getCollectionTerms(String collection) {
+ private ZkCollectionTerms getCollectionTerms(String collection) {
synchronized (collectionToTerms) {
if (!collectionToTerms.containsKey(collection)) collectionToTerms.put(collection, new ZkCollectionTerms(collection, zkClient));
return collectionToTerms.get(collection);
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index f1f6ae7..d0a8b63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -83,7 +83,7 @@ public class ZkShardTerms implements AutoCloseable{
this.shard = shard;
this.zkClient = zkClient;
ensureTermNodeExist();
- refreshTerms();
+ refreshTerms(false);
ObjectReleaseTracker.track(this);
}
@@ -91,7 +91,6 @@ public class ZkShardTerms implements AutoCloseable{
* Ensure that leader's term is lower than some replica's terms
* @param leader coreNodeName of leader
* @param replicasInLowerTerms replicas which should their term should be lower than leader's term
- * @return
*/
public void ensureTermsIsHigher(String leader, Set<String> replicasInLowerTerms) {
Terms newTerms;
@@ -120,7 +119,7 @@ public class ZkShardTerms implements AutoCloseable{
public void close() {
// no watcher will be registered
- numWatcher.addAndGet(1);
+ numWatcher.set(2);
ObjectReleaseTracker.release(this);
}
@@ -163,7 +162,7 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
- * Register a repilca's term (term value will be 0).
+ * Register a replica's term (term value will be 0).
* If a term is already associate with this replica do nothing
* @param coreNodeName of the replica
*/
@@ -192,6 +191,11 @@ public class ZkShardTerms implements AutoCloseable{
}
}
+ // package-private: only used by tests
+ int getNumWatcher() {
+ return numWatcher.get();
+ }
+
/**
* Set new terms to ZK.
* In case of correspond ZK term node is not created, create it
@@ -221,7 +225,7 @@ public class ZkShardTerms implements AutoCloseable{
return true;
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not match, retrying");
- refreshTerms();
+ refreshTerms(false);
} catch (KeeperException.NoNodeException e) {
throw e;
} catch (Exception e) {
@@ -264,27 +268,36 @@ public class ZkShardTerms implements AutoCloseable{
* Fetch the latest terms from ZK.
* This method will atomically register a watcher to the correspond ZK term node,
* so {@link ZkShardTerms#terms} will stay up to date.
+ * @param earlyStop if true, return without fetching the terms when the ZK term node is already being watched
*/
- private void refreshTerms() {
- try {
+ public void refreshTerms(boolean earlyStop) {
+ Terms newTerms;
+ synchronized (numWatcher) {
+ // This block is synchronized so that exactly one valid watcher is watching the term node at any time.
Watcher watcher = null;
- if (numWatcher.compareAndSet(0, 1)) {
- watcher = event -> {
- numWatcher.decrementAndGet();
- refreshTerms();
- };
- }
+ try {
+ if (numWatcher.compareAndSet(0, 1)) {
+ watcher = event -> {
+ numWatcher.compareAndSet(1, 0);
+ refreshTerms(false);
+ };
+ }
- Stat stat = new Stat();
- byte[] data = zkClient.getData(znodePath, watcher, stat, true);
- Terms newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
- setNewTerms(newTerms);
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
- } catch (KeeperException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ if (earlyStop && watcher == null) return;
+
+ Stat stat = new Stat();
+ byte[] data = zkClient.getData(znodePath, watcher, stat, true);
+ newTerms = new Terms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ } catch (KeeperException e) {
+ // if a KeeperException is thrown, the watcher will never be called, so reset the watcher count
+ if (watcher != null) numWatcher.compareAndSet(1, 0);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection:" + collection, e);
+ }
}
+ setNewTerms(newTerms);
}
/**
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/09b70276/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
----------------------------------------------------------------------
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index fc96687..8d8db94 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -23,18 +23,25 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.common.cloud.DefaultConnectionStrategy;
+import org.apache.solr.common.cloud.OnReconnect;
+import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.util.TimeOut;
+import org.apache.zookeeper.KeeperException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
@@ -184,6 +191,54 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
replicaTerms.close();
}
+ public void testCoreTermWatcherOnLosingZKConnection() throws InterruptedException, IOException, KeeperException, TimeoutException {
+ String collection = "testCoreTermWatcherOnLosingZKConnection";
+
+ String zkDir = createTempDir("zkData").toFile().getAbsolutePath();
+ ZkTestServer server = new ZkTestServer(zkDir);
+ try {
+ server.run();
+ try (SolrZkClient zkClient = new SolrZkClient(server.getZkAddress(), 1500)) {
+ zkClient.makePath("/", true);
+ zkClient.makePath("/collections", true);
+ }
+
+ try (SolrZkClient leaderZkClient = new SolrZkClient(server.getZkAddress(), 1500);
+ ZkShardTerms leaderTerms = new ZkShardTerms(collection, "shard1", leaderZkClient)) {
+ leaderTerms.registerTerm("leader");
+ AtomicInteger count = new AtomicInteger(0);
+ Set<ZkShardTerms> shardTerms = new HashSet<>();
+ OnReconnect onReconnect = () -> {
+ log.info("On reconnect {}", shardTerms);
+ shardTerms.iterator().next().refreshTerms(true);
+ };
+ try (SolrZkClient replicaZkClient = new SolrZkClient(server.getZkAddress(), 1500, 1500, new DefaultConnectionStrategy(), onReconnect);
+ ZkShardTerms replicaTerms = new ZkShardTerms(collection, "shard1", replicaZkClient)) {
+ shardTerms.add(replicaTerms);
+ replicaTerms.addListener(terms -> {
+ count.incrementAndGet();
+ return true;
+ });
+ replicaTerms.registerTerm("replica");
+ waitFor(1, count::get);
+ server.expire(replicaZkClient.getSolrZooKeeper().getSessionId());
+ leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+ replicaZkClient.getConnectionManager().waitForDisconnected(10000);
+ replicaZkClient.getConnectionManager().waitForConnected(10000);
+ waitFor(2, count::get);
+ waitFor(1, replicaTerms::getNumWatcher);
+ replicaTerms.setEqualsToMax("replica");
+ waitFor(3, count::get);
+ waitFor(1L, () -> leaderTerms.getTerms().get("replica"));
+ leaderTerms.ensureTermsIsHigher("leader", Collections.singleton("replica"));
+ waitFor(4, count::get);
+ }
+ }
+ } finally {
+ server.shutdown();
+ }
+ }
+
public void testEnsureTermsIsHigher() {
Map<String, Long> map = new HashMap<>();
map.put("leader", 0L);
@@ -193,7 +248,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
private <T> void waitFor(T expected, Supplier<T> supplier) throws InterruptedException {
- TimeOut timeOut = new TimeOut(2, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
+ TimeOut timeOut = new TimeOut(10, TimeUnit.SECONDS, new TimeSource.CurrentTimeSource());
while (!timeOut.hasTimedOut()) {
if (expected == supplier.get()) return;
Thread.sleep(100);
[2/2] lucene-solr:jira/solr-11702: SOLR-11702: The leader should
refresh its term in PrepRecoveryOp
Posted by da...@apache.org.
SOLR-11702: The leader should refresh its term in PrepRecoveryOp
Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/030fcae4
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/030fcae4
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/030fcae4
Branch: refs/heads/jira/solr-11702
Commit: 030fcae4871f97139117d8b84b34d43e5f734bb3
Parents: 09b7027
Author: Cao Manh Dat <da...@apache.org>
Authored: Tue Jan 9 15:10:47 2018 +0700
Committer: Cao Manh Dat <da...@apache.org>
Committed: Tue Jan 9 15:10:47 2018 +0700
----------------------------------------------------------------------
.../java/org/apache/solr/handler/admin/PrepRecoveryOp.java | 7 +++++++
1 file changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/030fcae4/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
----------------------------------------------------------------------
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 0a6d5ce..62e66a6 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -21,6 +21,7 @@ import java.lang.invoke.MethodHandles;
import java.util.Objects;
import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@@ -124,6 +125,12 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
log.warn("Leader " + core.getName() + " ignoring request to be in the recovering state because it is live and active.");
}
+ ZkShardTerms shardTerms = coreContainer.getZkController().getShardTerms(collectionName, slice.getName());
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (waitForState == Replica.State.RECOVERING && shardTerms.registered(coreNodeName) && !shardTerms.canBecomeLeader(coreNodeName)) {
+ shardTerms.refreshTerms(false);
+ }
+
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive && localState != Replica.State.ACTIVE;
log.info("In WaitForState(" + waitForState + "): collection=" + collectionName + ", shard=" + slice.getName() +
", thisCore=" + core.getName() + ", leaderDoesNotNeedRecovery=" + leaderDoesNotNeedRecovery +