You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2020/10/24 15:53:04 UTC

[lucene-solr] branch branch_8x updated: SOLR-14942: Reduce leader election time on node shutdown (#2004)

This is an automated email from the ASF dual-hosted git repository.

shalin pushed a commit to branch branch_8x
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/branch_8x by this push:
     new b6d06bb  SOLR-14942: Reduce leader election time on node shutdown (#2004)
b6d06bb is described below

commit b6d06bb309d121901ab6ce1d1935b4067ce610fe
Author: Shalin Shekhar Mangar <sh...@apache.org>
AuthorDate: Sat Oct 24 17:39:00 2020 +0530

    SOLR-14942: Reduce leader election time on node shutdown (#2004)
    
    The shutdown process waits for all replicas/cores to be closed before removing the election node of the leader. This can take some time due to index flush or merge activities on the leader cores, which delays the election of new leaders. Moreover, Jetty stops accepting new requests on receiving SIGTERM, which means that even though a leader technically exists, no new indexing requests can be processed by the node. This commit waits for all in-flight indexing requests to complete, remov [...]
    
    Co-authored-by: Cao Manh Dat <da...@apache.org>
    
    (cherry picked from commit 706f284c467becb5f002c05455808ee31aee3465)
---
 solr/CHANGES.txt                                   |  3 +-
 .../java/org/apache/solr/cloud/ZkController.java   | 37 ++++++++--
 .../java/org/apache/solr/core/CoreContainer.java   | 22 ++++++
 .../java/org/apache/solr/servlet/HttpSolrCall.java | 67 +++++++++++-------
 .../java/org/apache/solr/update/SolrCoreState.java | 81 +++++++++++++++++++---
 5 files changed, 171 insertions(+), 39 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index abe4d00..1afca6e 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -14,7 +14,8 @@ New Features
 
 Improvements
 ---------------------
-(No changes)
+* SOLR-14942: Reduce leader election time on node shutdown by removing election nodes before closing cores.
+  (Cao Manh Dat, Mike Drob, hossman, shalin)
 
 Optimizations
 ---------------------
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index d86a2ed..189c713 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -607,18 +607,23 @@ public class ZkController implements Closeable {
 
     ExecutorService customThreadPool = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool"));
 
-    customThreadPool.submit(() -> Collections.singleton(overseerElector.getContext()).parallelStream().forEach(IOUtils::closeQuietly));
+    customThreadPool.submit(() -> IOUtils.closeQuietly(overseerElector.getContext()));
 
-    customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(IOUtils::closeQuietly));
+    customThreadPool.submit(() -> IOUtils.closeQuietly(overseer));
 
     try {
-      customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(IOUtils::closeQuietly));
+      customThreadPool.submit(() -> {
+        Collection<ElectionContext> values = electionContexts.values();
+        synchronized (electionContexts) {
+          values.forEach(IOUtils::closeQuietly);
+        }
+      });
 
     } finally {
 
       sysPropsCacher.close();
-      customThreadPool.submit(() -> Collections.singleton(cloudSolrClient).parallelStream().forEach(IOUtils::closeQuietly));
-      customThreadPool.submit(() -> Collections.singleton(cloudManager).parallelStream().forEach(IOUtils::closeQuietly));
+      customThreadPool.submit(() -> IOUtils.closeQuietly(cloudSolrClient));
+      customThreadPool.submit(() -> IOUtils.closeQuietly(cloudManager));
 
       try {
         try {
@@ -1860,6 +1865,28 @@ public class ZkController implements Closeable {
     }
   }
 
+  /**
+   * Attempts to cancel all leader elections. This method should be called on node shutdown.
+   */
+  public void tryCancelAllElections() {
+    if (zkClient.isClosed()) {
+      return;
+    }
+    Collection<ElectionContext> values = electionContexts.values();
+    synchronized (electionContexts) {
+      values.forEach(context -> {
+        try {
+          context.cancelElection();
+          context.close();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (KeeperException e) {
+          log.warn("Error on cancelling elections of {}", context.leaderPath, e);
+        }
+      });
+    }
+  }
+
   private ZkCoreNodeProps waitForLeaderToSeeDownState(
       CoreDescriptor descriptor, final String coreZkNodeName) throws SessionExpiredException {
     // try not to wait too long here - if we are waiting too long, we should probably
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 0c08b71..a0f8c11 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -1023,6 +1024,27 @@ public class CoreContainer {
       if (isZooKeeperAware()) {
         cancelCoreRecoveries();
         zkSys.zkController.preClose();
+        /*
+         * Pause updates for all cores on this node and wait for all in-flight update requests to finish.
+         * Here, we (slightly) delay leader election so that in-flight update requests succeed and we can preserve consistency.
+         *
+         * Jetty already allows a grace period for in-flight requests to complete and our solr cores, searchers etc
+         * are reference counted to allow for graceful shutdown. So we don't worry about any other kind of requests.
+         *
+         * We do not need to unpause ever because the node is being shut down.
+         */
+        getCores().parallelStream().forEach(solrCore -> {
+          SolrCoreState solrCoreState = solrCore.getSolrCoreState();
+          try {
+            solrCoreState.pauseUpdatesAndAwaitInflightRequests();
+          } catch (TimeoutException e) {
+            log.warn("Timed out waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+          } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+            Thread.currentThread().interrupt();
+          }
+        });
+        zkSys.zkController.tryCancelAllElections();
       }
 
       ExecutorUtil.shutdownAndAwaitTermination(coreContainerWorkExecutor);
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 2900184..a472600 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -86,6 +86,7 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.ContentStreamHandlerBase;
+import org.apache.solr.handler.UpdateRequestHandler;
 import org.apache.solr.logging.MDCLoggingContext;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -551,39 +552,57 @@ public class HttpSolrCall {
           remoteQuery(coreUrl + path, resp);
           return RETURN;
         case PROCESS:
-          final Method reqMethod = Method.getMethod(req.getMethod());
-          HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
-          // unless we have been explicitly told not to, do cache validation
-          // if we fail cache validation, execute the query
-          if (config.getHttpCachingConfig().isNever304() ||
-              !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
-            SolrQueryResponse solrRsp = new SolrQueryResponse();
+          /*
+           We track update requests so that we can preserve consistency by waiting for them to complete
+           on a node shutdown and then immediately trigger a leader election without waiting for the core to close.
+           See how the SolrCoreState#pauseUpdatesAndAwaitInflightRequests() method is used in CoreContainer#shutdown()
+
+           Also see https://issues.apache.org/jira/browse/SOLR-14942 for details on why we do not track
+           other kinds of requests.
+          */
+          if (handler instanceof UpdateRequestHandler && !core.getSolrCoreState().registerInFlightUpdate()) {
+            throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporarily paused for core: " + core.getName());
+          }
+          try {
+            final Method reqMethod = Method.getMethod(req.getMethod());
+            HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
+            // unless we have been explicitly told not to, do cache validation
+            // if we fail cache validation, execute the query
+            if (config.getHttpCachingConfig().isNever304() ||
+                    !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
+              SolrQueryResponse solrRsp = new SolrQueryResponse();
               /* even for HEAD requests, we need to execute the handler to
                * ensure we don't get an error (and to make sure the correct
                * QueryResponseWriter is selected and we get the correct
                * Content-Type)
                */
-            SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
-            mustClearSolrRequestInfo = true;
-            execute(solrRsp);
-            if (shouldAudit()) {
-              EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
-              if (shouldAudit(eventType)) {
-                cores.getAuditLoggerPlugin().doAudit(
-                    new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
+              SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp, action));
+              mustClearSolrRequestInfo = true;
+              execute(solrRsp);
+              if (shouldAudit()) {
+                EventType eventType = solrRsp.getException() == null ? EventType.COMPLETED : EventType.ERROR;
+                if (shouldAudit(eventType)) {
+                  cores.getAuditLoggerPlugin().doAudit(
+                          new AuditEvent(eventType, req, getAuthCtx(), solrReq.getRequestTimer().getTime(), solrRsp.getException()));
+                }
+              }
+              HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
+              Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
+              while (headers.hasNext()) {
+                Map.Entry<String, String> entry = headers.next();
+                resp.addHeader(entry.getKey(), entry.getValue());
               }
+              QueryResponseWriter responseWriter = getResponseWriter();
+              if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
+              writeResponse(solrRsp, responseWriter, reqMethod);
             }
-            HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
-            Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
-            while (headers.hasNext()) {
-              Map.Entry<String, String> entry = headers.next();
-              resp.addHeader(entry.getKey(), entry.getValue());
+            return RETURN;
+          } finally {
+            if (handler instanceof UpdateRequestHandler) {
+              // every registered request must also be de-registered
+              core.getSolrCoreState().deregisterInFlightUpdate();
             }
-            QueryResponseWriter responseWriter = getResponseWriter();
-            if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
-            writeResponse(solrRsp, responseWriter, reqMethod);
           }
-          return RETURN;
         default: return action;
       }
     } catch (Throwable ex) {
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index eddd5b7..94036ab 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -16,12 +16,6 @@
  */
 package org.apache.solr.update;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.search.Sort;
 import org.apache.solr.cloud.ActionThrottle;
@@ -35,6 +29,16 @@ import org.apache.solr.util.RefCounted;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+
 /**
  * The state in this class can be easily shared between SolrCores across
  * SolrCore reloads.
@@ -42,11 +46,37 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class SolrCoreState {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  
+
+  private static final int PAUSE_UPDATES_TIMEOUT_MILLIS = Integer.getInteger("solr.cloud.wait-for-updates-on-shutdown-millis", 2500);
+
   protected boolean closed = false;
   private final Object updateLock = new Object();
   private final Object reloadLock = new Object();
-  
+
+  /**
+   * If true then all update requests will be refused
+   */
+  private final AtomicBoolean pauseUpdateRequests = new AtomicBoolean();
+
+  /**
+   * Phaser is used to track in-flight update requests and can be used
+   * to wait for all in-flight requests to finish. A Phaser terminates
+   * automatically when the number of registered parties reaches zero.
+   * Since we track requests with this phaser, we disable the automatic
+   * termination by overriding the onAdvance method to return false.
+   *
+   * @see #registerInFlightUpdate()
+   * @see #deregisterInFlightUpdate()
+   * @see #pauseUpdatesAndAwaitInflightRequests()
+   */
+  private final Phaser inflightUpdatesCounter = new Phaser()  {
+    @Override
+    protected boolean onAdvance(int phase, int registeredParties) {
+      // disable termination of phaser
+      return false;
+    }
+  };
+
   public Object getUpdateLock() {
     return updateLock;
   }
@@ -88,7 +118,40 @@ public abstract class SolrCoreState {
     }
     return close;
   }
-  
+
+  /**
+   * Pauses all update requests to this core and waits, up to a configurable timeout, for all
+   * in-flight update requests to finish (throws TimeoutException if the timeout elapses)
+   */
+  public void pauseUpdatesAndAwaitInflightRequests() throws TimeoutException, InterruptedException {
+    if (pauseUpdateRequests.compareAndSet(false, true)) {
+      int arrivalNumber = inflightUpdatesCounter.register();
+      assert arrivalNumber >= 0 : "Registration of in-flight request should have succeeded but got arrival phase number < 0";
+      inflightUpdatesCounter.awaitAdvanceInterruptibly(inflightUpdatesCounter.arrive(), PAUSE_UPDATES_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Registers in-flight update requests to this core.
+   *
+   * @return true if request was registered, false if update requests are paused
+   */
+  public boolean registerInFlightUpdate() {
+    if (pauseUpdateRequests.get()) {
+      return false;
+    }
+    inflightUpdatesCounter.register();
+    return true;
+  }
+
+  /**
+   * De-registers in-flight update requests to this core (marks them as completed)
+   */
+  public void deregisterInFlightUpdate() {
+    int arrivalPhaseNumber = inflightUpdatesCounter.arriveAndDeregister();
+    assert arrivalPhaseNumber >= 0 : "inflightUpdatesCounter should not have been terminated";
+  }
+
   public abstract Lock getCommitLock();
   
   /**