You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/22 01:24:40 UTC
[lucene-solr] 02/02: @1276 Don't interrupt election and SolrQos tweaks and recovery tweaks.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 0f23e0984bc8b3db5fe6deeaf30b004c81088e70
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 19:16:14 2021 -0600
@1276 Don't interrupt election and SolrQos tweaks and recovery tweaks.
---
.../java/org/apache/solr/cloud/LeaderElector.java | 26 ++--
.../org/apache/solr/cloud/RecoveryStrategy.java | 23 +++-
.../solr/cloud/ShardLeaderElectionContext.java | 15 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 151 ++++++++++++---------
.../apache/solr/handler/admin/PrepRecoveryOp.java | 17 ---
.../org/apache/solr/servlet/SolrQoSFilter.java | 14 +-
6 files changed, 141 insertions(+), 105 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index 3c822f6..fcbdd4b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -316,7 +316,7 @@ public class LeaderElector implements Closeable {
}
public void joinElection(boolean replacement,boolean joinAtHead) {
- if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled() && zkClient.isAlive()) {
+ if (!isClosed && !zkController.getCoreContainer().isShutDown() && !zkController.isDcCalled()) {
joinFuture = executor.submit(() -> {
try {
isCancelled = false;
@@ -382,8 +382,7 @@ public class LeaderElector implements Closeable {
}
} else {
if (log.isDebugEnabled()) log.debug("create ephem election node {}", shardsElectZkPath + "/" + id + "-n_");
- leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null,
- CreateMode.EPHEMERAL_SEQUENTIAL, false);
+ leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", (byte[]) null, CreateMode.EPHEMERAL_SEQUENTIAL, false);
}
log.info("Joined leadership election with path: {}", leaderSeqPath);
@@ -444,7 +443,7 @@ public class LeaderElector implements Closeable {
}
private boolean shouldRejectJoins() {
- return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled();
+ return zkController.getCoreContainer().isShutDown() || zkController.isDcCalled() || isClosed;
}
@Override
@@ -471,10 +470,6 @@ public class LeaderElector implements Closeable {
public void cancel() {
- if (state == OUT_OF_ELECTION || state == CLOSED) {
- return;
- }
-
state = OUT_OF_ELECTION;
try {
@@ -587,15 +582,26 @@ public class LeaderElector implements Closeable {
Collections.sort(seqs, Comparator.comparingInt(LeaderElector::getSeq).thenComparing(o -> o));
}
- void retryElection(boolean joinAtHead) {
+ synchronized void retryElection(boolean joinAtHead) {
+ state = OUT_OF_ELECTION;
if (shouldRejectJoins()) {
+ log.info("Closed, won't rejoin election");
throw new AlreadyClosedException();
}
- cancel();
ElectionWatcher watcher = this.watcher;
IOUtils.closeQuietly(watcher);
+ this.watcher = null;
IOUtils.closeQuietly(this);
+ context.leaderSeqPath = null;
+ context.watchedSeqPath = null;
+ if (context instanceof ShardLeaderElectionContextBase) {
+ ((ShardLeaderElectionContextBase) context).closed = false;
+ ((ShardLeaderElectionContextBase) context).leaderZkNodeParentVersion = null;
+ }
+
+ isClosed = false;
isCancelled = false;
+ joinFuture = null;
joinElection(true, joinAtHead);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 1272ad8..68d99fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -351,8 +351,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
- if (leader != null && leader.getName().equals(coreName) && zkController.getZkClient()
- .exists(COLLECTIONS_ZKNODE + "/" + coreDescriptor.getCollectionName() + "/leaders/" + coreDescriptor.getCloudDescriptor().getShardId() + "/leader")) {
+ if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
close = true;
return;
@@ -403,9 +402,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
// though
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- Replica leaderprops;
+ Replica leader;
try {
- leaderprops = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+ leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+
+ if (leader != null && leader.getName().equals(coreName)) {
+ log.info("We are the leader, STOP recovery");
+ close = true;
+ return false;
+ }
} catch (Exception e) {
log.error("Could not get leader for {} {} {}", cloudDesc.getCollectionName(), cloudDesc.getShardId(), zkStateReader.getClusterState().getCollectionOrNull(cloudDesc.getCollectionName()), e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -413,12 +418,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (isClosed()) {
throw new AlreadyClosedException();
}
- log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leaderprops.getName(), Replica.getCoreUrl(baseUrl, coreName));
+ log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leader.getName(), Replica.getCoreUrl(baseUrl, coreName));
try {
log.info("Stopping background replicate from leader process");
zkController.stopReplicationFromLeader(coreName);
- IndexFetcher.IndexFetchResult result = replicate(leaderprops);
+ IndexFetcher.IndexFetchResult result = replicate(leader);
if (result.getSuccessful()) {
log.info("replication fetch reported as success");
@@ -598,6 +603,12 @@ public class RecoveryStrategy implements Runnable, Closeable {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+ if (leader != null && leader.getName().equals(coreName)) {
+ log.info("We are the leader, STOP recovery");
+ close = true;
+ return false;
+ }
+
log.info("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 56eb674..dfd970a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -127,14 +127,27 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
Replica.Type replicaType;
- String coreNodeName;
boolean setTermToMax = false;
CoreDescriptor cd = core.getCoreDescriptor();
CloudDescriptor cloudCd = cd.getCloudDescriptor();
replicaType = cloudCd.getReplicaType();
// should I be leader?
+
ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ try {
+ // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
+ if (zkShardTerms != null && zkShardTerms.skipSendingUpdatesTo(coreName)) {
+ // The replica changed its term, then published itself as RECOVERING.
+ // This core already see replica as RECOVERING
+ // so it is guarantees that a live-fetch will be enough for this core to see max term published
+ log.info("refresh shard terms for core {}", coreName);
+ zkShardTerms.refreshTerms(false);
+ }
+ } catch (Exception e) {
+ log.error("Exception while looking at refreshing shard terms", e);
+ }
+
if (zkShardTerms.registered(coreName) && !zkShardTerms.canBecomeLeader(coreName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreName, leaderVoteWait)) {
rejoinLeaderElection(core);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index abaee54..1754d50 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -48,7 +48,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final SolrZkClient zkClient;
protected volatile boolean closed;
- private volatile Integer leaderZkNodeParentVersion;
+ protected volatile Integer leaderZkNodeParentVersion;
public ShardLeaderElectionContextBase(final String coreNodeName, String electionPath, String leaderPath,
Replica props, SolrZkClient zkClient) {
@@ -59,82 +59,105 @@ class ShardLeaderElectionContextBase extends ElectionContext {
@Override
protected void cancelElection() throws InterruptedException, KeeperException {
-
if (log.isTraceEnabled()) log.trace("cancelElection");
-// if (!zkClient.isConnected()) {
-// log.info("Can't cancel, zkClient is not connected");
-// return;
-// }
+ // if (!zkClient.isConnected()) {
+ // log.info("Can't cancel, zkClient is not connected");
+ // return;
+ // }
super.cancelElection();
- try {
- if (leaderZkNodeParentVersion != null) {
- try {
-// if (!zkClient.exists(leaderSeqPath)) {
-// return;
-// }
- // We need to be careful and make sure we *only* delete our own leader registration node.
- // We do this by using a multi and ensuring the parent znode of the leader registration node
- // matches the version we expect - there is a setData call that increments the parent's znode
- // version whenever a leader registers.
- log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
- List<Op> ops = new ArrayList<>(3);
- ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
- ops.add(Op.delete(leaderSeqPath, -1));
- ops.add(Op.delete(leaderPath, -1));
- zkClient.multi(ops, false);
- } catch (KeeperException e) {
- if (e instanceof NoNodeException) {
- // okay
- return;
- }
- if (e instanceof KeeperException.SessionExpiredException) {
- log.warn("ZooKeeper session expired");
- throw e;
+ try {
+ if (leaderZkNodeParentVersion != null) {
+ try {
+ // if (!zkClient.exists(leaderSeqPath)) {
+ // return;
+ // }
+ // We need to be careful and make sure we *only* delete our own leader registration node.
+ // We do this by using a multi and ensuring the parent znode of the leader registration node
+ // matches the version we expect - there is a setData call that increments the parent's znode
+ // version whenever a leader registers.
+ log.info("Removing leader registration node on cancel, parent node: {} {}", Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion);
+ List<Op> ops = new ArrayList<>(3);
+ ops.add(Op.check(Paths.get(leaderPath).getParent().toString(), leaderZkNodeParentVersion));
+ ops.add(Op.delete(leaderSeqPath, -1));
+ ops.add(Op.delete(leaderPath, -1));
+ zkClient.multi(ops, false);
+ } catch (KeeperException e) {
+ if (e instanceof NoNodeException) {
+ // okay
+ if (leaderSeqPath != null) {
+ if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+ zkClient.delete(leaderSeqPath, -1);
}
+ return;
+ }
+ if (e instanceof KeeperException.SessionExpiredException) {
+ log.warn("ZooKeeper session expired");
+ throw e;
+ }
- int i = 0;
- List<OpResult> results = e.getResults();
- if (results != null) {
- for (OpResult result : results) {
- if (((OpResult.ErrorResult) result).getErr() == -101) {
- // no node, fine
- } else {
- if (result instanceof OpResult.ErrorResult) {
- OpResult.ErrorResult dresult = (OpResult.ErrorResult) result;
- if (dresult.getErr() != 0) {
- log.error("op=" + i++ + " err=" + dresult.getErr());
- }
+ int i = 0;
+ List<OpResult> results = e.getResults();
+ if (results != null) {
+ for (OpResult result : results) {
+ if (((OpResult.ErrorResult) result).getErr() == -101) {
+ // no node, fine
+ try {
+ if (leaderSeqPath != null) {
+ if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+ zkClient.delete(leaderSeqPath, -1);
}
- throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election " + e.getPath(), e);
+ } catch (NoNodeException e1) {
+ // fine
}
+ } else {
+ if (result instanceof OpResult.ErrorResult) {
+ OpResult.ErrorResult dresult = (OpResult.ErrorResult) result;
+ if (dresult.getErr() != 0) {
+ log.error("op=" + i++ + " err=" + dresult.getErr());
+ }
+ }
+ try {
+ if (leaderSeqPath != null) {
+ if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+ zkClient.delete(leaderSeqPath, -1);
+ }
+ } catch (NoNodeException e1) {
+ // fine
+ }
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election " + e.getPath(), e);
}
}
-
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propagateInterrupt(e, true);
- } catch (Exception e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
}
- } else {
- try {
- if (leaderSeqPath != null) {
- if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
- zkClient.delete(leaderSeqPath, -1);
- }
- } catch (NoNodeException e) {
- // fine
+
+ } catch (InterruptedException | AlreadyClosedException e) {
+ ParWork.propagateInterrupt(e, true);
+ } catch (Exception e) {
+ if (leaderSeqPath != null) {
+ if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+ zkClient.delete(leaderSeqPath, -1);
}
- if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Exception canceling election", e);
}
- } catch (Exception e) {
- if (e instanceof InterruptedException) {
- ParWork.propagateInterrupt(e);
+ } else {
+ try {
+ if (leaderSeqPath != null) {
+ if (log.isDebugEnabled()) log.debug("Delete leader seq election path {} path we watch is {}", leaderSeqPath, watchedSeqPath);
+ zkClient.delete(leaderSeqPath, -1);
+ }
+ } catch (NoNodeException e) {
+ // fine
}
- Stat stat = new Stat();
- zkClient.getData(Paths.get(leaderPath).getParent().toString(), null, stat);
- log.error("Exception trying to cancel election {} {} {}", stat.getVersion(), e.getClass().getName(), e.getMessage(), e);
+ if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
}
- leaderZkNodeParentVersion = null;
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ ParWork.propagateInterrupt(e);
+ }
+ Stat stat = new Stat();
+ zkClient.getData(Paths.get(leaderPath).getParent().toString(), null, stat);
+ log.error("Exception trying to cancel election {} {} {}", stat.getVersion(), e.getClass().getName(), e.getMessage(), e);
+ }
+ leaderZkNodeParentVersion = null;
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index f83075d..63418b4 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -18,11 +18,9 @@
package org.apache.solr.handler.admin;
import org.apache.solr.cloud.ZkController.NotInClusterStateException;
-import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
-import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
@@ -97,20 +95,5 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
error = "Timeout waiting for collection state. \n" + coreContainer.getZkController().getZkStateReader().getClusterState().getCollectionOrNull(collection);
throw new NotInClusterStateException(ErrorCode.SERVER_ERROR, error);
}
-
- try {
- ZkShardTerms shardTerms = coreContainer.getZkController().getShardTermsOrNull(collection, shard);
- // if the replica is waiting for leader to see recovery state, the leader should refresh its terms
- if (shardTerms != null && waitForState == Replica.State.RECOVERING && shardTerms.registered(cname) && shardTerms.skipSendingUpdatesTo(cname)) {
- // The replica changed its term, then published itself as RECOVERING.
- // This core already see replica as RECOVERING
- // so it is guarantees that a live-fetch will be enough for this core to see max term published
- log.info("refresh shard terms for core {}", cname);
- shardTerms.refreshTerms(false);
- }
- } catch (Exception e) {
- log.error("Exception while looking at refreshing shard terms", e);
- }
-
}
}
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index a788ca4..84b3c04 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -50,9 +50,9 @@ public class SolrQoSFilter extends QoSFilter {
@Override
public void init(FilterConfig filterConfig) {
super.init(filterConfig);
- _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 5000);
+ _origMaxRequests = Integer.getInteger("solr.concurrentRequests.max", 10000);
super.setMaxRequests(_origMaxRequests);
- super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 20000));
+ super.setSuspendMs(Integer.getInteger("solr.concurrentRequests.suspendms", 30000));
super.setWaitMs(Integer.getInteger("solr.concurrentRequests.waitms", 2000));
}
@@ -82,27 +82,27 @@ public class SolrQoSFilter extends QoSFilter {
} else {
// nocommit - deal with no supported, use this as a fail safe with high and low watermark?
- if (ourLoad < 0.70 && sLoad < 1.0 && _origMaxRequests != getMaxRequests()) {
+ if (ourLoad < 0.90 && sLoad < 1.6 && _origMaxRequests != getMaxRequests()) {
if (sLoad < 0.9) {
if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
} else {
- updateMaxRequests(Math.min(_origMaxRequests, (int) Math.round(getMaxRequests() * 1.5D)), sLoad, ourLoad);
+ updateMaxRequests(Math.min(_origMaxRequests, Math.round(getMaxRequests() * 3)), sLoad, ourLoad);
}
} else {
- if (sLoad > 1.1) {
+ if (ourLoad > 0.90 && sLoad > 1.5) {
int cMax = getMaxRequests();
if (cMax > 5) {
int max = Math.max(5, (int) ((double) cMax * 0.30D));
// log.warn("System load is {} and our load is {} procs is {}, set max concurrent requests to {}", sLoad, ourLoad, SysStats.PROC_COUNT, max);
updateMaxRequests(max, sLoad, ourLoad);
}
- } else if (ourLoad < 0.70 && sLoad < 1.0 && _origMaxRequests != getMaxRequests()) {
+ } else if (ourLoad < 0.90 && sLoad < 2 && _origMaxRequests != getMaxRequests()) {
if (sLoad < 0.9) {
if (log.isDebugEnabled()) log.debug("set max concurrent requests to orig value {}", _origMaxRequests);
updateMaxRequests(_origMaxRequests, sLoad, ourLoad);
} else {
- updateMaxRequests(Math.min(_origMaxRequests, (int) Math.round(getMaxRequests() * 1.5D)), sLoad, ourLoad);
+ updateMaxRequests(Math.min(_origMaxRequests, Math.round(getMaxRequests() * 3)), sLoad, ourLoad);
}
}