You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/22 01:24:40 UTC

[lucene-solr] 02/02: @1276 Don't interrupt election and SolrQos tweaks and recovery tweaks.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 0f23e0984bc8b3db5fe6deeaf30b004c81088e70
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 19:16:14 2021 -0600

    @1276 Don't interrupt election and SolrQos tweaks and recovery tweaks.
---
 .../java/org/apache/solr/cloud/LeaderElector.java  |  26 ++--
 .../org/apache/solr/cloud/RecoveryStrategy.java    |  23 +++-
 .../solr/cloud/ShardLeaderElectionContext.java     |  15 +-
 .../solr/cloud/ShardLeaderElectionContextBase.java | 151 ++++++++++++---------
 .../apache/solr/handler/admin/PrepRecoveryOp.java  |  17 ---
 .../org/apache/solr/servlet/SolrQoSFilter.java     |  14 +-
 6 files changed, 141 insertions(+), 105 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 3c822f6..fcbdd4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -316,7 +316,7 @@ public class LeaderElector implements Closeable {
   }
 
   public void joinElection(boolean replacement,boolean joinAtHead) {
-    if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled() && zkClient.isAlive()) {
+    if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled()) {
       joinFuture = executor.submit(() -> {
         try {
           isCancelled = false;
@@ -382,8 +382,7 @@ public class LeaderElector implements Closeable {
           }
         } else {
           if (log.isDebugEnabled()) log.debug("create ephem election node {}", shardsElectZkPath + "/" + id + "-n_");
-              leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null,
-                      CreateMode.EPHEMERAL_SEQUENTIAL, false);
+          leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
         }
 
         log.info("Joined leadership election with path: {}", leaderSeqPath);
@@ -444,7 +443,7 @@ public class LeaderElector implements Closeable {
   }
 
   private boolean shouldRejectJoins() {
-    return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled();
+    return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || isClosed;
   }
 
   @Override
@@ -471,10 +470,6 @@ public class LeaderElector implements Closeable {
 
   public void cancel() {
 
-    if (state == OUT_OF_ELECTION || state == CLOSED) {
-      return;
-    }
-
     state = OUT_OF_ELECTION;
 
     try {
@@ -587,15 +582,26 @@ public class LeaderElector implements Closeable {
     Collections.sort(seqs, Comparator.comparingInt(LeaderElector::getSeq).thenComparing(o -> o));
   }
 
-  void retryElection(boolean joinAtHead) {
+  synchronized void retryElection(boolean joinAtHead) {
+    state = OUT_OF_ELECTION;
     if (shouldRejectJoins()) {
+      log.info("Closed, won't rejoin election");
       throw new AlreadyClosedException();
     }
-    cancel();
     ElectionWatcher watcher = this.watcher;
     IOUtils.closeQuietly(watcher);
+    this.watcher = null;
     IOUtils.closeQuietly(this);
+    context.leaderSeqPath = null;
+    context.watchedSeqPath = null;
+    if (context instanceof ShardLeaderElectionContextBase) {
+      ((ShardLeaderElectionContextBase) context).closed = false;
+      ((ShardLeaderElectionContextBase) context).leaderZkNodeParentVersion = null;
+    }
+
+    isClosed = false;
     isCancelled = false;
+    joinFuture = null;
     joinElection(true, joinAtHead);
   }
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 1272ad8..68d99fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -351,8 +351,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
         }
 
         Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
-        if (leader != null && leader.getName().equals(coreName) && zkController.getZkClient()
-            .exists(COLLECTIONS_ZKNODE + "/" + coreDescriptor.getCollectionName() + "/leaders/" + coreDescriptor.getCloudDescriptor().getShardId() + "/leader")) {
+        if (leader != null && leader.getName().equals(coreName)) {
           log.info("We are the leader, STOP recovery");
           close = true;
           return;
@@ -403,9 +402,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
       // though
       try {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
-        Replica leaderprops;
+        Replica leader;
         try {
-          leaderprops = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+          leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+
+          if (leader != null && leader.getName().equals(coreName)) {
+            log.info("We are the leader, STOP recovery");
+            close = true;
+            return false;
+          }
         } catch (Exception e) {
           log.error("Could not get leader for {} {} {}", cloudDesc.getCollectionName(), cloudDesc.getShardId(), zkStateReader.getClusterState().getCollectionOrNull(cloudDesc.getCollectionName()), e);
           throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -413,12 +418,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
         if (isClosed()) {
           throw new AlreadyClosedException();
         }
-        log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leaderprops.getName(), Replica.getCoreUrl(baseUrl, coreName));
+        log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leader.getName(), Replica.getCoreUrl(baseUrl, coreName));
 
         try {
           log.info("Stopping background replicate from leader process");
           zkController.stopReplicationFromLeader(coreName);
-          IndexFetcher.IndexFetchResult result = replicate(leaderprops);
+          IndexFetcher.IndexFetchResult result = replicate(leader);
 
           if (result.getSuccessful()) {
             log.info("replication fetch reported as success");
@@ -598,6 +603,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
         CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
         leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
 
+        if (leader != null && leader.getName().equals(coreName)) {
+          log.info("We are the leader, STOP recovery");
+          close = true;
+          return false;
+        }
+
         log.info("Begin buffering updates. core=[{}]", coreName);
         // recalling buffer updates will drop the old buffer tlog
         ulog.bufferUpdates();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 56eb674..dfd970a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -127,14 +127,27 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
         }
 
         Replica.Type replicaType;
-        String coreNodeName;
         boolean setTermToMax = false;
 
         CoreDescriptor cd = core.getCoreDescriptor();
         CloudDescriptor cloudCd = cd.getCloudDescriptor();
         replicaType = cloudCd.getReplicaType();
         // should I be leader?
+
         ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+        try {
+          // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+          if (zkShardTerms != null && zkShardTerms.skipSendingUpdatesTo(coreName)) {
+            // The replica changed its term, then published itself as RECOVERING.
+            // This core already sees the replica as RECOVERING,
+            // so it is guaranteed that a live-fetch will be enough for this core to see the max term published
+            log.info("refresh shard terms for core {}", coreName);
+            zkShardTerms.refreshTerms(false);
+          }
+        } catch (Exception e) {
+          log.error("Exception while looking at refreshing shard terms", e);
+        }
+        
         if (zkShardTerms.registered(coreName) && !zkShardTerms.canBecomeLeader(coreName)) {
           if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreName, leaderVoteWait)) {
             rejoinLeaderElection(core);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index abaee54..1754d50 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -48,7 +48,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   protected final SolrZkClient zkClient;
   protected volatile boolean closed;
-  private volatile Integer leaderZkNodeParentVersion;
+  protected volatile Integer leaderZkNodeParentVersion;
 
   public ShardLeaderElectionContextBase(final String coreNodeName, String electionPath, String leaderPath,
                                         Replica props, SolrZkClient zkClient) {
@@ -59,82 +59,105 @@ class ShardLeaderElectionContextBase extends ElectionContext {
   @Override
   protected void cancelElection() throws InterruptedException, KeeperException {
 
-
     if (log.isTraceEnabled()) log.trace("cancelElection");
-//    if (!zkClient.isConnected()) {
-//      log.info("Can't cancel, zkClient is not connected");
-//      return;
-//    }
+    //    if (!zkClient.isConnected()) {
+    //      log.info("Can't cancel, zkClient is not connected");
+    //      return;
+    //    }
     super.cancelElection();
-      try {
-        if (leaderZkNodeParentVersion != null) {
-          try {
-//            if (!zkClient.exists(leaderSeqPath)) {
-//              return;
-//            }
-            // We need to be careful and make sure we *only* delete our own leader registration node.
-            // We do this by using a multi and ensuring the parent znode of the leader registration node
-            // matches the version we expect - there is a setData call that increments the parent's znode
-            // version whenever a leader registers.
-            log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
-            List<Op> ops = new ArrayList<>(3);
-            ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
-            ops.add(Op.delete(leaderSeqPath, -1));
-            ops.add(Op.delete(leaderPath, -1));
-            zkClient.multi(ops, false);
-          } catch (KeeperException e) {
-            if (e instanceof NoNodeException) {
-              // okay
-              return;
-            }
-            if (e instanceof KeeperException.SessionExpiredException) {
-              log.warn("ZooKeeper session expired");
-              throw e;
+    try {
+      if (leaderZkNodeParentVersion != null) {
+        try {
+          //            if (!zkClient.exists(leaderSeqPath)) {
+          //              return;
+          //            }
+          // We need to be careful and make sure we *only* delete our own leader registration node.
+          // We do this by using a multi and ensuring the parent znode of the leader registration node
+          // matches the version we expect - there is a setData call that increments the parent's znode
+          // version whenever a leader registers.
+          log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
+          List<Op> ops = new ArrayList<>(3);
+          ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+          ops.add(Op.delete(leaderSeqPath, -1));
+          ops.add(Op.delete(leaderPath, -1));
+          zkClient.multi(ops, false);
+        } catch (KeeperException e) {
+          if (e instanceof NoNodeException) {
+            // okay
+            if (leaderSeqPath != null) {
+              if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+              zkClient.delete(leaderSeqPath, -1);
             }
+            return;
+          }
+          if (e instanceof KeeperException.SessionExpiredException) {
+            log.warn("ZooKeeper session expired");
+            throw e;
+          }
 
-            int i = 0;
-            List<OpResult> results = e.getResults();
-            if (results != null) {
-              for (OpResult result : results) {
-                if (((OpResult.ErrorResult) result).getErr() == -101) {
-                  // no node, fine
-                } else {
-                  if (result instanceof OpResult.ErrorResult) {
-                    OpResult.ErrorResult dresult = (OpResult.ErrorResult) result;
-                    if (dresult.getErr() != 0) {
-                      log.error("op=" + i++ + " err=" + dresult.getErr());
-                    }
+          int i = 0;
+          List<OpResult> results = e.getResults();
+          if (results != null) {
+            for (OpResult result : results) {
+              if (((OpResult.ErrorResult) result).getErr() == -101) {
+                // no node, fine
+                try {
+                  if (leaderSeqPath != null) {
+                    if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+                    zkClient.delete(leaderSeqPath, -1);
                   }
-                  throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election " + e.getPath(), e);
+                } catch (NoNodeException e1) {
+                  // fine
                 }
+              } else {
+                if (result instanceof OpResult.ErrorResult) {
+                  OpResult.ErrorResult dresult = (OpResult.ErrorResult) result;
+                  if (dresult.getErr() != 0) {
+                    log.error("op=" + i++ + " err=" + dresult.getErr());
+                  }
+                }
+                try {
+                  if (leaderSeqPath != null) {
+                    if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+                    zkClient.delete(leaderSeqPath, -1);
+                  }
+                } catch (NoNodeException e1) {
+                  // fine
+                }
+                throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election " + e.getPath(), e);
               }
             }
-
-          } catch (InterruptedException | AlreadyClosedException e) {
-            ParWork.propagateInterrupt(e, true);
-          } catch (Exception e) {
-            throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
           }
-        } else {
-          try {
-            if (leaderSeqPath != null) {
-              if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
-              zkClient.delete(leaderSeqPath, -1);
-            }
-          } catch (NoNodeException e) {
-            // fine
+
+        } catch (InterruptedException | AlreadyClosedException e) {
+          ParWork.propagateInterrupt(e, true);
+        } catch (Exception e) {
+          if (leaderSeqPath != null) {
+            if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+            zkClient.delete(leaderSeqPath, -1);
           }
-          if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
+          throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
         }
-      } catch (Exception e) {
-        if (e instanceof InterruptedException) {
-          ParWork.propagateInterrupt(e);
+      } else {
+        try {
+          if (leaderSeqPath != null) {
+            if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+            zkClient.delete(leaderSeqPath, -1);
+          }
+        } catch (NoNodeException e) {
+          // fine
         }
-        Stat stat = new Stat();
-        zkClient.getData(Paths.get(leaderPath).getParent().toString(), null, stat);
-        log.error("Exception trying to cancel election {} {} {}", stat.getVersion(), e.getClass().getName(), e.getMessage(), e);
+        if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
       }
-      leaderZkNodeParentVersion = null;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        ParWork.propagateInterrupt(e);
+      }
+      Stat stat = new Stat();
+      zkClient.getData(Paths.get(leaderPath).getParent().toString(), null, stat);
+      log.error("Exception trying to cancel election {} {} {}", stat.getVersion(), e.getClass().getName(), e.getMessage(), e);
+    }
+    leaderZkNodeParentVersion = null;
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index f83075d..63418b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,11 +18,9 @@
 package org.apache.solr.handler.admin;
 
 import org.apache.solr.cloud.ZkController.NotInClusterStateException;
-import org.apache.solr.cloud.ZkShardTerms;
 import org.apache.solr.common.ParWork;
 import org.apache.solr.common.SolrException.ErrorCode;
 import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
 import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.SolrParams;
@@ -97,20 +95,5 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
         error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
       throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
     }
-
-    try {
-      ZkShardTerms shardTerms = coreContainer.getZkController().getShardTermsOrNull(collection, shard);
-      // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
-      if (shardTerms != null && waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
-        // The replica changed its term, then published itself as RECOVERING.
-        // This core already see replica as RECOVERING
-        // so it is guarantees that a live-fetch will be enough for this core to see max term published
-        log.info("refresh shard terms for core {}", cname);
-        shardTerms.refreshTerms(false);
-      }
-    } catch (Exception e) {
-       log.error("Exception while looking at refreshing shard terms", e);
-    }
-
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index a788ca4..84b3c04 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -50,9 +50,9 @@ public class SolrQoSFilter extends QoSFilter {
   @Override
   public void init(FilterConfig filterConfig) {
     super.init(filterConfig);
-    _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 5000);
+    _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 10000);
     super.setMaxRequests(_origMaxRequests);
-    super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 20000));
+    super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 30000));
     super.setWaitMs(Integer.getInteger("solr.concurrentRequests.waitms", 2000));
   }
 
@@ -82,27 +82,27 @@ public class SolrQoSFilter extends QoSFilter {
 
       } else {
         // nocommit - deal with no supported, use this as a fail safe with high and low watermark?
-        if (ourLoad < 0.70 && sLoad < 1.0 && _origMaxRequests != getMaxRequests()) {
+        if (ourLoad < 0.90 && sLoad < 1.6 && _origMaxRequests != getMaxRequests()) {
           if (sLoad < 0.9) {
             if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
             updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
           } else {
-            updateMaxRequests(Math.min(_origMaxRequests, (int) Math.round(getMaxRequests() * 1.5D)), sLoad, ourLoad);
+            updateMaxRequests(Math.min(_origMaxRequests, Math.round(getMaxRequests() * 3)), sLoad, ourLoad);
           }
         } else {
-          if (sLoad > 1.1) {
+          if (ourLoad > 0.90 && sLoad > 1.5) {
             int cMax = getMaxRequests();
             if (cMax > 5) {
               int max = Math.max(5, (int) ((double) cMax * 0.30D));
             //  log.warn("System load is {} and our load is {} procs is {}, set max concurrent requests to {}", sLoad, ourLoad, SysStats.PROC_COUNT, max);
               updateMaxRequests(max, sLoad, ourLoad);
             }
-          } else if (ourLoad < 0.70 && sLoad < 1.0 && _origMaxRequests != getMaxRequests()) {
+          } else if (ourLoad < 0.90 && sLoad < 2 && _origMaxRequests != getMaxRequests()) {
             if (sLoad < 0.9) {
               if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
               updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
             } else {
-              updateMaxRequests(Math.min(_origMaxRequests, (int) Math.round(getMaxRequests() * 1.5D)), sLoad, ourLoad);
+              updateMaxRequests(Math.min(_origMaxRequests, Math.round(getMaxRequests() * 3)), sLoad, ourLoad);
             }
 
           }