You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2020/10/19 06:10:54 UTC

[lucene-solr] 01/01: SOLR-14982: Reduce leader election time on node shutdown

This is an automated email from the ASF dual-hosted git repository.

shalin pushed a commit to branch jira/solr-14942
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 4f855e46c1f1c3324c5285fb224bec07f8a81994
Author: Shalin Shekhar Mangar <sh...@apache.org>
AuthorDate: Mon Oct 19 11:40:37 2020 +0530

    SOLR-14982: Reduce leader election time on node shutdown
    
    The shutdown process waits for all replicas/cores to be closed before removing the election node of the leader. This can take some time due to index flush or merge activities on the leader cores and delays new leaders from being elected. Moreover, jetty stops accepting new requests on receiving SIGTERM which means that even though a leader technically exists, no new indexing requests can be processed by the node. This commit waits for all in-flight indexing requests to complete, remov [...]
---
 .../java/org/apache/solr/cloud/ZkController.java   | 16 +++++
 .../java/org/apache/solr/core/CoreContainer.java   |  3 +
 .../java/org/apache/solr/servlet/HttpSolrCall.java | 58 ++++++++++--------
 .../java/org/apache/solr/update/SolrCoreState.java | 68 ++++++++++++++++++++--
 4 files changed, 115 insertions(+), 30 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index c29623d..f6f8c3e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1838,6 +1838,40 @@ public class ZkController implements Closeable {
     }
   }
 
