You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 03:07:59 UTC
[lucene-solr] 02/02: @492 ZkShardTerms improvement.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 42d72fa44e5d144f6064362c999f1045cfaaa406
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 10 22:07:29 2020 -0500
@492 ZkShardTerms improvement.
---
.../java/org/apache/solr/cloud/ZkShardTerms.java | 63 +++++++---------------
.../apache/solr/handler/admin/PrepRecoveryOp.java | 2 +-
2 files changed, 19 insertions(+), 46 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index ef90260..4dafb63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -26,12 +26,14 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CoreDescriptor;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
+import java.sql.Connection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -73,7 +75,7 @@ public class ZkShardTerms implements AutoCloseable{
private final Set<CoreTermWatcher> listeners = ConcurrentHashMap.newKeySet();
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private AtomicReference<ShardTerms> terms = new AtomicReference<>();
+ private final AtomicReference<ShardTerms> terms = new AtomicReference<>();
/**
* Listener of a core for shard's term change events
@@ -99,8 +101,7 @@ public class ZkShardTerms implements AutoCloseable{
this.collection = collection;
this.shard = shard;
this.zkClient = zkClient;
- refreshTerms();
- retryRegisterWatcher();
+ registerWatcher();
ObjectReleaseTracker.track(this);
}
@@ -308,7 +309,7 @@ public class ZkShardTerms implements AutoCloseable{
return true;
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not a match, retrying");
- refreshTerms();
+ // TODO: wait till next version shows up
} catch (KeeperException.NoNodeException e) {
throw e;
} catch (Exception e) {
@@ -321,14 +322,13 @@ public class ZkShardTerms implements AutoCloseable{
/**
* Fetch latest terms from ZK
*/
- public void refreshTerms() {
+ public void refreshTerms(Watcher watcher) {
ShardTerms newTerms;
try {
Stat stat = new Stat();
- byte[] data = zkClient.getData(znodePath, null, stat);
+ byte[] data = zkClient.getData(znodePath, watcher, stat);
newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
} catch (KeeperException e) {
- Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
} catch (InterruptedException e) {
ParWork.propegateInterrupt(e);
@@ -339,49 +339,22 @@ public class ZkShardTerms implements AutoCloseable{
}
/**
- * Retry register a watcher to the correspond ZK term node
+ * Register a watcher to the corresponding ZK term node
*/
- private void retryRegisterWatcher() {
- while (!isClosed.get()) {
- try {
- registerWatcher();
- return;
- } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
- isClosed.set(true);
- log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
- return;
- } catch (KeeperException e) {
- log.warn("Failed watching shard term for collection: {}, retrying!", collection, e);
- try {
- zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
- } catch (TimeoutException | InterruptedException te) {
- ParWork.propegateInterrupt(te);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, te);
+ private void registerWatcher() {
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // session events are not change events, and do not remove the watcher
+ if (Watcher.Event.EventType.None == event.getType()) {
+ return;
}
- }
- }
- }
- /**
- * Register a watcher to the correspond ZK term node
- */
- private void registerWatcher() throws KeeperException {
- Watcher watcher = event -> {
- // session events are not change events, and do not remove the watcher
- if (Watcher.Event.EventType.None == event.getType()) {
- return;
+ // Some events may be missed while registering a watcher, so it is safer to refresh terms after registering the watcher
+ refreshTerms(this);
}
- retryRegisterWatcher();
- // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
- refreshTerms();
};
- try {
- // exists operation is faster than getData operation
- zkClient.exists(znodePath, watcher);
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, e);
- }
+ refreshTerms(watcher);
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index b784a03..342e371 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -126,7 +126,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
// The replica changed it term, then published itself as RECOVERING.
// This core already see replica as RECOVERING
// so it is guarantees that a live-fetch will be enough for this core to see max term published
- shardTerms.refreshTerms();
+ shardTerms.refreshTerms(null);
}
boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive