You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2020/10/20 12:49:16 UTC

[lucene-solr] branch jira/solr-14942 updated: SOLR-14942: Address review feedback.

This is an automated email from the ASF dual-hosted git repository.

shalin pushed a commit to branch jira/solr-14942
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/jira/solr-14942 by this push:
     new 0a8ae69  SOLR-14942: Address review feedback.
0a8ae69 is described below

commit 0a8ae69bcb6f49d2f7f38fca915d9000cc675e97
Author: Shalin Shekhar Mangar <sh...@apache.org>
AuthorDate: Tue Oct 20 18:19:00 2020 +0530

    SOLR-14942: Address review feedback.
    
    * Iterates electionContexts inside a synchronized block. Fixes iteration of electionContexts inside ZkController.close as well
    * Adds javadocs for ZkController.tryCancelAllElections
    * Replaces parallelStream with a serial forEach call for closing election contexts since it is a very lightweight operation
    * Removes unused SolrCoreState.unpauseUpdates method
    * Adds a timeout to SolrCoreState.pauseUpdatesAndAwaitInflightRequests (defaults to 2500 millis)
---
 .../java/org/apache/solr/cloud/ZkController.java   | 33 ++++++++++++++--------
 .../java/org/apache/solr/core/CoreContainer.java   | 16 +++++++----
 .../java/org/apache/solr/update/SolrCoreState.java | 19 ++++++-------
 3 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f6f8c3e..f2de3a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -653,7 +653,12 @@ public class ZkController implements Closeable {
     customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(IOUtils::closeQuietly));
 
     try {
-      customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(IOUtils::closeQuietly));
+      customThreadPool.submit(() -> {
+        Collection<ElectionContext> values = electionContexts.values();
+        synchronized (electionContexts) {
+          values.forEach(IOUtils::closeQuietly);
+        }
+      });
 
     } finally {
 
@@ -1838,20 +1843,26 @@ public class ZkController implements Closeable {
     }
   }
 
+  /**
+   * Attempts to cancel all leader elections. This method should be called on node shutdown.
+   */
   public void tryCancelAllElections() {
     if (zkClient.isClosed()) {
       return;
     }
-    electionContexts.values().parallelStream().forEach(context -> {
-      try {
-        context.cancelElection();
-        context.close();
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      } catch (KeeperException e) {
-        log.error("Error on cancelling elections of {}", context.leaderPath, e);
-      }
-    });
+    Collection<ElectionContext> values = electionContexts.values();
+    synchronized (electionContexts) {
+      values.forEach(context -> {
+        try {
+          context.cancelElection();
+          context.close();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (KeeperException e) {
+          log.warn("Error on cancelling elections of {}", context.leaderPath, e);
+        }
+      });
+    }
   }
 
   private ZkCoreNodeProps waitForLeaderToSeeDownState(
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 0891df5..ff3d2a9 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -38,10 +38,7 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -1000,7 +997,16 @@ public class CoreContainer {
         cancelCoreRecoveries();
         zkSys.zkController.preClose();
         // doesn't need to unpause here since we are shutting down
-        getCores().parallelStream().map(SolrCore::getSolrCoreState).forEach(SolrCoreState::pauseUpdatesAndAwaitInflightRequests);
+        getCores().parallelStream().forEach(solrCore -> {
+          SolrCoreState solrCoreState = solrCore.getSolrCoreState();
+          try {
+            solrCoreState.pauseUpdatesAndAwaitInflightRequests();
+          } catch (TimeoutException e) {
+            log.warn("Timed out waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+          } catch (InterruptedException e) {
+            log.warn("Interrupted while waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+          }
+        });
         zkSys.zkController.tryCancelAllElections();
       }
 
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 95eab76..f38fb2b 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 
@@ -42,6 +44,8 @@ import java.util.concurrent.locks.Lock;
  */
 public abstract class SolrCoreState {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int PAUSE_UPDATES_TIMEOUT_MILLIS = Integer.getInteger("solr.cloud.wait-for-updates-on-shutdown-millis", 2500);
   
   protected boolean closed = false;
   private final Object updateLock = new Object();
@@ -108,21 +112,16 @@ public abstract class SolrCoreState {
    * Pauses all update requests to this core and waits (indefinitely) for all in-flight
    * update requests to finish
    */
-  public void pauseUpdatesAndAwaitInflightRequests() {
+  public void pauseUpdatesAndAwaitInflightRequests() throws TimeoutException, InterruptedException {
     if (pauseUpdateRequests.compareAndSet(false, true)) {
-      inflightUpdatesCounter.register();
-      inflightUpdatesCounter.arriveAndAwaitAdvance();
+      int arrivalNumber = inflightUpdatesCounter.register();
+      if (arrivalNumber != -1) {
+        inflightUpdatesCounter.awaitAdvanceInterruptibly(inflightUpdatesCounter.arrive(), PAUSE_UPDATES_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+      }
     }
   }
 
   /**
-   * Unpauses update requests to this core
-   */
-  public void unpauseUpdates() {
-    this.pauseUpdateRequests.set(false);
-  }
-
-  /**
    * Registers in-flight update requests to this core.
    *
    * @return true if request was registered, false if update requests are paused