You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 03:07:59 UTC

[lucene-solr] 02/02: @492 ZkShardTerms improvement.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 42d72fa44e5d144f6064362c999f1045cfaaa406
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Aug 10 22:07:29 2020 -0500

    @492 ZkShardTerms improvement.
---
 .../java/org/apache/solr/cloud/ZkShardTerms.java   | 63 +++++++---------------
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  2 +-
 2 files changed, 19 insertions(+), 46 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index ef90260..4dafb63 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -26,12 +26,14 @@ import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
+import java.sql.Connection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -73,7 +75,7 @@ public class ZkShardTerms implements AutoCloseable{
   private final Set<CoreTermWatcher> listeners = ConcurrentHashMap.newKeySet();
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
-  private AtomicReference<ShardTerms> terms = new AtomicReference<>();
+  private final AtomicReference<ShardTerms> terms = new AtomicReference<>();
 
   /**
    * Listener of a core for shard's term change events
@@ -99,8 +101,7 @@ public class ZkShardTerms implements AutoCloseable{
     this.collection = collection;
     this.shard = shard;
     this.zkClient = zkClient;
-    refreshTerms();
-    retryRegisterWatcher();
+    registerWatcher();
     ObjectReleaseTracker.track(this);
   }
 
@@ -308,7 +309,7 @@ public class ZkShardTerms implements AutoCloseable{
       return true;
     } catch (KeeperException.BadVersionException e) {
       log.info("Failed to save terms, version is not a match, retrying");
-      refreshTerms();
+      // TODO: wait till next version shows up
     } catch (KeeperException.NoNodeException e) {
       throw e;
     } catch (Exception e) {
@@ -321,14 +322,13 @@ public class ZkShardTerms implements AutoCloseable{
   /**
    * Fetch latest terms from ZK
    */
-  public void refreshTerms() {
+  public void refreshTerms(Watcher watcher) {
     ShardTerms newTerms;
     try {
       Stat stat = new Stat();
-      byte[] data = zkClient.getData(znodePath, null, stat);
+      byte[] data = zkClient.getData(znodePath, watcher, stat);
       newTerms = new ShardTerms((Map<String, Long>) Utils.fromJSON(data), stat.getVersion());
     } catch (KeeperException e) {
-      Thread.interrupted();
       throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
     } catch (InterruptedException e) {
       ParWork.propegateInterrupt(e);
@@ -339,49 +339,22 @@ public class ZkShardTerms implements AutoCloseable{
   }
 
   /**
-   * Retry register a watcher to the correspond ZK term node
+   * Register a watcher to the corresponding ZK term node
    */
-  private void retryRegisterWatcher() {
-    while (!isClosed.get()) {
-      try {
-        registerWatcher();
-        return;
-      } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
-        isClosed.set(true);
-        log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
-        return;
-      } catch (KeeperException e) {
-        log.warn("Failed watching shard term for collection: {}, retrying!", collection, e);
-        try {
-          zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
-        } catch (TimeoutException | InterruptedException te) {
-          ParWork.propegateInterrupt(te);
-          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, te);
+  private void registerWatcher() {
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        // session events are not change events, and do not remove the watcher
+        if (Watcher.Event.EventType.None == event.getType()) {
+          return;
         }
-      }
-    }
-  }
 
-  /**
-   * Register a watcher to the correspond ZK term node
-   */
-  private void registerWatcher() throws KeeperException {
-    Watcher watcher = event -> {
-      // session events are not change events, and do not remove the watcher
-      if (Watcher.Event.EventType.None == event.getType()) {
-        return;
+        // Some events may be missed while registering a watcher, so it is safer to refresh terms after registering the watcher
+        refreshTerms(this);
       }
-      retryRegisterWatcher();
-      // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
-      refreshTerms();
     };
-    try {
-      // exists operation is faster than getData operation
-      zkClient.exists(znodePath, watcher);
-    } catch (InterruptedException e) {
-      ParWork.propegateInterrupt(e);
-      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, e);
-    }
+    refreshTerms(watcher);
   }
 
 
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index b784a03..342e371 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -126,7 +126,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
               // The replica changed it term, then published itself as RECOVERING.
               // This core already see replica as RECOVERING
               // so it is guarantees that a live-fetch will be enough for this core to see max term published
-              shardTerms.refreshTerms();
+              shardTerms.refreshTerms(null);
             }
 
             boolean onlyIfActiveCheckResult = onlyIfLeaderActive != null && onlyIfLeaderActive