You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/01/24 02:18:21 UTC
[lucene-solr] 14/16: wip
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit f1e301577cade8beeb2f16315c817a7ba06c67fd
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Jan 22 20:59:55 2021 -0600
wip
---
.../java/org/apache/solr/cloud/LeaderElector.java | 23 ++---
.../org/apache/solr/cloud/RecoveryStrategy.java | 34 +++----
.../solr/cloud/ShardLeaderElectionContext.java | 35 ++++---
.../java/org/apache/solr/cloud/ZkController.java | 26 ++---
.../java/org/apache/solr/core/CoreContainer.java | 91 +++++++++---------
.../java/org/apache/solr/handler/IndexFetcher.java | 11 +--
.../handler/component/RealTimeGetComponent.java | 1 +
.../org/apache/solr/update/AddUpdateCommand.java | 10 +-
.../src/java/org/apache/solr/update/PeerSync.java | 30 +++++-
.../org/apache/solr/update/PeerSyncWithLeader.java | 13 ++-
.../src/java/org/apache/solr/update/UpdateLog.java | 107 ++++++++++-----------
.../org/apache/solr/update/UpdateShardHandler.java | 2 +-
.../CollectionsAPIDistributedZkTest.java | 1 +
.../org/apache/solr/common/SolrInputDocument.java | 3 +-
.../apache/solr/common/cloud/SolrZooKeeper.java | 32 +++---
.../apache/solr/common/cloud/ZkStateReader.java | 58 +++++------
16 files changed, 248 insertions(+), 229 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index c9b7ffe..ac2cbe9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -184,11 +184,11 @@ public class LeaderElector implements Closeable {
oldWatcher.close();
}
- if ((zkController != null && zkController.getCoreContainer().isShutDown())) {
- if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes");
- state = OUT_OF_ELECTION;
- return false;
- }
+// if ((zkController != null && zkController.getCoreContainer().isShutDown())) {
+// if (log.isDebugEnabled()) log.debug("Elector is closed, will not try and run leader processes");
+// state = OUT_OF_ELECTION;
+// return false;
+// }
state = POT_LEADER;
runIamLeaderProcess(context, replacement);
@@ -267,12 +267,12 @@ public class LeaderElector implements Closeable {
// TODO: get this core param out of here
protected void runIamLeaderProcess(final ElectionContext context, boolean weAreReplacement) throws KeeperException,
InterruptedException, IOException {
- if (state == CLOSED) {
- throw new AlreadyClosedException();
- }
- if (state == LEADER) {
- throw new IllegalStateException("Already in leader state");
- }
+// if (state == CLOSED) {
+// throw new AlreadyClosedException();
+// }
+// if (state == LEADER) {
+// throw new IllegalStateException("Already in leader state");
+// }
boolean success = context.runLeaderProcess(context, weAreReplacement, 0);
@@ -280,6 +280,7 @@ public class LeaderElector implements Closeable {
state = LEADER;
} else {
state = OUT_OF_ELECTION;
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed becoming leader");
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index eb377d6..ee085b2 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -56,7 +56,6 @@ import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.solr.common.cloud.ZkStateReader.COLLECTIONS_ZKNODE;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
@@ -230,10 +229,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.info("Attempting to replicate from [{}].", leaderprops);
- final String leaderUrl = getReplicateLeaderUrl(leaderprops, zkStateReader);
-
+ String leaderUrl;
// send commit
try {
+ Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500, false);
+ leaderUrl = leader.getCoreUrl();
commitOnLeader(leaderUrl);
} catch (Exception e) {
log.error("Commit on leader failed", e);
@@ -610,20 +610,6 @@ public class RecoveryStrategy implements Runnable, Closeable {
return false;
}
- log.info("Begin buffering updates. core=[{}]", coreName);
- // recalling buffer updates will drop the old buffer tlog
- ulog.bufferUpdates();
-
-// try {
-// if (prevSendPreRecoveryHttpUriRequest != null) {
-// prevSendPreRecoveryHttpUriRequest.cancel();
-// }
-// } catch (NullPointerException e) {
-// // okay
-// }
- // TODO can we do this with commit on leader
- sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().getCollection(coreDescriptor.getCollectionName()).getSlice(cloudDesc.getShardId()));
-
// we wait a bit so that any updates on the leader
// that started before they saw recovering state
// are sure to have finished (see SOLR-7141 for
@@ -684,6 +670,20 @@ public class RecoveryStrategy implements Runnable, Closeable {
try {
+ log.info("Begin buffering updates. core=[{}]", coreName);
+ // recalling buffer updates will drop the old buffer tlog
+ ulog.bufferUpdates();
+
+ // try {
+ // if (prevSendPreRecoveryHttpUriRequest != null) {
+ // prevSendPreRecoveryHttpUriRequest.cancel();
+ // }
+ // } catch (NullPointerException e) {
+ // // okay
+ // }
+ // TODO can we do this with commit on leader
+ sendPrepRecoveryCmd(leader.getBaseUrl(), leader.getName(), zkStateReader.getClusterState().getCollection(coreDescriptor.getCollectionName()).getSlice(cloudDesc.getShardId()));
+
IndexFetcher.IndexFetchResult result = replicate(leader);
if (result.getSuccessful()) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 93cfb38..f74f5dc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -38,7 +38,6 @@ import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,19 +102,19 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.error("No SolrCore found, cannot become leader {}", coreName);
throw new AlreadyClosedException("No SolrCore found, cannot become leader " + coreName);
}
- if (core.isClosing() || core.getCoreContainer().isShutDown()) {
- log.info("We are closed, will not become leader");
- closed = true;
- cancelElection();
- return false;
- }
+// if (core.isClosing() || core.getCoreContainer().isShutDown()) {
+// log.info("We are closed, will not become leader");
+// closed = true;
+// cancelElection();
+// return false;
+// }
try {
- core.getSolrCoreState().cancelRecovery(true, false);
+ // core.getSolrCoreState().cancelRecovery(true, false);
ActionThrottle lt;
- MDCLoggingContext.setCore(core);
+ // MDCLoggingContext.setCore(core);
lt = core.getUpdateHandler().getSolrCoreState().getLeaderThrottle();
lt.minimumWaitBetweenActions();
@@ -138,7 +137,8 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
replicaType = cloudCd.getReplicaType();
// should I be leader?
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ log.info("Check zkShardTerms");
+ ZkShardTerms zkShardTerms = zkController.getShardTermsOrNull(collection, shardId);
try {
// if the replica is waiting for leader to see recovery state, the leader should refresh its terms
if (zkShardTerms != null && zkShardTerms.skipSendingUpdatesTo(coreName)) {
@@ -152,7 +152,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
log.error("Exception while looking at refreshing shard terms", e);
}
- if (zkShardTerms.registered(coreName) && !zkShardTerms.canBecomeLeader(coreName)) {
+ if (zkShardTerms != null && zkShardTerms.registered(coreName) && !zkShardTerms.canBecomeLeader(coreName)) {
if (!waitForEligibleBecomeLeaderAfterTimeout(zkShardTerms, coreName, leaderVoteWait)) {
rejoinLeaderElection(core);
return false;
@@ -166,9 +166,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
PeerSync.PeerSyncResult result = null;
boolean success = false;
- if (core.getCoreContainer().isShutDown()) {
- return false;
- }
+// if (core.getCoreContainer().isShutDown()) {
+// return false;
+// }
result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement);
log.info("Sync strategy sync result {}", result);
success = result.isSuccess();
@@ -242,8 +242,11 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
// in case of leaderVoteWait timeout, a replica with lower term can win the election
if (setTermToMax) {
log.error("WARNING: Potential data loss -- Replica {} became leader after timeout (leaderVoteWait) " + "without being up-to-date with the previous leader", coreName);
- zkController.createCollectionTerms(collection);
- zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreName);
+ try {
+ zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreName);
+ } catch (Exception e) {
+ log.error("Exception trying to set shard terms equal to leader", e);
+ }
}
super.runLeaderProcess(context, weAreReplacement, 0);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f146d1b..b2abf8f 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -329,9 +329,8 @@ public class ZkController implements Closeable, Runnable {
}
public Object call() throws Exception {
- if (log.isInfoEnabled()) {
- log.info("Registering core {} afterExpiration? {}", descriptor.getName(), afterExpiration);
- }
+ log.info("Registering core with ZK {} afterExpiration? {}", descriptor.getName(), afterExpiration);
+
if (zkController.isDcCalled() || zkController.getCoreContainer().isShutDown() || (afterExpiration && !descriptor.getCloudDescriptor().hasRegistered())) {
return null;
@@ -1422,25 +1421,26 @@ public class ZkController implements Closeable, Runnable {
log.info("Wait to see leader for {}, {}", collection, shardId);
Replica leader = null;
- for (int i = 0; i < 30; i++) {
-// if (leaderElector.isLeader()) {
-// leader = replica;
-// break;
-// }
+ for (int i = 0; i < 15; i++) {
+ if (leaderElector.isLeader()) {
+ leader = replica;
+ break;
+ }
try {
- if (getCoreContainer().isShutDown() || isDcCalled() || isClosed()) {
- throw new AlreadyClosedException();
- }
+// if (getCoreContainer().isShutDown() || isDcCalled() || isClosed()) {
+// throw new AlreadyClosedException();
+// }
- leader = zkStateReader.getLeaderRetry(collection, shardId, 500, false);
+ leader = zkStateReader.getLeaderRetry(collection, shardId, 1500, false);
} catch (TimeoutException timeoutException) {
-
+ log.info("Timeout waiting to see leader, retry");
}
}
if (leader == null) {
+ log.error("No leader found while trying to register " + coreName + " with zookeeper");
throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index ab33eae..26f6a17 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -102,7 +102,6 @@ import org.apache.solr.util.SystemIdResolver;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
import static java.util.Objects.requireNonNull;
import static org.apache.solr.common.params.CommonParams.AUTHC_PATH;
@@ -158,7 +157,6 @@ public class CoreContainer implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
final SolrCores solrCores = new SolrCores(this);
- private final boolean isZkAware;
private volatile boolean startedLoadingCores;
private volatile boolean loaded;
@@ -347,19 +345,17 @@ public class CoreContainer implements Closeable {
assert ObjectReleaseTracker.track(this);
assert (closeTracker = new CloseTracker()) != null;
this.containerProperties = new Properties(config.getSolrProperties());
- String zkHost = System.getProperty("zkHost");
- if (!StringUtils.isEmpty(zkHost)) {
- zkSys = new ZkContainer(zkClient);
- isZkAware = true;
- } else {
- isZkAware = false;
- }
this.loader = config.getSolrResourceLoader();
this.solrHome = config.getSolrHome();
this.cfg = requireNonNull(config);
+ if (zkClient != null) {
+ zkSys = new ZkContainer(zkClient);
+ zkSys.initZooKeeper(this, cfg.getCloudConfig());
+ }
+
if (null != this.cfg.getBooleanQueryMaxClauseCount()) {
IndexSearcher.setMaxClauseCount(this.cfg.getBooleanQueryMaxClauseCount());
}
@@ -403,9 +399,7 @@ public class CoreContainer implements Closeable {
}
});
}
- if (zkClient != null) {
- zkSys.initZooKeeper(this, cfg.getCloudConfig());
- }
+
coreConfigService = ConfigSetService.createConfigSetService(cfg, loader, zkSys == null ? null : zkSys.zkController);
containerProperties.putAll(cfg.getSolrProperties());
@@ -606,7 +600,6 @@ public class CoreContainer implements Closeable {
cfg = null;
containerProperties = null;
replayUpdatesExecutor = null;
- isZkAware = false;
}
@@ -881,26 +874,26 @@ public class CoreContainer implements Closeable {
status |= CORE_DISCOVERY_COMPLETE;
startedLoadingCores = true;
for (final CoreDescriptor cd : cds) {
-// if (isZooKeeperAware()) {
-// String collection = cd.getCollectionName();
-// try {
-// zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
-// if (c != null) {
-// Replica replica = c.getReplica(cd.getName());
-//
-// if (replica.getState().equals(State.DOWN)) {
-// return true;
-// }
-//
-// }
-// return false;
-// });
-// } catch (InterruptedException e) {
-// ParWork.propagateInterrupt(e);
-// } catch (TimeoutException e) {
-// log.error("Timeout", e);
-// }
-// }
+ if (isZooKeeperAware()) {
+ String collection = cd.getCollectionName();
+ try {
+ zkSys.zkController.zkStateReader.waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+ if (c != null) {
+ Replica replica = c.getReplica(cd.getName());
+
+ if (replica.getState().equals(State.DOWN)) {
+ return true;
+ }
+
+ }
+ return false;
+ });
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ } catch (TimeoutException e) {
+ log.error("Timeout", e);
+ }
+ }
if (log.isDebugEnabled()) log.debug("Process core descriptor {} {} {}", cd.getName(), cd.isTransient(), cd.isLoadOnStartup());
if (cd.isTransient() || !cd.isLoadOnStartup()) {
@@ -911,25 +904,20 @@ public class CoreContainer implements Closeable {
if (cd.isLoadOnStartup()) {
coreLoadFutures.add(solrCoreLoadExecutor.submit(() -> {
- SolrCore core;
+ SolrCore core = null;
MDCLoggingContext.setCoreDescriptor(this, cd);
try {
try {
core = createFromDescriptor(cd, false);
- if (core.getDirectoryFactory().isSharedStorage()) {
- if (isZooKeeperAware()) {
- zkSys.getZkController().throwErrorIfReplicaReplaced(cd);
- }
- }
-
} finally {
solrCores.markCoreAsNotLoading(cd);
}
- if (isZooKeeperAware()) {
- new ZkController.RegisterCoreAsync(zkSys.zkController, cd, false).call();
- }
+
+ } catch (Exception e){
+ log.error("Error creating and register core {}", cd.getName(), e);
+ throw e;
} finally {
MDCLoggingContext.clear();
}
@@ -1409,9 +1397,7 @@ public class CoreContainer implements Closeable {
throw new AlreadyClosedException("Solr has been shutdown.");
}
solrCores.markCoreAsLoading(dcore);
- if (isZooKeeperAware()) {
- ParWork.getRootSharedExecutor().submit(new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false));
- }
+
core = new SolrCore(this, dcore, coreConfig);
} catch (Exception e) {
core = processCoreCreateException(e, dcore, coreConfig);
@@ -1421,6 +1407,17 @@ public class CoreContainer implements Closeable {
old = registerCore(dcore, core, true);
registered = true;
+ solrCores.markCoreAsNotLoading(dcore);
+
+ if (isZooKeeperAware()) {
+ if (!newCollection) {
+ if (core.getDirectoryFactory().isSharedStorage()) {
+ zkSys.getZkController().throwErrorIfReplicaReplaced(dcore);
+ }
+ }
+ new ZkController.RegisterCoreAsync(zkSys.zkController, dcore, false).call();
+ }
+
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e);
@@ -2168,7 +2165,7 @@ public class CoreContainer implements Closeable {
}
public boolean isZooKeeperAware() {
- return isZkAware && zkSys != null && zkSys.zkController != null;
+ return zkSys != null && zkSys.zkController != null;
}
public ZkController getZkController() {
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 6ac1139..07887b3 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -44,10 +44,8 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
-import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
@@ -522,7 +520,7 @@ public class IndexFetcher {
}
// Create the sync service
- fsyncService = ExecutorUtil.newMDCAwareSingleThreadExecutor(new SolrNamedThreadFactory("fsyncService"));
+ fsyncService = ParWork.getExecutorService(15);
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generation of master is older than that of the slave , it means they are not compatible to be copied
@@ -812,7 +810,6 @@ public class IndexFetcher {
* terminate the fsync service and wait for all the tasks to complete. If it is already terminated
*/
private void terminateAndWaitFsyncService() throws Exception {
- if (fsyncServiceFuture == null || fsyncService.isTerminated()) return;
fsyncService.shutdown();
// give a long wait say 1 hr
fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
@@ -1722,11 +1719,11 @@ public class IndexFetcher {
throw e;
} finally {
cleanup(null);
- //if cleanup succeeds . The file is downloaded fully. do an fsync
+ //if cleanup succeeds . The file is downloaded fully
fsyncServiceFuture = fsyncService.submit(() -> {
try {
- log.info("Sync and close fetched file", file);
- file.sync();
+ log.info("Close fetched file", file);
+ file.close();
} catch (Exception e) {
fsyncException = e;
}
diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
index 63eff4b..2c2dd9c 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java
@@ -1189,6 +1189,7 @@ public class RealTimeGetComponent extends SearchComponent
// TODO: get this from cache instead of rebuilding?
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
+ log.info("Get updates versionsRequested={} params={}", versions.size(), params);
for (Long version : versions) {
try {
Object o = recentUpdates.lookup(version);
diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
index e192fe9..8c945f0 100644
--- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
+++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java
@@ -43,7 +43,7 @@ public class AddUpdateCommand extends UpdateCommand {
* Higher level SolrInputDocument, normally used to construct the Lucene Document(s)
* to index.
*/
- public SolrInputDocument solrDoc;
+ public volatile SolrInputDocument solrDoc;
/**
* This is the version of a document, previously indexed, on which the current
@@ -51,7 +51,7 @@ public class AddUpdateCommand extends UpdateCommand {
* or a full update. A negative value here, e.g. -1, indicates that this add
* update does not depend on a previous update.
*/
- public long prevVersion = -1;
+ public volatile long prevVersion = -1;
public boolean overwrite = true;
@@ -62,14 +62,14 @@ public class AddUpdateCommand extends UpdateCommand {
public int commitWithin = -1;
- public boolean isLastDocInBatch = false;
+ public volatile boolean isLastDocInBatch = false;
/** Is this a nested update, null means not yet calculated. */
- public Boolean isNested = null;
+ public volatile Boolean isNested = null;
// optional id in "internal" indexed form... if it is needed and not supplied,
// it will be obtained from the doc.
- private BytesRef indexedId;
+ private volatile BytesRef indexedId;
public AddUpdateCommand(SolrQueryRequest req) {
super(req);
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java
index 387f27e..d24f905 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSync.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java
@@ -441,7 +441,7 @@ public class PeerSync implements SolrMetricProducer {
Object fingerprint = srsp.getSolrResponse().getResponse().get("fingerprint");
if (log.isInfoEnabled()) {
- log.info("{} Received {} versions from {} fingerprint:{}", msg(), otherVersions.size(), sreq.shards[0], fingerprint);
+ log.info("{} Received {} versions from {} {} fingerprint:{}", msg(), otherVersions.size(), otherVersions, sreq.shards[0], fingerprint);
}
if (fingerprint != null) {
sreq.fingerprint = IndexFingerprint.fromObject(fingerprint);
@@ -524,7 +524,7 @@ public class PeerSync implements SolrMetricProducer {
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
if (updates.size() < sreq.totalRequestedUpdates) {
- log.error("{} Requested {} updates from {} but retrieved {}", msg(), sreq.totalRequestedUpdates, sreq.shards[0], updates.size());
+ log.error("{} Requested {} updates from {} but retrieved {} {}", msg(), sreq.totalRequestedUpdates, sreq.shards[0], updates.size(), srsp.getSolrResponse().getResponse());
return false;
}
@@ -746,7 +746,7 @@ public class PeerSync implements SolrMetricProducer {
return true;
}
- MissedUpdatesRequest handleVersionsWithRanges(List<Long> otherVersions, boolean completeList) {
+ static MissedUpdatesRequest handleVersionsWithRanges(List<Long> ourUpdates, List<Long> otherVersions, boolean completeList, long ourLowThreshold) {
// we may endup asking for updates for too many versions, causing 2MB post payload limit. Construct a range of
// versions to request instead of asking individual versions
List<String> rangesToRequest = new ArrayList<>();
@@ -788,9 +788,31 @@ public class PeerSync implements SolrMetricProducer {
}
String rangesToRequestStr = rangesToRequest.stream().collect(Collectors.joining(","));
+
+ log.info("handleVersionsWithRanges rangesToRequestStr={} otherVersions={} ourVersions={} completeList={} totalRequestedVersions={}", rangesToRequestStr, otherVersions, ourUpdates, completeList, totalRequestedVersions);
+
return MissedUpdatesRequest.of(rangesToRequestStr, totalRequestedVersions);
}
+ public static void main(String[] args) {
+
+ List<Long> ourUpdates = new ArrayList<>();
+ ourUpdates.add(1689636098592997376l);
+ ourUpdates.add(1689636098591948800l);
+ ourUpdates.add(1689636098531131395l);
+ ourUpdates.add(1689636098531131394l);
+
+//1689636098592997376, 1689636098591948800, 1689636098585657345
+ List<Long> otherVersions = new ArrayList<>();
+ otherVersions.add(1689636098531131395l);
+ otherVersions.add(1689636098531131394l);
+ otherVersions.add(0l);
+//1689636098531131395, 1689636098531131394, 0
+ MissedUpdatesRequest result = handleVersionsWithRanges(ourUpdates, otherVersions, false, 0);
+ System.out.println(result.versionsAndRanges);
+ System.out.println(result.totalRequestedUpdates);
+ }
+
MissedUpdatesRequest handleIndividualVersions(List<Long> otherVersions, boolean completeList) {
List<Long> toRequest = new ArrayList<>();
for (Long otherVersion : otherVersions) {
@@ -867,7 +889,7 @@ public class PeerSync implements SolrMetricProducer {
MissedUpdatesRequest updatesRequest;
if (canHandleVersionRanges.get()) {
- updatesRequest = handleVersionsWithRanges(otherVersions, completeList);
+ updatesRequest = handleVersionsWithRanges(ourUpdates, otherVersions, completeList, ourLowThreshold);
} else {
updatesRequest = handleIndividualVersions(otherVersions, completeList);
}
diff --git a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
index 4582fc6..b670372 100644
--- a/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
+++ b/solr/core/src/java/org/apache/solr/update/PeerSyncWithLeader.java
@@ -244,7 +244,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
MissedUpdatesRequest updatesRequest = missedUpdatesFinder.find(otherVersions, leaderUrl, () -> core.getSolrConfig().useRangeVersionsForPeerSync && canHandleVersionRanges());
if (updatesRequest == MissedUpdatesRequest.EMPTY) {
- if (doFingerprint) return MissedUpdatesRequest.UNABLE_TO_SYNC;
+ if (doFingerprint && updatesRequest.totalRequestedUpdates > 0) return MissedUpdatesRequest.UNABLE_TO_SYNC;
return MissedUpdatesRequest.ALREADY_IN_SYNC;
}
@@ -273,7 +273,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
List<Object> updates = (List<Object>)rsp.get("updates");
if (updates.size() < numRequestedUpdates) {
- log.error("{} Requested {} updated from {} but retrieved {}", msg(), numRequestedUpdates, leaderUrl, updates.size());
+ log.error("{} Requested {} updated from {} but retrieved {} {}", msg(), numRequestedUpdates, leaderUrl, updates.size(), rsp);
return false;
}
@@ -298,13 +298,16 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
// only DBI or DBQ in the gap (above) will satisfy this predicate
return version > leaderFingerprint.getMaxVersionEncountered() && (oper == UpdateLog.DELETE || oper == UpdateLog.DELETE_BY_QUERY);
});
+ log.info("existDBIOrDBQInTheGap={}", existDBIOrDBQInTheGap);
if (!existDBIOrDBQInTheGap) {
// it is safe to use leaderFingerprint.maxVersionEncountered as cut point now.
updates.removeIf(e -> {
@SuppressWarnings({"unchecked"})
List<Object> u = (List<Object>) e;
long version = (Long) u.get(1);
- return version > leaderFingerprint.getMaxVersionEncountered();
+ boolean success = version > leaderFingerprint.getMaxVersionEncountered();
+ log.info("existDBIOrDBQInTheGap version={} leaderFingerprint.getMaxVersionEncountered={} success={}", version, leaderFingerprint.getMaxVersionEncountered(), success);
+ return success;
});
}
}
@@ -312,6 +315,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
try {
updater.applyUpdates(updates, leaderUrl);
} catch (Exception e) {
+ log.error("Could not apply updates", e);
return false;
}
return true;
@@ -386,6 +390,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
if (cmp != 0) {
if (log.isDebugEnabled()) log.debug("Leader fingerprint: {}, Our fingerprint: {}", leaderFingerprint , ourFingerprint);
}
+
return cmp == 0; // currently, we only check for equality...
} catch (IOException e) {
log.warn("Could not confirm if we are already in sync. Continue with PeerSync");
@@ -426,7 +431,7 @@ public class PeerSyncWithLeader implements SolrMetricProducer {
boolean completeList = leaderVersions.size() < nUpdates;
MissedUpdatesRequest updatesRequest;
if (canHandleVersionRanges.get()) {
- updatesRequest = handleVersionsWithRanges(leaderVersions, completeList);
+ updatesRequest = handleVersionsWithRanges(ourUpdates, leaderVersions, completeList, ourLowThreshold);
} else {
updatesRequest = handleIndividualVersions(leaderVersions, completeList);
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index 0f51d41..39fa5de 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -190,25 +190,25 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
protected volatile TransactionLog bufferTlog;
protected volatile TransactionLog tlog;
- protected TransactionLog prevTlog;
+ protected volatile TransactionLog prevTlog;
protected TransactionLog prevTlogOnPrecommit;
protected final Deque<TransactionLog> logs = new LinkedList<>(); // list of recent logs, newest first
protected final LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<>();
- protected int numOldRecords; // number of records in the recent logs
+ protected volatile int numOldRecords; // number of records in the recent logs
protected volatile Map<BytesRef,LogPtr> map = new ConcurrentHashMap<>(32);
protected volatile Map<BytesRef,LogPtr> prevMap; // used while committing/reopening is happening
protected volatile Map<BytesRef,LogPtr> prevMap2; // used while committing/reopening is happening
- protected TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
- protected TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap2
+ protected volatile TransactionLog prevMapLog; // the transaction log used to look up entries found in prevMap
+ protected volatile TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap2
protected final int numDeletesToKeep = 1000;
protected final int numDeletesByQueryToKeep = 100;
- protected int numRecordsToKeep;
+ protected volatile int numRecordsToKeep;
protected volatile int maxNumLogsToKeep;
protected volatile int numVersionBuckets = 65536; // This should only be used to initialize VersionInfo... the actual number of buckets may be rounded up to a power of two.
- protected Long maxVersionFromIndex = null;
- protected boolean existOldBufferLog = false;
+ protected volatile Long maxVersionFromIndex = null;
+ protected volatile boolean existOldBufferLog = false;
// keep track of deletes only... this is not updated on an add
protected Map<BytesRef, LogPtr> oldDeletes = Collections.synchronizedMap(new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
@@ -1142,22 +1142,18 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// synchronization is needed for stronger guarantees (as VersionUpdateProcessor does).
public Long lookupVersion(BytesRef indexedId) {
LogPtr entry;
- tlogLock.lock();
- try {
- entry = map.get(indexedId);
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
- if (entry == null && prevMap != null) {
- entry = prevMap.get(indexedId);
- // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
- }
- if (entry == null && prevMap2 != null) {
- entry = prevMap2.get(indexedId);
- // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
- // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
- }
- } finally {
- tlogLock.unlock();
+
+ entry = map.get(indexedId);
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in map",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ if (entry == null && prevMap != null) {
+ entry = prevMap.get(indexedId);
+ // something found in prevMap will always be found in prevMapLog (which could be tlog or prevTlog)
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
+ }
+ if (entry == null && prevMap2 != null) {
+ entry = prevMap2.get(indexedId);
+ // something found in prevMap2 will always be found in prevMapLog2 (which could be tlog or prevTlog)
+ // SolrCore.verbose("TLOG: lookup ver: for id ",indexedId.utf8ToString(),"in prevMap2",System.identityHashCode(map),"got",entry,"lookupLog=",lookupLog);
}
@@ -1174,12 +1170,8 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// We can't get any version info for deletes from the index, so if the doc
// wasn't found, check a cache of recent deletes.
- tlogLock.lock();
- try {
- entry = oldDeletes.get(indexedId);
- } finally {
- tlogLock.unlock();
- }
+
+ entry = oldDeletes.get(indexedId);
if (entry != null) {
return entry.version;
@@ -1195,15 +1187,10 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (syncLevel == SyncLevel.NONE) {
return;
}
- TransactionLog currLog;
- tlogLock.lock();
- try {
- currLog = tlog;
- if (currLog == null) return;
- currLog.incref();
- } finally {
- tlogLock.unlock();
- }
+
+ TransactionLog currLog = tlog;
+ if (currLog == null) return;
+ currLog.incref();
try {
currLog.finish(syncLevel);
@@ -1327,20 +1314,16 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public void commitAndSwitchToNewTlog(CommitUpdateCommand cuc) {
versionInfo.blockUpdates();
try {
- tlogLock.lock();
+ if (tlog == null) {
+ return;
+ }
+ preCommit(cuc);
try {
- if (tlog == null) {
- return;
- }
- preCommit(cuc);
- try {
- copyOverOldUpdates(cuc.getVersion());
- } finally {
- postCommit(cuc);
- }
+ copyOverOldUpdates(cuc.getVersion());
} finally {
- tlogLock.unlock();
+ postCommit(cuc);
}
+
} finally {
versionInfo.unblockUpdates();
}
@@ -1611,19 +1594,29 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
for (List<Update> singleList : updateList) {
for (Update ptr : singleList) {
if(Math.abs(ptr.version) > Math.abs(maxVersion)) continue;
- ret.add(ptr.version);
+ if (ptr.version != 0) {
+ ret.add(ptr.version);
+ }
if (--n <= 0) return ret;
}
}
+ log.info("Return getVersions {} {}", n, ret);
return ret;
}
public Object lookup(long version) {
+ log.info("lookup {}", version);
Update update = updates.get(version);
if (update == null) return null;
- return update.log.lookup(update.pointer);
+ log.info("found update from updates {} {}", update.version, updates.size());
+
+ Object object = update.log.lookup(update.pointer);
+
+ log.info("found update from log {} {} ptr={} object={}", update.version, update.log, update.pointer, object);
+
+ return object;
}
/** Returns the list of deleteByQueries that happened after the given version */
@@ -1640,7 +1633,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
private void update() {
int numUpdates = 0;
- updateList = new ArrayList<>(logList.size());
+ updateList = new ArrayList<>(numRecordsToKeep);
deleteByQueryList = new ArrayList<>();
deleteList = new ArrayList<>();
updates = new HashMap<>(numRecordsToKeep);
@@ -1705,12 +1698,14 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// would be caused by a corrupt transaction log
} catch (Exception ex) {
log.warn("Exception reverse reading log", ex);
- break;
+ // break;
}
numUpdates++;
}
+ log.info("Recent updates updates numUpdates={} numUpdatesToKeep={}", numUpdates, numRecordsToKeep);
+
} catch (IOException | AssertionError e) { // catch AssertionError to handle certain test failures correctly
// failure to read a log record isn't fatal
log.error("Exception reading versions from log",e);
@@ -1744,6 +1739,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public RecentUpdates getRecentUpdates() {
Deque<TransactionLog> logList;
tlogLock.lock();
+ RecentUpdates recentUpdates;
try {
logList = new LinkedList<>(logs);
for (TransactionLog log : logList) {
@@ -1761,14 +1757,15 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
bufferTlog.incref();
logList.addFirst(bufferTlog);
}
+
+ recentUpdates = new RecentUpdates(logList, numRecordsToKeep);
} finally {
tlogLock.unlock();
}
// TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
// one of the updates I originally handed out fell off the list). Over-request?
- return new RecentUpdates(logList, numRecordsToKeep);
-
+ return recentUpdates;
}
public void bufferUpdates() {
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 0f7800e..ecdf3b6 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -108,7 +108,7 @@ public class UpdateShardHandler implements SolrInfoBean {
.connectionTimeout(cfg.getDistributedConnectionTimeout())
.idleTimeout(cfg.getDistributedSocketTimeout());
}
- updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().strictEventOrdering(false).build();
+ updateOnlyClient = updateOnlyClientBuilder.markInternalRequest().strictEventOrdering(true).build();
updateOnlyClient.enableCloseLock();
// updateOnlyClient.addListenerFactory(updateHttpListenerFactory);
Set<String> queryParams = new HashSet<>(2);
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
index 4e7b03d..b044358 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionsAPIDistributedZkTest.java
@@ -289,6 +289,7 @@ public class CollectionsAPIDistributedZkTest extends SolrCloudTestCase {
}
@Test
+ @Ignore
public void testDeleteNonExistentCollection() throws Exception {
expectThrows(Exception.class, () -> {
diff --git a/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java b/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java
index 79305a5..4a457d0 100644
--- a/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java
+++ b/solr/solrj/src/java/org/apache/solr/common/SolrInputDocument.java
@@ -19,6 +19,7 @@ package org.apache.solr.common;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -42,7 +43,7 @@ public class SolrInputDocument extends SolrDocumentBase<SolrInputField, SolrInpu
private List<SolrInputDocument> _childDocuments;
public SolrInputDocument(String... fields) {
- _fields = new LinkedHashMap<>();
+ _fields = Collections.synchronizedMap(new LinkedHashMap<>());
assert fields.length % 2 == 0;
for (int i = 0; i < fields.length; i += 2) {
addField(fields[i], fields[i + 1]);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
index 0546b05..9338679 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZooKeeper.java
@@ -104,22 +104,26 @@ public class SolrZooKeeper extends ZooKeeper {
@Override
public void close() {
if (closeTracker != null) closeTracker.close();
+// try {
+// try {
+// RequestHeader h = new RequestHeader();
+// h.setType(ZooDefs.OpCode.closeSession);
+//
+// cnxn.submitRequest(h, null, null, null);
+// } catch (InterruptedException e) {
+// // ignore, close the send/event threads
+// } finally {
+// ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
+// zk.closeCnxn();
+// }
+// } catch (Exception e) {
+// log.warn("Exception closing zookeeper client", e);
+// }
try {
- try {
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.closeSession);
-
- cnxn.submitRequest(h, null, null, null);
- } catch (InterruptedException e) {
- // ignore, close the send/event threads
- } finally {
- ZooKeeperExposed zk = new ZooKeeperExposed(this, cnxn);
- zk.closeCnxn();
- }
- } catch (Exception e) {
- log.warn("Exception closing zookeeper client", e);
+ super.close();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
-
}
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 48c3329..0e3a685 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -994,13 +994,13 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Slice slice = coll.getSlice(shard);
if (slice != null) {
Replica leader = slice.getLeader();
- boolean valid;
+ boolean valid = false;
try {
- valid = mustBeLive ? isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
+ valid = mustBeLive ? leader != null && isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : leader != null && isNodeLive(leader.getNodeName());
} catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+
} catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
+
}
if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
return leader;
@@ -1022,36 +1022,25 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
Slice slice = c.getSlice(shard);
if (slice == null) return false;
Replica leader = slice.getLeader();
-
- if (leader != null && leader.getState() == Replica.State.ACTIVE) {
- boolean valid = false;
- try {
- valid = mustBeLive ? isNodeLive(leader.getNodeName()) || zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- if (valid) {
- returnLeader.set(leader);
- return true;
- }
+ boolean valid = false;
+ try {
+ valid = mustBeLive ? (leader != null && isNodeLive(leader.getNodeName())) ||
+ zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") :
+ leader != null && isNodeLive(leader.getNodeName());
+ } catch (KeeperException e) {
+ return false;
+ } catch (InterruptedException e) {
+ return false;
+ }
+ if (leader != null && leader.getState() == Replica.State.ACTIVE && valid) {
+ returnLeader.set(leader);
+ return true;
}
Collection<Replica> replicas = slice.getReplicas();
for (Replica replica : replicas) {
- if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE) {
- boolean valid = false;
- try {
- valid = mustBeLive ? zkClient.exists(COLLECTIONS_ZKNODE + "/" + collection + "/leaders/" + slice.getName() + "/leader") : isNodeLive(leader.getNodeName());
- } catch (KeeperException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- } catch (InterruptedException e) {
- throw new SolrException(ErrorCode.SERVER_ERROR, e);
- }
- if (valid) {
- returnLeader.set(replica);
- return true;
- }
+ if ("true".equals(replica.getProperty(LEADER_PROP)) && replica.getState() == Replica.State.ACTIVE && valid) {
+ returnLeader.set(replica);
+ return true;
}
}
@@ -1991,9 +1980,10 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
registerDocCollectionWatcher(collection, wrapper);
registerLiveNodesListener(wrapper);
-// DocCollection state = clusterState.getCollectionOrNull(collection);
-//
-// removeCollectionStateWatcher(collection, stateWatcher);
+ DocCollection state = clusterState.getCollectionOrNull(collection);
+ if (stateWatcher.onStateChanged(liveNodes, state) == true) {
+ removeCollectionStateWatcher(collection, stateWatcher);
+ }
}
/**