You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/12 17:28:32 UTC
[lucene-solr] 05/06: @1242 WIP
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 3f42f8f48032e12614273edc141a0b1a00d38fab
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Dec 12 10:37:41 2020 -0600
@1242 WIP
---
.../java/org/apache/solr/cloud/LeaderElector.java | 30 +--
.../src/java/org/apache/solr/cloud/Overseer.java | 11 +-
.../apache/solr/cloud/OverseerElectionContext.java | 2 +-
.../org/apache/solr/cloud/RecoveryStrategy.java | 203 +++++++++------------
.../org/apache/solr/cloud/ReplicateFromLeader.java | 18 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 2 +-
.../java/org/apache/solr/cloud/ZkController.java | 74 +++-----
.../src/java/org/apache/solr/core/SolrCore.java | 4 +-
.../solr/client/solrj/impl/Http2SolrClient.java | 52 +++---
.../apache/solr/common/cloud/ZkStateReader.java | 24 +--
...startup-debug.xml => log4j2-election-debug.xml} | 36 +---
.../src/resources/logconf/log4j2-startup-debug.xml | 4 +-
12 files changed, 198 insertions(+), 262 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index a9b0d9f..3bc02c0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -32,6 +32,7 @@ import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -113,7 +114,7 @@ public class LeaderElector implements Closeable {
*
* @param replacement has someone else been the leader already?
*/
- private boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
+ private synchronized boolean checkIfIamLeader(final ElectionContext context, boolean replacement) throws KeeperException,
InterruptedException, IOException {
//if (checkClosed(context)) return false;
@@ -209,7 +210,8 @@ public class LeaderElector implements Closeable {
if (context.leaderSeqPath == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Election has been cancelled");
}
- zkClient.exists(watchedNode, watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context));
+ watcher = new ElectionWatcher(context.leaderSeqPath, watchedNode, context);
+ zkClient.exists(watchedNode, watcher);
state = WAITING_IN_ELECTION;
if (log.isDebugEnabled()) log.debug("Watching path {} to know if I could be the leader, my node is {}", watchedNode, context.leaderSeqPath);
try (SolrCore core = zkController.getCoreContainer().getCore(context.leaderProps.getName())) {
@@ -547,16 +549,22 @@ public class LeaderElector implements Closeable {
return;
}
try {
- // am I the next leader?
- boolean tryagain = checkIfIamLeader(context, true);
- if (tryagain) {
- Thread.sleep(50);
- tryagain = checkIfIamLeader(context, true);
- }
+ if (event.getType() == EventType.NodeDeleted) {
+ // am I the next leader?
+ boolean tryagain = true;
+ while (tryagain) {
+ tryagain = checkIfIamLeader(context, true);
+ }
+ } else {
+ Stat exists = zkClient.exists(watchedNode, this);
+ if (exists == null) {
+ close();
+ boolean tryagain = true;
- if (tryagain) {
- Thread.sleep(50);
- checkIfIamLeader(context, true);
+ while (tryagain) {
+ tryagain = checkIfIamLeader(context, true);
+ }
+ }
}
} catch (AlreadyClosedException | InterruptedException e) {
log.info("Already shutting down");
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index ef0a8f2..f56f6c3 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -337,7 +337,7 @@ public class Overseer implements SolrCloseable {
ThreadGroup ccTg = new ThreadGroup("Overseer collection creation process.");
- this.zkStateWriter = new ZkStateWriter( zkController.getZkStateReader(), stats);
+ this.zkStateWriter = new ZkStateWriter(zkController.getZkStateReader(), stats);
//systemCollectionCompatCheck(new StringBiConsumer());
queueWatcher = new WorkQueueWatcher(getCoreContainer());
@@ -538,15 +538,8 @@ public class Overseer implements SolrCloseable {
overseerOnlyClient = null;
}
- if (taskExecutor != null && taskExecutor.isShutdown() && !taskExecutor.isTerminated()) {
- try {
- taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
-
- }
-
+ if (taskExecutor != null) {
taskExecutor.shutdownNow();
- // ExecutorUtil.shutdownAndAwaitTermination(taskExecutor);
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index a56e2ea..db607f0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -37,7 +37,7 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
private final Overseer overseer;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, Overseer overseer) {
- super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica(overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), null, null, overseer.getZkStateReader()), zkClient);
+ super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("overseer:" + overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), null, null, overseer.getZkStateReader()), zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index f8db3eb..74e1439 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -78,8 +78,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class RecoveryStrategy implements Runnable, Closeable {
private volatile CountDownLatch latch;
- private final ReplicationHandler replicationHandler;
- private final Http2SolrClient recoveryOnlyClient;
+ private volatile ReplicationHandler replicationHandler;
+ private volatile Http2SolrClient recoveryOnlyClient;
public static class Builder implements NamedListInitializedPlugin {
private NamedList args;
@@ -126,8 +126,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
private final AtomicInteger retries = new AtomicInteger(0);
private boolean recoveringAfterStartup;
private volatile Cancellable prevSendPreRecoveryHttpUriRequest;
- private final Replica.Type replicaType;
- private final CoreDescriptor coreDescriptor;
+ private volatile Replica.Type replicaType;
+ private volatile CoreDescriptor coreDescriptor;
private final CoreContainer cc;
@@ -136,23 +136,10 @@ public class RecoveryStrategy implements Runnable, Closeable {
this.cc = cc;
this.coreName = cd.getName();
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- throw new AlreadyClosedException();
- }
- recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
- SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
- replicationHandler = (ReplicationHandler) handler;
-
- }
-
this.recoveryListener = recoveryListener;
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
- replicaType = cd.getCloudDescriptor().getReplicaType();
- this.coreDescriptor = cd;
}
final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@@ -321,12 +308,26 @@ public class RecoveryStrategy implements Runnable, Closeable {
// set request info for logging
log.info("Starting recovery process. recoveringAfterStartup={}", recoveringAfterStartup);
try {
- doRecovery();
+ try (SolrCore core = cc.getCore(coreName)) {
+ if (core == null) {
+ log.warn("SolrCore is null, won't do recovery");
+ throw new AlreadyClosedException("SolrCore is null, won't do recovery");
+ }
+
+ coreDescriptor = core.getCoreDescriptor();
+ replicaType = coreDescriptor.getCloudDescriptor().getReplicaType();
+
+ recoveryOnlyClient = core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyClient();
+ SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
+ replicationHandler = (ReplicationHandler) handler;
+
+ doRecovery(core);
+ }
} catch (InterruptedException e) {
log.info("InterruptedException, won't do recovery", e);
return;
} catch (AlreadyClosedException e) {
- log.info("AlreadyClosedException, won't do recovery");
+ log.info("AlreadyClosedException, won't do recovery", e);
return;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
@@ -335,7 +336,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
}
- final public void doRecovery() throws Exception {
+ final public void doRecovery(SolrCore core) throws Exception {
// we can lose our core descriptor, so store it now
// try {
// Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 15000);
@@ -353,14 +354,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (this.coreDescriptor.getCloudDescriptor().requiresTransactionLog()) {
log.info("Sync or replica recovery");
- doSyncOrReplicateRecovery();
+ doSyncOrReplicateRecovery(core);
} else {
log.info("Replicate only recovery");
- doReplicateOnlyRecovery();
+ doReplicateOnlyRecovery(core);
}
}
- final private void doReplicateOnlyRecovery() throws Exception {
+ final private void doReplicateOnlyRecovery(SolrCore core) throws Exception {
boolean successfulRecovery = false;
// if (core.getUpdateHandler().getUpdateLog() != null) {
@@ -370,7 +371,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
// return;
// }
- log.info("Publishing state of core [{}] as recovering", coreName);
+ log.info("Publishing state of core [{}] as recovering {}", coreName, "doReplicateOnlyRecovery");
zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
@@ -462,30 +463,23 @@ public class RecoveryStrategy implements Runnable, Closeable {
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
- public final void doSyncOrReplicateRecovery() throws Exception {
+ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
log.info("Do peersync or replication recovery core={} collection={}", coreName, coreDescriptor.getCollectionName());
- log.info("Publishing state of core [{}] as recovering", coreName);
+ log.info("Publishing state of core [{}] as recovering {}", coreName, "doSyncOrReplicateRecovery");
zkController.publish(this.coreDescriptor, Replica.State.RECOVERING);
boolean successfulRecovery = false;
boolean publishedActive = false;
UpdateLog ulog;
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- close = true;
- throw new AlreadyClosedException();
- }
- ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog == null) {
- SolrException.log(log, "No UpdateLog found - cannot recover.");
- close = true;
- recoveryFailed(zkController, baseUrl, this.coreDescriptor);
- return;
- }
+ ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog == null) {
+ SolrException.log(log, "No UpdateLog found - cannot recover.");
+ close = true;
+ recoveryFailed(zkController, baseUrl, this.coreDescriptor);
+ return;
}
// we temporary ignore peersync for tlog replicas
@@ -595,37 +589,31 @@ public class RecoveryStrategy implements Runnable, Closeable {
if (log.isInfoEnabled()) {
log.info("Attempting to PeerSync from [{}] - recoveringAfterStartup=[{}]", leader.getCoreUrl(), recoveringAfterStartup);
}
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- close = true;
- successfulRecovery = false;
- }
- // System.out.println("Attempting to PeerSync from " + leaderUrl
- // + " i am:" + zkController.getNodeName());
- boolean syncSuccess;
- try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
- syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
- }
- if (syncSuccess) {
- SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
- log.info("PeerSync was successful, commit to force open a new searcher");
- // force open a new searcher
- core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
- req.close();
- log.info("PeerSync stage of recovery was successful.");
-
- // solrcloud_debug
- // cloudDebugLog(core, "synced");
-
- log.info("Replaying updates buffered during PeerSync.");
- replay();
-
- // sync success
- successfulRecovery = true;
- }
+ // System.out.println("Attempting to PeerSync from " + leaderUrl
+ // + " i am:" + zkController.getNodeName());
+ boolean syncSuccess;
+ try (PeerSyncWithLeader peerSyncWithLeader = new PeerSyncWithLeader(core, leader.getCoreUrl(), ulog.getNumRecordsToKeep())) {
+ syncSuccess = peerSyncWithLeader.sync(recentVersions).isSuccess();
+ }
+ if (syncSuccess) {
+ SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
+ log.info("PeerSync was successful, commit to force open a new searcher");
+ // force open a new searcher
+ core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
+ req.close();
+ log.info("PeerSync stage of recovery was successful.");
+
+ // solrcloud_debug
+ // cloudDebugLog(core, "synced");
+
+ log.info("Replaying updates buffered during PeerSync.");
+ replay(core);
+
+ // sync success
+ successfulRecovery = true;
}
+
if (!successfulRecovery) {
log.info("PeerSync Recovery was not successful - trying replication.");
}
@@ -645,7 +633,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
throw new SolrException(ErrorCode.SERVER_ERROR, "Replication fetch reported as failed");
}
- replay();
+ replay(core);
log.info("Replication Recovery was successful.");
successfulRecovery = true;
@@ -673,14 +661,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
// then we still need to update version bucket seeds after recovery
if (successfulRecovery && replayFuture == null) {
log.info("Updating version bucket highest from index after successful recovery.");
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- successfulRecovery = false;
- } else {
- core.seedVersionBuckets();
- }
- }
+
+ core.seedVersionBuckets();
}
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
@@ -780,51 +762,46 @@ public class RecoveryStrategy implements Runnable, Closeable {
public static Runnable testing_beforeReplayBufferingUpdates;
- final private void replay()
+ final private void replay(SolrCore core)
throws InterruptedException, ExecutionException {
if (testing_beforeReplayBufferingUpdates != null) {
testing_beforeReplayBufferingUpdates.run();
}
- try (SolrCore core = cc.getCore(coreName)) {
- if (core == null) {
- log.warn("SolrCore is null, won't do recovery");
- close = true;
- throw new AlreadyClosedException();
+
+ if (replicaType == Replica.Type.TLOG) {
+ // roll over all updates during buffering to new tlog, make RTG available
+ try (SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams())) {
+ core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
}
- if (replicaType == Replica.Type.TLOG) {
- // roll over all updates during buffering to new tlog, make RTG available
- try (SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams())) {
- core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
- }
+ }
+ Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
+ if (future == null) {
+ // no replay needed
+ log.info("No replay needed.");
+ return;
+ } else {
+ log.info("Replaying buffered documents.");
+ // wait for replay
+ RecoveryInfo report;
+ try {
+ report = future.get(10, TimeUnit.MINUTES); // nocommit - how long? make configurable too
+ } catch (InterruptedException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
+ } catch (TimeoutException e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
- if (future == null) {
- // no replay needed\
- log.info("No replay needed.");
- return;
- } else {
- log.info("Replaying buffered documents.");
- // wait for replay
- RecoveryInfo report;
- try {
- report = future.get(10, TimeUnit.MINUTES); // nocommit - how long? make configurable too
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
- } catch (TimeoutException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- if (report.failed) {
- SolrException.log(log, "Replay failed");
- throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
- }
+ if (report.failed) {
+ SolrException.log(log, "Replay failed");
+ throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
}
+ }
- // the index may ahead of the tlog's caches after recovery, by calling this tlog's caches will be purged
- UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- if (ulog != null) {
- ulog.openRealtimeSearcher();
- }
+ // the index may be ahead of the tlog's caches after recovery; calling this purges the tlog's caches
+ UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ if (ulog != null) {
+ ulog.openRealtimeSearcher();
}
+
// solrcloud_debug
// cloudDebugLog(core, "replayed");
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index adb0e23..eb6c062 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -24,6 +24,7 @@ import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -96,7 +97,7 @@ public class ReplicateFromLeader implements Closeable {
replicationProcess = new ReplicationHandler();
if (switchTransactionLog) {
replicationProcess.setPollListener((solrCore, fetchResult) -> {
- if (fetchResult == IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
+ if (fetchResult.getSuccessful()) {
String commitVersion = getCommitVersion(core);
if (commitVersion == null) return;
if (Long.parseLong(commitVersion) == lastVersion) return;
@@ -107,6 +108,21 @@ public class ReplicateFromLeader implements Closeable {
cuc.setVersion(Long.parseLong(commitVersion));
updateLog.commitAndSwitchToNewTlog(cuc);
lastVersion = Long.parseLong(commitVersion);
+ try {
+ cc.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ } catch (Exception e) {
+ log.warn("Failed publishing as ACTIVE", e);
+ }
+ }
+ });
+ } else {
+ replicationProcess.setPollListener((solrCore, fetchResult) -> {
+ if (fetchResult.getSuccessful()) {
+ try {
+ cc.getZkController().publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
+ } catch (Exception e) {
+ log.warn("Failed publishing as ACTIVE", e);
+ }
}
});
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index cbb5e3d..5947da4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -124,7 +124,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
} catch (NoNodeException e) {
// fine
}
- if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {}", leaderSeqPath);
+ if (log.isDebugEnabled()) log.debug("No version found for ephemeral leader parent node, won't remove previous leader registration. {} {}", leaderPath, leaderSeqPath);
}
leaderSeqPath = null;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 375bea0..e2c094d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -542,17 +542,11 @@ public class ZkController implements Closeable, Runnable {
});
zkClient.setDisconnectListener(() -> {
try (ParWork worker = new ParWork("disconnected", true, true)) {
- worker.collect(ZkController.this.overseerElector);
worker.collect(ZkController.this.overseer);
-
+ worker.collect(leaderElectors.values());
worker.collect("clearZkCollectionTerms", () -> {
clearZkCollectionTerms();
});
- if (zkClient.isAlive()) {
- synchronized (leaderElectors) {
- worker.collect(leaderElectors.values());
- }
- }
}
});
@@ -652,9 +646,7 @@ public class ZkController implements Closeable, Runnable {
}
}
- synchronized (leaderElectors) {
- closer.collect(leaderElectors);
- }
+ closer.collect(leaderElectors);
closer.collect(overseerElector);
@@ -678,9 +670,7 @@ public class ZkController implements Closeable, Runnable {
});
} finally {
- synchronized (leaderElectors) {
- leaderElectors.clear();
- }
+ leaderElectors.clear();
}
}
@@ -695,9 +685,7 @@ public class ZkController implements Closeable, Runnable {
this.isClosed = true;
try (ParWork closer = new ParWork(this, true, true)) {
- synchronized (leaderElectors) {
- closer.collect(leaderElectors);
- }
+ closer.collect(leaderElectors);
collectionToTerms.forEach((s, zkCollectionTerms) -> closer.collect(zkCollectionTerms));
}
@@ -1357,7 +1345,6 @@ public class ZkController implements Closeable, Runnable {
throw new AlreadyClosedException();
}
- boolean success = false;
try {
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
@@ -1406,7 +1393,7 @@ public class ZkController implements Closeable, Runnable {
}
log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
- synchronized (leaderElectors) {
+
LeaderElector leaderElector = leaderElectors.get(replica.getName());
if (leaderElector == null) {
ContextKey contextKey = new ContextKey(collection, coreName);
@@ -1418,7 +1405,7 @@ public class ZkController implements Closeable, Runnable {
LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
IOUtils.closeQuietly(oldElector);
}
- }
+
//
try {
@@ -1427,12 +1414,6 @@ public class ZkController implements Closeable, Runnable {
if (replica.getType() != Type.PULL) {
// nocommit review
joinElection(desc, joinAtHead);
- } else if (replica.getType() == Type.PULL) {
- if (joinAtHead) {
- log.warn("Replica {} was designated as preferred leader but it's type is {}, It won't join election", coreName, Type.PULL);
- }
- log.debug("Replica {} skipping election because it's type is {}", coreName, Type.PULL);
- startReplicationFromLeader(coreName, false);
}
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
@@ -1502,18 +1483,20 @@ public class ZkController implements Closeable, Runnable {
}
}
- boolean didRecovery = checkRecovery(isLeader, collection, coreName, shardId, core, cc);
+ // boolean didRecovery = checkRecovery(isLeader, collection, coreName, shardId, core, cc);
- if (!didRecovery) {
- if (isTlogReplicaAndNotLeader) {
- startReplicationFromLeader(coreName, true);
- }
+ if (isTlogReplicaAndNotLeader) {
+ startReplicationFromLeader(coreName, true);
+ }
- if (!isLeader) {
- publish(desc, Replica.State.ACTIVE, true);
- }
+ if (replica.getType() == Type.PULL) {
+ startReplicationFromLeader(coreName, false);
}
+ // if (!isLeader) {
+ // publish(desc, Replica.State.ACTIVE, true);
+ // }
+
if (replica.getType() != Type.PULL) {
// the watcher is added to a set so multiple calls of this method will left only one watcher
if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
@@ -1527,7 +1510,6 @@ public class ZkController implements Closeable, Runnable {
registerUnloadWatcher(cloudDesc.getCollectionName(), cloudDesc.getShardId(), desc.getName());
log.info("SolrCore Registered, core{} baseUrl={} collection={}, shard={}", coreName, baseUrl, collection, shardId);
- success = true;
return shardId;
} finally {
MDCLoggingContext.clear();
@@ -1667,16 +1649,14 @@ public class ZkController implements Closeable, Runnable {
Replica replica = new Replica(cd.getName(), props, collection, shardId, zkStateReader);
LeaderElector leaderElector;
- synchronized (leaderElectors) {
- leaderElector = leaderElectors.get(replica.getName());
- if (leaderElector == null) {
- ContextKey contextKey = new ContextKey(collection, replica.getName());
- leaderElector = new LeaderElector(this, contextKey);
- LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
- IOUtils.closeQuietly(oldElector);
- } else {
- leaderElector.cancel();
- }
+ leaderElector = leaderElectors.get(replica.getName());
+ if (leaderElector == null) {
+ ContextKey contextKey = new ContextKey(collection, replica.getName());
+ leaderElector = new LeaderElector(this, contextKey);
+ LeaderElector oldElector = leaderElectors.put(replica.getName(), leaderElector);
+ IOUtils.closeQuietly(oldElector);
+ } else {
+ leaderElector.cancel();
}
ElectionContext context = new ShardLeaderElectionContext(leaderElector, shardId,
@@ -1798,10 +1778,7 @@ public class ZkController implements Closeable, Runnable {
if (state == Replica.State.RECOVERING && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
// state is used by client, state of replica can change from RECOVERING to DOWN without needed to finish recovery
// by calling this we will know that a replica actually finished recovery or not
- ZkShardTerms shardTerms = getShardTermsOrNull(collection, shardId);
- if (shardTerms == null) {
- throw new AlreadyClosedException();
- }
+ ZkShardTerms shardTerms = getShardTerms(collection, shardId);
shardTerms.startRecovering(cd.getName());
}
if (state == Replica.State.ACTIVE && cd.getCloudDescriptor().getReplicaType() != Type.PULL) {
@@ -1886,7 +1863,6 @@ public class ZkController implements Closeable, Runnable {
ZkCollectionTerms ct = collectionToTerms.get(collection);
if (ct != null) {
ct.remove(cd.getCloudDescriptor().getShardId(), cd);
- if (ct.cleanUp()) IOUtils.closeQuietly(collectionToTerms.remove(collection));
}
} finally {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 56a38db..0911be1 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -1928,7 +1928,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
} finally {
- log.info("close done refcount {} {}", refCount == null ? null : refCount.get(), name);
+ if (log.isDebugEnabled()) log.debug("close done refcount {} {}", refCount == null ? null : refCount.get(), name);
refCount.set(-1);
if (reloadLock != null && reloadLock.isHeldByCurrentThread()) reloadLock.unlock();
assert ObjectReleaseTracker.release(this);
@@ -1936,8 +1936,6 @@ public final class SolrCore implements SolrInfoBean, Closeable {
//areAllSearcherReferencesEmpty();
-
-
synchronized (closeAndWait) {
closeAndWait.notifyAll();
}
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index c9cbd02..590cbf1 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -435,8 +435,6 @@ public class Http2SolrClient extends SolrClient {
public Cancellable asyncRequest(@SuppressWarnings({"rawtypes"}) SolrRequest solrRequest, String collection, AsyncListener<NamedList<Object>> asyncListener) {
Integer idleTimeout = solrRequest.getParams().getInt("idleTimeout");
-
-
Request req;
try {
req = makeRequest(solrRequest, collection);
@@ -1451,43 +1449,39 @@ public class Http2SolrClient extends SolrClient {
try {
asyncListener.onSuccess(stream);
} catch (Exception e) {
+ log.error("Exception in async stream listener",e);
+ }
+ });
+ }
+
+ public void onComplete(Result result) {
+ try {
+ super.onComplete(result);
+ } finally {
+ try {
if (stream != null) {
try {
while (stream.read() != -1) {
}
- } catch (IOException e1) {
+ } catch (IOException e) {
// quietly
}
}
- if (SolrException.getRootCause(e) != CANCELLED_EXCEPTION) {
- asyncListener.onFailure(e);
- }
- }
- });
- }
-
- public void onComplete(Result result) {
-
- super.onComplete(result);
-
- if (stream != null) {
- try {
- while (stream.read() != -1) {
+ } finally {
+ if (result.isFailed()) {
+ Throwable failure = result.getFailure();
+
+ if (failure != CANCELLED_EXCEPTION) { // avoid retrying on load balanced search requests - keep in mind this
+ // means cancelled requests won't notify the caller of fail or complete
+ try {
+ asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
+ } catch (Exception e) {
+ log.error("Exception in async failure listener",e);
+ }
+ }
}
- } catch (IOException e) {
- // quietly
}
}
-
- if (result.isFailed()) {
- Throwable failure = result.getFailure();
-
- if (failure != CANCELLED_EXCEPTION) { // avoid retrying on load balanced search requests - keep in mind this
- // means cancelled requests won't notify the caller of fail or complete
- asyncListener.onFailure(new SolrServerException(failure.getMessage(), failure));
- }
- }
-
}
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7732d36..7b3ec54 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -895,6 +895,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
if (log.isDebugEnabled()) log.debug("Closing ZkStateReader");
if (closeTracker != null) closeTracker.close();
this.closed = true;
+
+ synchronized (this) {
+ if (collectionPropsCacheCleaner != null) {
+ collectionPropsCacheCleaner.cancel(true);
+ }
+ }
+
if (notifications != null) {
notifications.shutdown();
}
@@ -902,29 +909,24 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
stateWatchersMap.forEach((s, stateWatcher) -> IOUtils.closeQuietly(stateWatcher));
stateWatchersMap.clear();
- waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
- waitLatches.clear();
-
try {
if (closeClient) {
IOUtils.closeQuietly(zkClient);
}
try {
if (collectionPropsCacheCleaner != null) {
- collectionPropsCacheCleaner.cancel(true);
+ collectionPropsCacheCleaner.cancel(false);
}
} catch (NullPointerException e) {
// okay
}
if (notifications != null) {
- try {
- boolean success = notifications.awaitTermination(1, TimeUnit.SECONDS);
- if (!success) notifications.shutdownNow();
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- }
+ notifications.shutdownNow();
}
+ waitLatches.forEach(c -> { for (int i = 0; i < c.getCount(); i++) c.countDown(); });
+ waitLatches.clear();
+
} finally {
assert ObjectReleaseTracker.release(this);
}
@@ -2028,7 +2030,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
try {
// wait for the watcher predicate to return true, or time out
- if (!latch.await(wait, unit)) {
+ if (!latch.await(wait, unit) || isClosed()) {
throw new TimeoutException("Timeout waiting to see state for collection=" + collection + " :" + "live=" + liveNodes
+ docCollection.get());
}
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
similarity index 58%
copy from solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
copy to solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
index bcc8267..c1bea34 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-election-debug.xml
@@ -20,11 +20,11 @@
<Appenders>
<Console name="STDERR_COLOR" target="SYSTEM_ERR">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</Console>
<File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</File>
<File name="FILE2" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
@@ -39,43 +39,15 @@
</Appenders>
<Loggers>
-
-
- <AsyncLogger name="org.apache.solr.servlet.HttpSolrCall" level="DEBUG"/>
<AsyncLogger name="org.apache.zookeeper" level="WARN"/>
<AsyncLogger name="org.apache.hadoop" level="WARN"/>
<AsyncLogger name="org.apache.directory" level="WARN"/>
<AsyncLogger name="org.apache.solr.hadoop" level="INFO"/>
<AsyncLogger name="org.eclipse.jetty" level="INFO"/>
- <AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.handler.admin.CollectionsHandler" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.handler.IndexFetcher" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.api.collections.CreateCollectionCmd" level="DEBUG"/>
- <!-- <AsyncLogger name="org.apache.solr.common.patterns.DW" level="DEBUG"/> -->
- <AsyncLogger name="org.apache.solr.cloud.overseer.ZkStateWriter" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.Overseer" level="DEBUG"/>
- <!-- <AsyncLogger name="org.apache.solr.cloud.OverseerTaskProcessor" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.ZkDistributedQueue" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.OverseerTaskQueue" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.OverseerTaskExecutorTask" level="DEBUG"/>-->
+
<AsyncLogger name="org.apache.solr.cloud.LeaderElector" level="DEBUG"/>
<AsyncLogger name="org.apache.solr.cloud.ShardLeaderElectionContextBase" level="DEBUG"/>
-
- <!-- <AsyncLogger name="org.apache.solr.common.cloud.SolrZkClient" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.overseer.SliceMutator" level="DEBUG"/>-->
- <AsyncLogger name="org.apache.solr.client.solrj.impl.LBSolrClient" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.cloud.ZkController" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.common.cloud.ZkStateReader" level="DEBUG"/>
-
- <AsyncLogger name="org.apache.solr.core.SolrCore" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.common.cloud.ZkMaintenanceUtils" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="DEBUG"/>
- <AsyncLogger name="org.apache.solr.update.processor.DistributedUpdateProcessor" level="DEBUG"/>
-
- <AsyncLogger name="org.apache.solr.client.solrj.impl.Http2SolrClient" level="TRACE"/>
-
- <AsyncLogger name="com.google.inject.servlet" level="DEBUG"/>
+ <AsyncLogger name="org.apache.solr.cloud.OverseerElectionContext" level="DEBUG"/>
<AsyncRoot level="INFO">
<AppenderRef ref="STDERR_COLOR"/>
diff --git a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
index bcc8267..22e1955 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-startup-debug.xml
@@ -20,11 +20,11 @@
<Appenders>
<Console name="STDERR_COLOR" target="SYSTEM_ERR">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</Console>
<File name="FILE" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">
- <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{node_name} %X{collection} %X{shard} %X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
+ <PatternLayout pattern="%style{%-4r}{yellow} %highlight{%maxLen{%-5p}{6}} %style{(%t)}{yellow,bold} [%style{%X{replica} %X{trace_id}}{cyan}] %style{%c{1.}}{cyan} %highlight{%m %notEmpty{%ex}}\n"/>
</File>
<File name="FILE2" fileName="${sys:user.home}/solr-test.log" immediateFlush="false" append="false">