+  /**
+   * Cancels, on a best-effort basis, every election context held in {@code electionContexts}
+   * and closes it. Does nothing if the ZooKeeper client is already closed.
+   * A {@link KeeperException} for one context is logged and does not stop the remaining
+   * cancellations; an interrupt is re-asserted on the calling thread after all contexts have
+   * been tried.
+   */
+  public void tryCancelAllElections() {
+    if (zkClient.isClosed()) {
+      return;
+    }
+    // Snapshot the contexts (toArray() needs no external lock on a synchronized or concurrent
+    // map view) and cancel them sequentially on the calling thread. A parallel stream runs the
+    // body on ForkJoinPool workers, so re-interrupting there flags a shared pool thread rather
+    // than the caller. NOTE(review): if electionContexts is a Collections.synchronizedMap,
+    // parallelStream() on its values view also required manual synchronization.
+    ElectionContext[] contexts = electionContexts.values().toArray(new ElectionContext[0]);
+    boolean interrupted = false;
+    for (ElectionContext context : contexts) {
+      try {
+        context.cancelElection();
+        context.close();
+      } catch (InterruptedException e) {
+        // remember the interrupt and keep cancelling the remaining elections
+        interrupted = true;
+      } catch (KeeperException e) {
+        log.error("Error on cancelling elections of {}", context.leaderPath, e);
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
   private ZkCoreNodeProps waitForLeaderToSeeDownState(
       CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException {
       // try not to wait too long here - if we are waiting too long, we should probably
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index c341469..0891df5 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -999,6 +999,9 @@ public class CoreContainer {
       if (isZooKeeperAware()) {
         cancelCoreRecoveries();
         zkSys.zkController.preClose();
+        // stop accepting updates on every core and wait for in-flight ones; no unpause needed, we are shutting down
+        getCores().parallelStream().forEach(core -> core.getSolrCoreState().pauseUpdatesAndAwaitInflightRequests());
+        zkSys.zkController.tryCancelAllElections();
       }
 
       ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 4fae614..c5aaf86 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -86,6 +86,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ContentStreamHandlerBase;
+import org.apache.solr.handler.UpdateRequestHandler;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -551,39 +552,48 @@ public class HttpSolrCall {
           remoteQuery(coreUrl + path, resp);
           return RETURN;
         case PROCESS:
-          final Method reqMethod = Method.getMethod(req.getMethod());
-          HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
-          // unless we have been explicitly told not to, do cache validation
-          // if we fail cache validation, execute the query
-          if (config.getHttpCachingConfig().isNever304() ||
-              !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
-            SolrQueryResponse solrRsp = new SolrQueryResponse();
+          if (handler instanceof UpdateRequestHandler && !core.getSolrCoreState().registerInFlightUpdate()) { // track updates so a pause (e.g. on shutdown) can wait for them; refuse while paused
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporarily paused for core: " + core.getName());
+          }
+          try { // a registered update request is always deregistered in the finally block
+            final Method reqMethod = Method.getMethod(req.getMethod());
+            HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
+            // unless we have been explicitly told not to, do cache validation
+            // if we fail cache validation, execute the query
+            if (config.getHttpCachingConfig().isNever304() ||
+                    !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
+              SolrQueryResponse solrRsp = new SolrQueryResponse();
               /* even for HEAD requests, we need to execute the handler to
                * ensure we don't get an error (and to make sure the correct
                * QueryResponseWriter is selected and we get the correct
                * Content-Type)
                */
-            SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
-            mustClearSolrRequestInfo = true;
-            execute(solrRsp);
-            if (shouldAudit()) {
-              EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
-              if (shouldAudit(eventType)) {
-                cores.getAuditLoggerPlugin().doAudit(
-                    new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
+              SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
+              mustClearSolrRequestInfo = true;
+              execute(solrRsp);
+              if (shouldAudit()) {
+                EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
+                if (shouldAudit(eventType)) {
+                  cores.getAuditLoggerPlugin().doAudit(
+                          new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
+                }
+              }
+              HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
+              Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
+              while (headers.hasNext()) {
+                Map.Entry<String, String> entry = headers.next();
+                resp.addHeader(entry.getKey(), entry.getValue());
               }
+              QueryResponseWriter responseWriter = getResponseWriter();
+              if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
+              writeResponse(solrRsp, responseWriter, reqMethod);
             }
-            HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
-            Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
-            while (headers.hasNext()) {
-              Map.Entry<String, String> entry = headers.next();
-              resp.addHeader(entry.getKey(), entry.getValue());
+            return RETURN;
+          } finally { // only reached by requests that passed the registration check above
+            if (handler instanceof UpdateRequestHandler) { // same condition as above, so every registration is balanced exactly once
+              core.getSolrCoreState().deregisterInFlightUpdate();
             }
-            QueryResponseWriter responseWriter = getResponseWriter();
-            if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
-            writeResponse(solrRsp, responseWriter, reqMethod);
           }
-          return RETURN;
         default: return action;
       }
     } catch (Throwable ex) {
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index c8f61d5..b83069d 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -16,10 +16,6 @@
  */
 package org.apache.solr.update;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.locks.Lock;
-
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.cloud.ActionThrottle;
@@ -33,6 +29,12 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
 /**
  * The state in this class can be easily shared between SolrCores across
  * SolrCore reloads.
@@ -44,7 +46,34 @@ public abstract class SolrCoreState {
   protected boolean closed = false;
   private final Object updateLock = new Object();
   private final Object reloadLock = new Object();
-  
+
+  /**
+   * If true then all update requests will be refused
+   */
+  private final AtomicBoolean pauseUpdateRequests = new AtomicBoolean();
+
+  /**
+   * Phaser is used to track in flight update requests and can be used
+   * to wait for all in-flight requests to finish.
+   *
+   * <p>A Phaser without a parent terminates when a deregistration leaves it with zero
+   * registered parties (its default {@code onAdvance} returns true in that case), after
+   * which {@code register()} has no effect and waiting returns immediately. That would
+   * happen as soon as the last in-flight update finishes, so termination is disabled by
+   * overriding {@code onAdvance}. A Phaser supports at most 65535 registered parties.
+   *
+   * @see #registerInFlightUpdate()
+   * @see #deregisterInFlightUpdate()
+   * @see #pauseUpdatesAndAwaitInflightRequests()
+   */
+  private final Phaser inflightUpdatesCounter = new Phaser() {
+    @Override
+    protected boolean onAdvance(int phase, int registeredParties) {
+      // never terminate: the phaser is reused as an in-flight counter for this object's lifetime
+      return false;
+    }
+  };
+
   public Object getUpdateLock() {
     return updateLock;
   }
@@ -86,7 +115,56 @@ public abstract class SolrCoreState {
     }
     return close;
   }
-  
+
+  /**
+   * Pauses all update requests to this core and waits (indefinitely) for all in-flight
+   * update requests to finish. If updates are already paused this returns immediately
+   * without waiting.
+   */
+  public void pauseUpdatesAndAwaitInflightRequests() {
+    if (pauseUpdateRequests.compareAndSet(false, true)) {
+      // Register a temporary party and deregister it on arrival so that it does not stay
+      // registered afterwards; otherwise a later pause (after unpauseUpdates()) would wait
+      // forever for this stale party to arrive.
+      inflightUpdatesCounter.register();
+      int phase = inflightUpdatesCounter.arriveAndDeregister();
+      inflightUpdatesCounter.awaitAdvance(phase);
+    }
+  }
+
+  /**
+   * Unpauses update requests to this core
+   */
+  public void unpauseUpdates() {
+    this.pauseUpdateRequests.set(false);
+  }
+
+  /**
+   * Registers in-flight update requests to this core. The caller of this method should
+   * disallow update request to this core if this method returns false.
+   *
+   * @return true if request was registered, false if update requests are paused
+   */
+  public boolean registerInFlightUpdate() {
+    // Register before checking the flag: checking first leaves a window in which a pause
+    // could start and finish waiting before this request is registered, letting the update
+    // run while updates are supposedly paused.
+    inflightUpdatesCounter.register();
+    if (pauseUpdateRequests.get()) {
+      inflightUpdatesCounter.arriveAndDeregister();
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * De-registers in-flight update requests to this core (marks them as completed).
+   * Must be called exactly once for each {@link #registerInFlightUpdate()} call that returned true.
+   */
+  public void deregisterInFlightUpdate() {
+    inflightUpdatesCounter.arriveAndDeregister();
+  }
+
   public abstract Lock getCommitLock();
   
   /**