You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by da...@apache.org on 2020/08/25 07:59:49 UTC
[lucene-solr] 02/03: Move tracking to coreDescriptor
This is an automated email from the ASF dual-hosted git repository.
datcm pushed a commit to branch datcm/bloomberg-abdicate-leadership
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 3d058672fdde7b1a637be60cdd5482c45b720ba7
Author: Cao Manh Dat <da...@apache.org>
AuthorDate: Tue Aug 25 14:55:00 2020 +0700
Move tracking to coreDescriptor
---
.../java/org/apache/solr/core/CoreContainer.java | 83 --------------------
.../java/org/apache/solr/core/CoreDescriptor.java | 37 +++++++++
.../solr/handler/admin/CoreAdminOperation.java | 91 +++++++++++++---------
.../java/org/apache/solr/servlet/HttpSolrCall.java | 4 +-
.../apache/solr/common/params/CoreAdminParams.java | 1 +
5 files changed, 93 insertions(+), 123 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 711b5c7..c537791 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -261,89 +261,6 @@ public class CoreContainer {
private ExecutorService coreContainerAsyncTaskExecutor = ExecutorUtil.newMDCAwareCachedThreadPool("Core Container Async Task");
private final Object lock = new Object();
- private Set<String> pausedUpdatesCores = Collections.synchronizedSet(new HashSet<>());
- private Map<String, AtomicInteger> currentCoreUpdateCounters = new HashMap<>();
-
- public boolean startTrackUpdateRequest(String coreName) {
- synchronized (lock) {
- if (pausedUpdatesCores.contains(coreName)) {
- return false;
- }
- if (!currentCoreUpdateCounters.containsKey(coreName)) {
- currentCoreUpdateCounters.put(coreName, new AtomicInteger());
- }
- currentCoreUpdateCounters.get(coreName).incrementAndGet();
- return true;
- }
- }
-
- public void endTrackUpdateRequest(String coreName) {
- synchronized (lock) {
- currentCoreUpdateCounters.get(coreName).decrementAndGet();
- }
- }
-
- public void pauseUpdateFor(String coreName) {
- synchronized (lock) {
- pausedUpdatesCores.add(coreName);
- }
- }
-
- public void unpauseUpdateFor(String coreName) {
- synchronized (lock) {
- pausedUpdatesCores.remove(coreName);
- }
- }
-
- public void waitForEmptyUpdates(String coreName) {
- AtomicInteger counter = currentCoreUpdateCounters.get(coreName);
- if (counter == null)
- return;
-
- while (counter.get() != 0) {
- log.info("Datcm waiting for finish pending updates:{}", counter.get());
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- }
-
- }
-
- public void abdicateLeadership(String coreName) {
- CoreDescriptor cd = getCoreDescriptor(coreName);
- if (!isZooKeeperAware() || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader())
- return;
-
- try {
- synchronized (lock) {
- pausedUpdatesCores.add(coreName);
- }
- AtomicInteger counter = currentCoreUpdateCounters.get(coreName);
- while (counter != null && counter.get() != 0) {
- try {
- Thread.sleep(300);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return;
- }
- }
- try {
- zkSys.zkController.rejoinShardLeaderElection(cd, false);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- log.error("Exception on abdicate leadership of core:{}", coreName, e);
- } catch (Exception e) {
- log.error("Exception on abdicate leadership of core:{}", coreName, e);
- }
- } finally {
- synchronized (lock) {
- pausedUpdatesCores.remove(coreName);
- }
- }
- }
private enum CoreInitFailedAction {fromleader, none}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java b/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
index 7b3428d..a51d979 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreDescriptor.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -141,6 +142,11 @@ public class CoreDescriptor {
/** The properties for this core, substitutable by resource loaders */
protected final Properties substitutableProperties = new Properties();
+ /**
+ * While {@code true}, {@link #incInflightUpdateCounter()} refuses to register new update requests.
+ * Volatile because it is flipped by admin operations and read by request threads.
+ */
+ protected volatile boolean pausedAcceptingIndexing;
+
+ /**
+ * Count of update requests registered via {@link #incInflightUpdateCounter()} that have not yet
+ * been released via {@link #decInFlightUpdateCounter()}.
+ */
+ protected final AtomicInteger inflightUpdatesCounter = new AtomicInteger();
+
/** TESTS ONLY */
public CoreDescriptor(String name, Path instanceDir, CoreContainer coreContainer, String... coreProps) {
this(name, instanceDir, toMap(coreProps), coreContainer.getContainerProperties(), coreContainer.getZkController());
@@ -334,6 +340,37 @@ public class CoreDescriptor {
return Boolean.parseBoolean(tmp);
}
+ /**
+ * Sets whether this core refuses new update requests. While paused,
+ * {@link #incInflightUpdateCounter()} returns {@code false}.
+ *
+ * @param pausedAcceptingIndexing {@code true} to refuse new updates, {@code false} to accept them
+ */
+ public void setPausedAcceptingIndexing(boolean pausedAcceptingIndexing) {
+ this.pausedAcceptingIndexing = pausedAcceptingIndexing;
+ }
+
+ /** Returns {@code true} if new update requests are currently being refused for this core. */
+ public boolean isPausedAcceptingIndexing() {
+ return pausedAcceptingIndexing;
+ }
+
+ /**
+ * Registers a new in-flight update request unless indexing is paused.
+ * <p>
+ * The counter is incremented <em>before</em> the pause flag is read. The pausing side does the
+ * opposite (set the flag, then read the counter in {@link #waitForNoInflightUpdates()}), so at
+ * least one side always observes the other: either the waiter sees this request in the counter,
+ * or this request sees the pause and backs out. A check-then-increment order would let a request
+ * slip through after the waiter had already observed zero in-flight updates.
+ *
+ * @return {@code true} if the request was registered and must later be released with
+ *         {@link #decInFlightUpdateCounter()}; {@code false} if indexing is paused, in which
+ *         case the counter is left unchanged
+ */
+ public boolean incInflightUpdateCounter() {
+ inflightUpdatesCounter.incrementAndGet();
+ if (isPausedAcceptingIndexing()) {
+ // undo the optimistic registration; the caller must not call decInFlightUpdateCounter()
+ inflightUpdatesCounter.decrementAndGet();
+ return false;
+ }
+ return true;
+ }
+
+ /** Releases an update request previously registered by {@link #incInflightUpdateCounter()}. */
+ public void decInFlightUpdateCounter() {
+ inflightUpdatesCounter.decrementAndGet();
+ }
+
+ /**
+ * Blocks, polling every 500 ms, until no registered update requests remain in flight.
+ * If the calling thread is interrupted, this returns early with the interrupt flag restored.
+ * NOTE(review): there is no timeout, so an unbalanced counter would block the caller forever.
+ */
+ public void waitForNoInflightUpdates() {
+ while (inflightUpdatesCounter.get() != 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
public boolean isTransient() {
String tmp = coreProperties.getProperty(CORE_TRANSIENT, "false");
return PropertiesUtil.toBoolean(tmp);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
index 2b852a4..6c0021b 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminOperation.java
@@ -16,26 +16,14 @@
*/
package org.apache.solr.handler.admin;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.nio.file.Path;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.cloud.CloudDescriptor;
-import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.ZkController;
-import org.apache.solr.cloud.overseer.OverseerAction;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Slice;
-import org.apache.solr.common.cloud.ZkNodeProps;
-import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
@@ -48,7 +36,7 @@ import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
-import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminOp;
+import org.apache.solr.handler.admin.CoreAdminHandler.*;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.UpdateLog;
@@ -59,6 +47,16 @@ import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.common.params.CoreAdminParams.COLLECTION;
import static org.apache.solr.common.params.CoreAdminParams.CoreAdminAction.*;
@@ -238,38 +236,55 @@ enum CoreAdminOperation implements CoreAdminOp {
}
}
}),
+
ABDICATE_LEADERSHIP_OP(ABDICATE_LEADERSHIP, it -> {
CoreContainer cc = it.handler.coreContainer;
if (!cc.isZooKeeperAware()) {
throw new SolrException(ErrorCode.BAD_REQUEST, "This api only works in SolrCloud env");
}
- for (String coreName: cc.getAllCoreNames()) {
- try {
- CoreDescriptor cd = cc.getCoreDescriptor(coreName);
- if (cd == null || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader()) {
- continue;
- }
-
- ClusterState clusterState = cc.getZkController().getClusterState();
- DocCollection docCollection = clusterState.getCollection(cd.getCollectionName());
- Slice slice = docCollection.getSlice(cd.getCloudDescriptor().getShardId());
- if (slice.getReplicas().size() < 2) {
- log().info("Skipping abdicate leadership of core:{} since it is the only one replicas in the shard", coreName);
- }
- if (slice.getReplicas(replica -> replica.isActive(clusterState.getLiveNodes())).size() < 2) {
- log().info("Skipping abdicate leadership of core:{} since it is the only one active replica", coreName);
- }
+ // Optional comma-separated list of core names to restrict the operation to; when absent, every
+ // core this node currently leads is considered. Entries are trimmed, so "a, b" selects a and b.
+ String cores = it.req.getParams().get("cores");
+ Set<String> selectedCores = null;
+ if (cores != null) {
+ selectedCores = new HashSet<>();
+ for (String coreName : cores.split(",")) {
+ String trimmed = coreName.trim();
+ if (!trimmed.isEmpty()) {
+ selectedCores.add(trimmed);
+ }
+ }
+ }
+ for (CoreDescriptor cd: cc.getCoreDescriptors()) {
+ if (cd == null || cd.getCloudDescriptor() == null || !cd.getCloudDescriptor().isLeader()) {
+ continue;
+ }
+ if (selectedCores != null && !selectedCores.contains(cd.getName())) {
+ continue;
+ }
- cc.pauseUpdateFor(coreName);
- cc.waitForEmptyUpdates(coreName);
- cc.getZkController().rejoinShardLeaderElection(cd, false);
- } finally {
- cc.unpauseUpdateFor(coreName);
- // after this point this core can still receive more updates since it did not explicitly
- // unset its leader role from cluster state, but that fine since it already unset its leader role locally
- // (by setting cloudDescriptor.isLeader() flag) so it gonna keep refusing updates or routing update requests
- // to a new leader
+ ClusterState clusterState = cc.getZkController().getClusterState();
+ DocCollection docCollection = clusterState.getCollection(cd.getCollectionName());
+ Slice slice = docCollection.getSlice(cd.getCloudDescriptor().getShardId());
+ // Handing off leadership only makes sense when another replica can take over. These checks must
+ // actually skip the core; previously they only logged "Skipping" and abdicated anyway.
+ if (slice.getReplicas().size() < 2) {
+ log().info("Skipping abdicate leadership of core:{} since it is the only one replicas in the shard", cd.getName());
+ continue;
+ }
+ if (slice.getReplicas(replica -> replica.isActive(clusterState.getLiveNodes())).size() < 2) {
+ log().info("Skipping abdicate leadership of core:{} since it is the only one active replica", cd.getName());
+ continue;
}
+
+ try {
+ // Refuse new updates, then wait until the ones already accepted have finished, so none is
+ // still being processed on this core when it rejoins the leader election.
+ cd.setPausedAcceptingIndexing(true);
+ cd.waitForNoInflightUpdates();
+ cc.getZkController().rejoinShardLeaderElection(cd, false);
+ } finally {
+ cd.setPausedAcceptingIndexing(false);
+ // After this point the core can still receive updates, because it did not explicitly remove
+ // its leader role from the cluster state. That is acceptable: it has already dropped its
+ // leader role locally (the cloudDescriptor.isLeader() flag), so it keeps refusing updates or
+ // routing them to the new leader.
+ }
+ }
+ }),
+
+ UNPAUSE_INDEXING_OP(UNPAUSE_INDEXING, it -> {
+ // Clear the pause flag on every descriptor known to the container so updates are accepted again.
+ for (CoreDescriptor descriptor : it.handler.coreContainer.getCoreDescriptors()) {
+ if (descriptor == null) {
+ continue;
+ }
+ descriptor.setPausedAcceptingIndexing(false);
}
}),
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index 895955d..27fa106 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -572,7 +572,7 @@ public class HttpSolrCall {
remoteQuery(coreUrl + path, resp);
return RETURN;
case PROCESS:
- if (handler instanceof UpdateRequestHandler && !cores.startTrackUpdateRequest(core.getName())) {
+ if (handler instanceof UpdateRequestHandler && !core.getCoreDescriptor().incInflightUpdateCounter()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Updates are temporary paused for core:"+core.getName());
}
try {
@@ -611,7 +611,7 @@ public class HttpSolrCall {
return RETURN;
} finally {
if (handler instanceof UpdateRequestHandler) {
- cores.endTrackUpdateRequest(core.getName());
+ core.getCoreDescriptor().decInFlightUpdateCounter();
}
}
default: return action;
diff --git a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
index daff1fe..ebee497 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CoreAdminParams.java
@@ -154,6 +154,7 @@ public abstract class CoreAdminParams
REQUESTAPPLYUPDATES,
OVERSEEROP,
ABDICATE_LEADERSHIP,
+ UNPAUSE_INDEXING,
REQUESTSTATUS(true),
REJOINLEADERELECTION,
//internal API used by force shard leader election