You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/21 21:33:07 UTC
[lucene-solr] 05/05: @1275 Fix wait for down and some lock and recovery issues.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 561cdac5214c9927c8c1f2adeae2da3d0c5e0c26
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jan 21 15:17:01 2021 -0600
@1275 Fix wait for down and some lock and recovery issues.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 68 +++++++++++++---------
.../solr/cloud/ShardLeaderElectionContext.java | 2 +
.../solr/cloud/ShardLeaderElectionContextBase.java | 9 ++-
.../java/org/apache/solr/cloud/StatePublisher.java | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 10 +++-
.../java/org/apache/solr/core/CoreContainer.java | 49 ++++++++--------
.../src/java/org/apache/solr/core/SolrCore.java | 2 +-
.../java/org/apache/solr/handler/IndexFetcher.java | 1 +
.../apache/solr/handler/admin/PrepRecoveryOp.java | 2 +-
.../handler/component/RealTimeGetComponent.java | 3 -
.../apache/solr/update/DefaultSolrCoreState.java | 18 +++---
.../java/org/apache/solr/update/SolrCoreState.java | 2 +-
.../org/apache/solr/update/TimedVersionBucket.java | 2 +-
.../processor/DistributedZkUpdateProcessor.java | 2 +-
14 files changed, 96 insertions(+), 76 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 32aa54b..1272ad8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -337,7 +337,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
final public void doRecovery(SolrCore core) throws Exception {
+ int tries = 0;
while (!isClosed()) {
+ tries++;
try {
try {
if (prevSendPreRecoveryHttpUriRequest != null) {
@@ -370,11 +372,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
if (successfulRecovery) {
+ close = true;
break;
+ } else {
+ log.info("Trying another loop to recover after failing try={}", tries);
}
} catch (Exception e) {
- log.info("Exception trying to recover, try again", e);
+ log.info("Exception trying to recover, try again try={}", tries, e);
}
}
}
@@ -593,10 +598,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
- if (isClosed()) {
- throw new AlreadyClosedException();
- }
-
log.info("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
ulog.bufferUpdates();
@@ -632,26 +633,34 @@ public class RecoveryStrategy implements Runnable, Closeable {
// System.out.println("Attempting to PeerSync from " + leaderUrl
// + " i am:" + zkController.getNodeName());
- boolean syncSuccess;
- try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
- syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
- }
- if (syncSuccess) {
- SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
- log.info("PeerSync was successful, commit to force open a new searcher");
- // force open a new searcher
- core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
- req.close();
- log.info("PeerSync stage of recovery was successful.");
-
- // solrcloud_debug
- // cloudDebugLog(core, "synced");
-
- log.info("Replaying updates buffered during PeerSync.");
- replay(core);
+ try {
+ boolean syncSuccess;
+ try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
+ syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
+ }
+ if (syncSuccess) {
+ SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+ log.info("PeerSync was successful, commit to force open a new searcher");
+ // force open a new searcher
+ core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+ req.close();
+ log.info("PeerSync stage of recovery was successful.");
+
+ // solrcloud_debug
+ // cloudDebugLog(core, "synced");
+
+ log.info("Replaying updates buffered during PeerSync.");
+ replay(core);
+
+ // sync success
+ successfulRecovery = true;
+ } else {
+ successfulRecovery = false;
+ }
- // sync success
- successfulRecovery = true;
+ } catch (Exception e) {
+ log.error("PeerSync exception", e);
+ successfulRecovery = false;
}
if (!successfulRecovery) {
@@ -681,6 +690,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Interrupted or already closed, bailing on recovery");
close = true;
successfulRecovery = false;
+ break;
} catch (Exception e) {
successfulRecovery = false;
log.error("Error while trying to recover", e);
@@ -710,14 +720,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
publishedActive = true;
+ close = true;
} catch (AlreadyClosedException e) {
log.error("Already closed");
successfulRecovery = false;
+ close = true;
} catch (Exception e) {
log.error("Could not publish as ACTIVE after successful recovery", e);
successfulRecovery = false;
- // core.getSolrCoreState().doRecovery(core);
+ close = false;
}
@@ -731,7 +743,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
- if (!successfulRecovery) {
+ if (!successfulRecovery && !isClosed()) {
// lets pause for a moment and we need to try again...
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
@@ -754,9 +766,9 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- if (!successfulRecovery) {
+ if (!successfulRecovery && !isClosed()) {
waitForRetry();
- } else {
+ } else if (successfulRecovery) {
break;
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 8256084..56eb674 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -319,6 +319,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.info("There may be a better leader candidate than us - will cancel election, rejoin election, and kick off recovery");
leaderElector.retryElection(false);
+
+ core.getSolrCoreState().doRecovery(core);
}
public String getShardId() {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index b11c180..abaee54 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -181,11 +181,16 @@ class ShardLeaderElectionContextBase extends ElectionContext {
//assert leaderZkNodeParentVersion != null;
} catch (NoNodeException e) {
+ log.warn("No node exists for election", e);
throw new AlreadyClosedException("No node exists for election");
} catch (KeeperException.NodeExistsException e) {
- throw new AlreadyClosedException("Node already exists for election");
+ log.warn("Node already exists for election", e);
+
+ zkClient.delete(leaderPath, -1);
+
+ runLeaderProcess(context, weAreReplacement, pauseBeforeStartMs);
} catch (Throwable t) {
- ParWork.propagateInterrupt(t);
+ log.warn("Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: ", t);
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not register as the leader because creating the ephemeral registration node in ZooKeeper failed: " + errors, t);
}
return true;
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 341ffa3..45d5fda 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -148,7 +148,7 @@ public class StatePublisher implements Closeable {
Replica replica = zkStateReader.getClusterState().getCollection(collection).getReplica(core);
String lastState = stateCache.get(core);
// nocommit
- if (collection != null && replica != null && state.equals(lastState) && replica.getState().toString().equals(state)) {
+ if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
// nocommit
return;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 8ebdfe9..c4e9db6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -417,6 +417,14 @@ public class ZkController implements Closeable, Runnable {
started = true;
+ try {
+ if (zkClient.exists( ZkStateReader.LIVE_NODES_ZKNODE + "/" + getNodeName())) {
+ removeEphemeralLiveNode();
+ }
+ } catch (Exception e) {
+ ParWork.propagateInterrupt("Error Removing ephemeral live node. Continuing to close CoreContainer", e);
+ }
+
this.overseer = new Overseer(cc.getUpdateShardHandler(), CommonParams.CORES_HANDLER_PATH, this, cloudConfig);
try {
this.overseerRunningMap = Overseer.getRunningMap(zkClient);
@@ -1407,7 +1415,7 @@ public class ZkController implements Closeable, Runnable {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- if (log.isDebugEnabled()) log.debug("Wait to see leader for {}, {}", collection, shardId);
+ log.info("Wait to see leader for {}, {}", collection, shardId);
Replica leader = null;
for (int i = 0; i < 30; i++) {
// if (leaderElector.isLeader()) {
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 61566b6..f7be9a1 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -28,6 +28,7 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
+import org.apache.solr.client.solrj.embedded.EmbeddedSolrServer;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
@@ -885,6 +886,27 @@ public class CoreContainer implements Closeable {
status |= CORE_DISCOVERY_COMPLETE;
for (final CoreDescriptor cd : cds) {
+ if (isZooKeeperAware()) {
+ String collection = cd.getCollectionName();
+ try {
+ zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+ if (c != null) {
+ Replica replica = c.getReplica(cd.getName());
+
+ if (replica.getState().equals(State.DOWN)) {
+ return true;
+ }
+
+ }
+ return false;
+ });
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ } catch (TimeoutException e) {
+ log.error("Timeout", e);
+ }
+ }
+
if (log.isDebugEnabled()) log.debug("Process core descriptor {} {} {}", cd.getName(), cd.isTransient(), cd.isLoadOnStartup());
if (cd.isTransient() || !cd.isLoadOnStartup()) {
solrCores.addCoreDescriptor(cd);
@@ -914,32 +936,9 @@ public class CoreContainer implements Closeable {
}));
}
}
- if (zkSys != null && zkSys.getZkController() != null) {
+ if (isZooKeeperAware()) {
+
ParWork.getRootSharedExecutor().submit(() -> {
- Collection<SolrCore> cores = getCores(); // TODO use the cores we just launched, this may not be populated yet
- for (SolrCore core : cores) {
- CoreDescriptor desc = core.getCoreDescriptor();
- String collection = desc.getCollectionName();
- try {
- zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
- if (c != null) {
- List<Replica> replicas = c.getReplicas();
- for (Replica replica : replicas) {
- if (replica.getNodeName().equals(zkSys.zkController.getNodeName())) {
- if (!replica.getState().equals(Replica.State.DOWN)) {
- return false;
- }
- }
- }
- }
- return true;
- });
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- } catch (TimeoutException e) {
- log.error("Timeout", e);
- }
- }
zkSys.getZkController().createEphemeralLiveNode();
});
}
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 8d06233..29bc5bc 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -748,7 +748,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
// another recovery waiting behind us, let it run now instead of after we finish
log.info("Skipping reload because there is another in line behind");
lock.unlock();
- Thread.sleep(50);
+ Thread.sleep(10);
lock.lock();
lock.unlock();
return null;
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index ab4821b..972d012 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -1108,6 +1108,7 @@ public class IndexFetcher {
throw e;
} catch (Exception e) {
log.error("Problem downloading file {}", file, e);
+ throw e;
} finally {
fileFetchRequests.remove(file.get(NAME));
}
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
index 45ef666..f83075d 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/PrepRecoveryOp.java
@@ -80,7 +80,7 @@ class PrepRecoveryOp implements CoreAdminHandler.CoreAdminOp {
final Replica replica = c.getReplica(cname);
if (replica != null) {
- if (replica.getState() == waitForState || replica.getState() == Replica.State.ACTIVE && coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName())) {
+ if (replica.getState() == waitForState && coreContainer.getZkController().getZkStateReader().isNodeLive(replica.getNodeName())) {
// if (log.isDebugEnabled()) log.debug("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
log.info("replica={} state={} waitForState={}", replica, replica.getState(), waitForState);
return true;
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index a7d71d1..63eff4b 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -1144,9 +1144,6 @@ public class RealTimeGetComponent extends SearchComponent
// TODO: more complex response?
rb.rsp.add("sync", success);
- if (!success && rb.req.getCore().getCoreContainer().isZooKeeperAware() && !rb.req.getCore().getSolrCoreState().isRecoverying()) {
- rb.req.getCore().getSolrCoreState().doRecovery(rb.req.getCore().getCoreContainer(), rb.req.getCore().getCoreDescriptor());
- }
} catch (IOException e) {
log.error("Error while closing", e);
}
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 475abe4..7c1f4f5 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -52,7 +52,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
private final boolean SKIP_AUTO_RECOVERY = Boolean.getBoolean("solrcloud.skip.autorecovery");
- private final ReentrantLock recoveryLock = new ReentrantLock(true);
+ private final ReentrantLock recoveryLock = new ReentrantLock(false);
private final ActionThrottle recoveryThrottle = new ActionThrottle("recovery", Integer.getInteger("solr.recoveryThrottle", 0));
@@ -190,7 +190,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
boolean acquired = false;
do {
try {
- acquired = lock.tryLock(100, TimeUnit.MILLISECONDS);
+ acquired = lock.tryLock() || lock.tryLock(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.warn("WARNING - Dangerous interrupt", e);
}
@@ -346,25 +346,19 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
return;
}
-
// if we can't get the lock, another recovery is running
// we check to see if there is already one waiting to go
// after the current one, and if there is, bail
boolean locked = recoveryLock.tryLock();
- if (!locked && recoveryWaiting.get() > 1) {
- log.info("Skipping recovery because there is another already queued");
- return;
- }
-
// if (closed || prepForClose) {
// return;
// }
if (!locked) {
recoveryWaiting.incrementAndGet();
if (log.isDebugEnabled()) log.debug("Wait for recovery lock");
- cancelRecovery();
- while (!recoveryLock.tryLock(250, TimeUnit.MILLISECONDS)) {
+ cancelRecovery(true, false);
+ while (!(recoveryLock.tryLock() || recoveryLock.tryLock(500, TimeUnit.MILLISECONDS))) {
if (closed || prepForClose) {
log.warn("Skipping recovery because we are closed");
recoveryWaiting.decrementAndGet();
@@ -379,12 +373,13 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
}
}
+ recoverying = true;
+
// to be air tight we must also check after lock
if (prepForClose || closed || corecontainer.isShutDown()) {
log.info("Skipping recovery due to being closed");
return;
}
- recoverying = true;
recoveryThrottle.minimumWaitBetweenActions();
recoveryThrottle.markAttemptingAction();
@@ -435,6 +430,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public void cancelRecovery(boolean wait, boolean prepForClose) {
+ log.info("Cancel recovery");
recoverying = false;
if (prepForClose) {
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
index 556ac75..c53fe21 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java
@@ -44,7 +44,7 @@ public abstract class SolrCoreState {
protected volatile boolean closed = false;
private final Object updateLock = new Object();
- private final ReentrantLock reloadLock = new ReentrantLock(true);
+ private final ReentrantLock reloadLock = new ReentrantLock(false);
public Object getUpdateLock() {
return updateLock;
diff --git a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
index 94e7551..abeca02 100644
--- a/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
+++ b/solr/core/src/java/org/apache/solr/update/TimedVersionBucket.java
@@ -44,7 +44,7 @@ public class TimedVersionBucket extends VersionBucket {
boolean success = false;
try {
- success = lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
+ success = lock.tryLock() || lock.tryLock(lockTimeoutMs, TimeUnit.MILLISECONDS);
if (success) {
return function.apply();
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 7ec4dd6..7e8faeb 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -1036,7 +1036,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
ReentrantLock ruleExpiryLock = req.getCore().getRuleExpiryLock();
if (!ruleExpiryLock.isLocked()) {
try {
- if (ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
+ if (ruleExpiryLock.tryLock() || ruleExpiryLock.tryLock(10, TimeUnit.MILLISECONDS)) {
log.info("Going to expire routing rule");
try {
// nocommit TODO: needs to use the statepublisher