You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by sh...@apache.org on 2020/10/20 12:49:16 UTC
[lucene-solr] branch jira/solr-14942 updated: SOLR-14942: Address review feedback.
This is an automated email from the ASF dual-hosted git repository.
shalin pushed a commit to branch jira/solr-14942
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr-14942 by this push:
new 0a8ae69 SOLR-14942: Address review feedback.
0a8ae69 is described below
commit 0a8ae69bcb6f49d2f7f38fca915d9000cc675e97
Author: Shalin Shekhar Mangar <sh...@apache.org>
AuthorDate: Tue Oct 20 18:19:00 2020 +0530
SOLR-14942: Address review feedback.
* Iterates over electionContexts inside a synchronized block; this also fixes the iteration of electionContexts inside ZkController.close
* Adds javadocs for ZkController.tryCancelAllElections
* Replaces parallelStream with serial iteration (a plain forEach) when closing election contexts, since closing them is a very lightweight operation
* Removes the unused SolrCoreState.unpauseUpdates method
* Adds a timeout to SolrCoreState.pauseUpdatesAndAwaitInflightRequests (defaults to 2500 millis; configurable via the solr.cloud.wait-for-updates-on-shutdown-millis system property)
---
.../java/org/apache/solr/cloud/ZkController.java | 33 ++++++++++++++--------
.../java/org/apache/solr/core/CoreContainer.java | 16 +++++++----
.../java/org/apache/solr/update/SolrCoreState.java | 19 ++++++-------
3 files changed, 42 insertions(+), 26 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f6f8c3e..f2de3a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -653,7 +653,12 @@ public class ZkController implements Closeable {
customThreadPool.submit(() -> Collections.singleton(overseer).parallelStream().forEach(IOUtils::closeQuietly));
try {
- customThreadPool.submit(() -> electionContexts.values().parallelStream().forEach(IOUtils::closeQuietly));
+ customThreadPool.submit(() -> {
+ Collection<ElectionContext> values = electionContexts.values();
+ synchronized (electionContexts) {
+ values.forEach(IOUtils::closeQuietly);
+ }
+ });
} finally {
@@ -1838,20 +1843,26 @@ public class ZkController implements Closeable {
}
}
+ /**
+ * Attempts to cancel all leader elections. This method should be called on node shutdown.
+ */
public void tryCancelAllElections() {
if (zkClient.isClosed()) {
return;
}
- electionContexts.values().parallelStream().forEach(context -> {
- try {
- context.cancelElection();
- context.close();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (KeeperException e) {
- log.error("Error on cancelling elections of {}", context.leaderPath, e);
- }
- });
+ Collection<ElectionContext> values = electionContexts.values();
+ synchronized (electionContexts) {
+ values.forEach(context -> {
+ try {
+ context.cancelElection();
+ context.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (KeeperException e) {
+ log.warn("Error on cancelling elections of {}", context.leaderPath, e);
+ }
+ });
+ }
}
private ZkCoreNodeProps waitForLeaderToSeeDownState(
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 0891df5..ff3d2a9 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -38,10 +38,7 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
@@ -1000,7 +997,16 @@ public class CoreContainer {
cancelCoreRecoveries();
zkSys.zkController.preClose();
// doesn't need to unpause here since we are shutting down
- getCores().parallelStream().map(SolrCore::getSolrCoreState).forEach(SolrCoreState::pauseUpdatesAndAwaitInflightRequests);
+ getCores().parallelStream().forEach(solrCore -> {
+ SolrCoreState solrCoreState = solrCore.getSolrCoreState();
+ try {
+ solrCoreState.pauseUpdatesAndAwaitInflightRequests();
+ } catch (TimeoutException e) {
+ log.warn("Timed out waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while waiting for in-flight update requests to complete for core: {}", solrCore.getName());
+ }
+ });
zkSys.zkController.tryCancelAllElections();
}
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 95eab76..f38fb2b 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
@@ -42,6 +44,8 @@ import java.util.concurrent.locks.Lock;
*/
public abstract class SolrCoreState {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int PAUSE_UPDATES_TIMEOUT_MILLIS = Integer.getInteger("solr.cloud.wait-for-updates-on-shutdown-millis", 2500);
protected boolean closed = false;
private final Object updateLock = new Object();
@@ -108,21 +112,16 @@ public abstract class SolrCoreState {
* Pauses all update requests to this core and waits (indefinitely) for all in-flight
* update requests to finish
*/
- public void pauseUpdatesAndAwaitInflightRequests() {
+ public void pauseUpdatesAndAwaitInflightRequests() throws TimeoutException, InterruptedException {
if (pauseUpdateRequests.compareAndSet(false, true)) {
- inflightUpdatesCounter.register();
- inflightUpdatesCounter.arriveAndAwaitAdvance();
+ int arrivalNumber = inflightUpdatesCounter.register();
+ if (arrivalNumber != -1) {
+ inflightUpdatesCounter.awaitAdvanceInterruptibly(inflightUpdatesCounter.arrive(), PAUSE_UPDATES_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
}
}
/**
- * Unpauses update requests to this core
- */
- public void unpauseUpdates() {
- this.pauseUpdateRequests.set(false);
- }
-
- /**
* Registers in-flight update requests to this core.
*
* @return true if request was registered, false if update requests are paused