You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by no...@apache.org on 2022/07/05 13:09:00 UTC
[lucene-solr] branch jira/solr15138-4 updated: implementing avoiding force fetch
This is an automated email from the ASF dual-hosted git repository.
noble pushed a commit to branch jira/solr15138-4
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/jira/solr15138-4 by this push:
new fe63f56522c implementing avoiding force fetch
fe63f56522c is described below
commit fe63f56522c035a36a7d32337e9f1c6a37581437
Author: Noble Paul <no...@gmail.com>
AuthorDate: Tue Jul 5 23:08:43 2022 +1000
implementing avoiding force fetch
---
.../java/org/apache/solr/cloud/LeaderElector.java | 15 +----
.../solr/cloud/ShardLeaderElectionContext.java | 58 +++++++++++---------
.../solr/cloud/ShardLeaderElectionContextBase.java | 21 ++++---
.../java/org/apache/solr/cloud/ZkController.java | 41 +++++++-------
.../src/java/org/apache/solr/core/ConfigSet.java | 3 +
.../org/apache/solr/core/ConfigSetService.java | 3 +
.../java/org/apache/solr/core/CoreContainer.java | 6 --
.../apache/solr/common/cloud/ZkStateReader.java | 64 ++++++++++++++++++++++
8 files changed, 135 insertions(+), 76 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
index c8249214b16..2afb361a1d6 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderElector.java
@@ -229,8 +229,6 @@ public class LeaderElector {
* @return sequential node number
*/
public int joinElection(ElectionContext context, boolean replacement,boolean joinAtHead) throws KeeperException, InterruptedException, IOException {
- Timer.TLInst.start("LeaderElector#joinElection()_0");
-
context.joinedElectionFired();
final String shardsElectZkPath = context.electionPath + LeaderElector.ELECTION_NODE;
@@ -241,12 +239,10 @@ public class LeaderElector {
boolean cont = true;
int tries = 0;
- Timer.TLInst.end("LeaderElector#joinElection()_0");
-
while (cont) {
try {
if(joinAtHead){
- Timer.TLInst.start("LeaderElector#joinElection()_0.5");
+ Timer.TLInst.start("LeaderElector#joinAtHead");
log.debug("Node {} trying to join election at the head", id);
List<String> nodes = OverseerTaskProcessor.getSortedElectionNodes(zkClient, shardsElectZkPath);
@@ -264,16 +260,11 @@ public class LeaderElector {
leaderSeqPath = shardsElectZkPath + "/" + id + "-n_"+ m.group(1);
zkClient.create(leaderSeqPath, null, CreateMode.EPHEMERAL, false);
}
- Timer.TLInst.end("LeaderElector#joinElection()_0.5");
+ Timer.TLInst.end("LeaderElector#joinAtHead");
} else {
- Timer.TLInst.start("LeaderElector#joinElection()_1");
-
leaderSeqPath = zkClient.create(shardsElectZkPath + "/" + id + "-n_", null,
CreateMode.EPHEMERAL_SEQUENTIAL, false);
-
- Timer.TLInst.end("LeaderElector#joinElection()_1");
-
}
log.debug("Joined leadership election with path: {}", leaderSeqPath);
@@ -336,9 +327,7 @@ public class LeaderElector {
checkIfIamLeader(context, replacement);
Timer.TLInst.end("LeaderElector#joinElection()_2");
- Timer.TLInst.start("LeaderElector#joinElection()_3");
int abc = getSeq(context.leaderSeqPath);
- Timer.TLInst.end("LeaderElector#joinElection()_3");
return abc;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index 623124a9a0a..8fafb101a2b 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -109,32 +109,38 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
//lt.minimumWaitBetweenActions();
//lt.markAttemptingAction();
- Timer.TLInst.start("runLeader#leaderVote");
int leaderVoteWait = cc.getZkController().getLeaderVoteWait();
- Timer.TLInst.end("runLeader#leaderVote");
-
- Timer.TLInst.start("runLeader#2");
+ Timer.TLInst.start("SLEC.runLeader#2");
// TODOFORNOBLE: we don't need this leader message for PRS
log.debug("Running the leader process for shard={} and weAreReplacement={} and leaderVoteWait={}", shardId, weAreReplacement, leaderVoteWait);
- if (zkController.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() > 1) {
+ ZkStateReader.CurrentShardData current = ZkStateReader.getCurrent(zkStateReader, collection);
+ DocCollection coll = current.getColl();
+ int replicas = coll.getSlice(shardId).getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)).size();
+ if (!coll.isPerReplicaState() && current.getNrtPlusTLOG(shardId)>1) {
// Clear the leader in clusterstate. We only need to worry about this if there is actually more than one replica.
ZkNodeProps m = new ZkNodeProps(Overseer.QUEUE_OPERATION, OverseerAction.LEADER.toLower(),
ZkStateReader.SHARD_ID_PROP, shardId, ZkStateReader.COLLECTION_PROP, collection);
zkController.getOverseer().getStateUpdateQueue().offer(Utils.toJSON(m));
}
+ Timer.TLInst.end("SLEC.runLeader#2");
if (!weAreReplacement) {
- waitForReplicasToComeUp(leaderVoteWait);
+ if(replicas > 1) {
+ Timer.TLInst.start("waitForReplicasToComeUp(leaderVoteWait)");
+ waitForReplicasToComeUp(leaderVoteWait);
+ Timer.TLInst.end("waitForReplicasToComeUp(leaderVoteWait)");
+ }
} else {
// SUSPICIOUS, this takes time 6s max, 2.7s avg, and does nothing
+ Timer.TLInst.start("areAllReplicasParticipating()");
areAllReplicasParticipating();
+ Timer.TLInst.end("areAllReplicasParticipating()");
}
- Timer.TLInst.end("runLeader#2");
if (isClosed) {
// Solr is shutting down or the ZooKeeper session expired while waiting for replicas. If the latter,
@@ -209,7 +215,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- Timer.TLInst.start("runLeader#5");
if (!success) {
boolean hasRecentUpdates = false;
@@ -234,7 +239,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
}
}
- Timer.TLInst.end("runLeader#5");
// solrcloud_debug
@@ -285,16 +289,14 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
, "without being up-to-date with the previous leader", coreNodeName);
zkController.getShardTerms(collection, shardId).setTermEqualsToLeader(coreNodeName);
}
- Timer.TLInst.start("runLeader#6");
+ Timer.TLInst.start("SLEC.runLeader#6");
super.runLeaderProcess(weAreReplacement, 0);
- Timer.TLInst.end("runLeader#6");
+ Timer.TLInst.end("SLEC.runLeader#6");
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {
- Timer.TLInst.start("runLeader#7");
core.getCoreDescriptor().getCloudDescriptor().setLeader(true);
publishActiveIfRegisteredAndNotActive(core);
- Timer.TLInst.end("runLeader#7");
} else {
return;
}
@@ -304,9 +306,7 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
// we made it as leader - send any recovery requests we need to
- Timer.TLInst.start("runLeader#8");
syncStrategy.requestRecoveries();
- Timer.TLInst.end("runLeader#8");
} catch (SessionExpiredException e) {
throw new SolrException(ErrorCode.SERVER_ERROR,
@@ -351,18 +351,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
* false if otherwise
*/
private boolean waitForEligibleBecomeLeaderAfterTimeout(ZkShardTerms zkShardTerms, String coreNodeName, int timeout) throws InterruptedException {
- long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
- while (!isClosed && !cc.isShutDown()) {
- if (System.nanoTime() > timeoutAt) {
- log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})",
- timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm());
- return true;
- }
- if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
- log.info("Can't become leader, other replicas with higher term participated in leader election");
- return false;
+ Timer.TLInst.start("waitForEligibleBecomeLeaderAfterTimeout");
+ try {
+ long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
+ while (!isClosed && !cc.isShutDown()) {
+ if (System.nanoTime() > timeoutAt) {
+ log.warn("After waiting for {}ms, no other potential leader was found, {} try to become leader anyway (core_term:{}, highest_term:{})",
+ timeout, coreNodeName, zkShardTerms.getTerm(coreNodeName), zkShardTerms.getHighestTerm());
+ return true;
+ }
+ if (replicasWithHigherTermParticipated(zkShardTerms, coreNodeName)) {
+ log.info("Can't become leader, other replicas with higher term participated in leader election");
+ return false;
+ }
+ Thread.sleep(500L);
}
- Thread.sleep(500L);
+ } finally {
+ Timer.TLInst.end("waitForEligibleBecomeLeaderAfterTimeout");
+
}
return false;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index dd16c32a21c..6e75f6e64a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -170,9 +170,10 @@ class ShardLeaderElectionContextBase extends ElectionContext {
// TODOFORNOBLE, don't need for PRS
assert shardId != null;
boolean isAlreadyLeader = false;
- if (zkStateReader.getClusterState() != null &&
- zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicas().size() < 2) {
- Replica leader = zkStateReader.getLeader(collection, shardId);
+ ZkStateReader.CurrentShardData current = ZkStateReader.getCurrent(zkStateReader, collection);
+ if (current.getColl() != null &&
+ current.getNrtPlusTLOG(shardId) < 2) {
+ Replica leader = current.getColl().getLeader(shardId);
if (leader != null
&& leader.getNodeName().equals(leaderProps.get(ZkStateReader.NODE_NAME_PROP))
&& leader.getCoreName().equals(leaderProps.get(ZkStateReader.CORE_NAME_PROP))) {
@@ -193,21 +194,19 @@ class ShardLeaderElectionContextBase extends ElectionContext {
ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
assert zkController != null;
assert zkController.getOverseer() != null;
- DocCollection coll = zkStateReader.getCollection(this.collection);
+ DocCollection coll = current.getColl();
if (coll == null || coll.getStateFormat() < 2 || ZkController.sendToOverseer(coll, id)) {
- Timer.TLInst.start("superRunLeader#1");
-
+ Timer.TLInst.start("SLECB.offerStateUpdate(Utils.toJSON(m))");
zkController.getOverseer().offerStateUpdate(Utils.toJSON(m));
- Timer.TLInst.end("superRunLeader#1");
+ Timer.TLInst.end("SLECB.offerStateUpdate(Utils.toJSON(m))");
} else {
- Timer.TLInst.start("superRunLeader#2");
+ Timer.TLInst.start("SLECB.PRS.flipLeader()");
PerReplicaStates prs = PerReplicaStates.fetch(coll.getZNode(), zkClient, coll.getPerReplicaStates());
- PerReplicaStatesOps.flipLeader(zkStateReader.getClusterState().getCollection(collection).getSlice(shardId).getReplicaNames(), id, prs)
+ PerReplicaStatesOps.flipLeader(current.getColl().getSlice(shardId).getReplicaNames(), id, prs)
.persist(coll.getZNode(), zkStateReader.getZkClient());
- Timer.TLInst.end("superRunLeader#2");
-
+ Timer.TLInst.start("SLECB.PRS.flipLeader()");
}
}
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 016c6862cfd..1b1f3e08711 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -50,7 +50,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Strings;
-import com.google.common.base.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
@@ -1223,31 +1222,34 @@ public class ZkController implements Closeable {
public String register(String coreName, final CoreDescriptor desc, boolean recoverReloadedCores,
boolean afterExpiration, boolean skipRecovery) throws Exception {
MDCLoggingContext.setCoreDescriptor(cc, desc);
+ final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
+ final String collection = cloudDesc.getCollectionName();
+ ZkStateReader.setCurrent(zkStateReader, collection);
+
try {
- Timer.TLInst.start("register()_1");
// pre register has published our down state
final String baseUrl = getBaseUrl();
- final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
- final String collection = cloudDesc.getCollectionName();
final String shardId = cloudDesc.getShardId();
final String coreZkNodeName = cloudDesc.getCoreNodeName();
assert coreZkNodeName != null : "we should have a coreNodeName by now";
// check replica's existence in clusterstate first
try {
+ Timer.TLInst.start("ZKC.register().ZK.waitForState()");
zkStateReader.waitForState(collection, Overseer.isLegacy(zkStateReader) ? 60000 : 100,
TimeUnit.MILLISECONDS, (collectionState) -> getReplicaOrNull(collectionState, shardId, coreZkNodeName) != null);
+ Timer.TLInst.end("ZKC.register().ZK.waitForState()");
} catch (TimeoutException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, timeout waiting for replica present in clusterstate");
}
- Replica replica = getReplicaOrNull(zkStateReader.getClusterState().getCollectionOrNull(collection), shardId, coreZkNodeName);
+ Replica replica = getReplicaOrNull(ZkStateReader.getCurrent(zkStateReader, collection).getColl(), shardId, coreZkNodeName);
if (replica == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate");
}
- Timer.TLInst.end("register()_1");
- Timer.TLInst.start("register()_2");
+
+ Timer.TLInst.start("ZKC.register()_2");
if (replica.getType() != Type.PULL) {
getCollectionTerms(collection).register(cloudDesc.getShardId(), coreZkNodeName);
@@ -1258,8 +1260,8 @@ public class ZkController implements Closeable {
log.debug("Register replica - core:{} address:{} collection:{} shard:{}",
coreName, baseUrl, collection, shardId);
- Timer.TLInst.end("register()_2");
- Timer.TLInst.start("register()_3");
+ Timer.TLInst.end("ZKC.register()_2");
+ Timer.TLInst.start("ZKC.register()_3");
try {
// If we're a preferred leader, insert ourselves at the head of the queue
@@ -1281,8 +1283,8 @@ public class ZkController implements Closeable {
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- Timer.TLInst.end("register()_3");
- Timer.TLInst.start("register()_4");
+ Timer.TLInst.end("ZKC.register()_3");
+ Timer.TLInst.start("ZKC.register()_4");
// in this case, we want to wait for the leader as long as the leader might
@@ -1295,8 +1297,8 @@ public class ZkController implements Closeable {
boolean isLeader = leaderUrl.equals(ourUrl);
assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
- Timer.TLInst.end("register()_4");
- Timer.TLInst.start("register()_5");
+ Timer.TLInst.end("ZKC.register()_4");
+ Timer.TLInst.start("ZKC.register()_5");
try (SolrCore core = cc.getCore(desc.getName())) {
@@ -1355,17 +1357,19 @@ public class ZkController implements Closeable {
unregister(coreName, desc, false);
throw e;
}
-
+ Timer.TLInst.end("ZKC.register()_5");
+ Timer.TLInst.start("forceUpdateCollection()");
// make sure we have an update cluster state right away
zkStateReader.forceUpdateCollection(collection);
+ Timer.TLInst.end("forceUpdateCollection()");
+ Timer.TLInst.start("registerDocCollectionWatcher()");
// the watcher is added to a set so multiple calls of this method will leave only one watcher
zkStateReader.registerDocCollectionWatcher(cloudDesc.getCollectionName(),
new UnloadCoreOnDeletedWatcher(coreZkNodeName, shardId, desc.getName()));
-
- Timer.TLInst.end("register()_5");
-
+ Timer.TLInst.end("registerDocCollectionWatcher()");
return shardId;
} finally {
+ ZkStateReader.removeCurrentColl();
MDCLoggingContext.clear();
}
}
@@ -1508,8 +1512,6 @@ public class ZkController implements Closeable {
throws InterruptedException, KeeperException, IOException {
// look for old context - if we find it, cancel it
- Timer.TLInst.start("joinElection()_1");
-
String collection = cd.getCloudDescriptor().getCollectionName();
final String coreNodeName = cd.getCloudDescriptor().getCoreNodeName();
@@ -1521,7 +1523,6 @@ public class ZkController implements Closeable {
prevContext.cancelElection();
}
- Timer.TLInst.end("joinElection()_1");
Timer.TLInst.start("joinElection()_2");
String shardId = cd.getCloudDescriptor().getShardId();
diff --git a/solr/core/src/java/org/apache/solr/core/ConfigSet.java b/solr/core/src/java/org/apache/solr/core/ConfigSet.java
index 7e175592590..c98f29a447e 100644
--- a/solr/core/src/java/org/apache/solr/core/ConfigSet.java
+++ b/solr/core/src/java/org/apache/solr/core/ConfigSet.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.core;
+import org.apache.solr.common.Timer;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.IndexSchema;
@@ -45,7 +46,9 @@ public class ConfigSet {
this.name = name;
this.solrconfig = solrConfig;
this.schemaSupplier = indexSchemaSupplier;
+ Timer.TLInst.start("schemaSupplier.get()");
schema = schemaSupplier.get(true);
+ Timer.TLInst.end("schemaSupplier.get()");
this.properties = properties;
this.trusted = trusted;
}
diff --git a/solr/core/src/java/org/apache/solr/core/ConfigSetService.java b/solr/core/src/java/org/apache/solr/core/ConfigSetService.java
index 6d65f86705f..69c2b47e6a7 100644
--- a/solr/core/src/java/org/apache/solr/core/ConfigSetService.java
+++ b/solr/core/src/java/org/apache/solr/core/ConfigSetService.java
@@ -31,6 +31,7 @@ import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.ConfigNode;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.Timer;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.IndexSchemaFactory;
@@ -204,10 +205,12 @@ public abstract class ConfigSetService {
}
public static ConfigNode getParsedSchema(InputStream is, SolrResourceLoader loader, String name) throws IOException, SAXException, ParserConfigurationException {
+ Timer.TLInst.start("getParsedSchema()");
XmlConfigFile schemaConf = null;
InputSource inputSource = new InputSource(is);
inputSource.setSystemId(SystemIdResolver.createSystemIdFromResourceName(name));
schemaConf = new XmlConfigFile(loader, SCHEMA, inputSource, SLASH + SCHEMA + SLASH, null);
+ Timer.TLInst.end("getParsedSchema()");
return new DataConfigNode(new DOMConfigNode(schemaConf.getDocument().getDocumentElement()));
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 08111c8e2f3..f255d6cc369 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1257,11 +1257,7 @@ public class CoreContainer {
}
assert core.getName().equals(cd.getName()) : "core name " + core.getName() + " != cd " + cd.getName();
-
- Timer.TLInst.start("solrCores.putCore()");
SolrCore old = solrCores.putCore(cd, core);
- Timer.TLInst.end("solrCores.putCore()");
-
coreInitFailures.remove(cd.getName());
if (old == null || old == core) {
@@ -1344,9 +1340,7 @@ public class CoreContainer {
// Much of the logic in core handling pre-supposes that the core.properties file already exists, so create it
// first and clean it up if there's an error.
- Timer.TLInst.start("coresLocator.create()");
coresLocator.create(this, cd);
- Timer.TLInst.end("coresLocator.create()");
SolrCore core;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 1fa7487fadc..09e4c4fc12c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -445,6 +445,28 @@ public class ZkStateReader implements SolrCloseable {
refreshLegacyClusterState(null);
} else if (watchedCollectionStates.containsKey(collection)) {
// Exists as a watched collection, force a refresh.
+ DocCollection current = watchedCollectionStates.get(collection);
+ if(current != null) {
+ String collectionPath = getCollectionPath(collection);
+ Stat stat = zkClient.exists(collectionPath, null, true);
+ if (stat.getVersion() <= current.getZNodeVersion()) {
+ //we already have the latest version
+ if (current.isPerReplicaState()) {
+ PerReplicaStates prs = current.getPerReplicaStates();
+ if (prs != null && prs.cversion <= stat.getCversion()) {
+ DocCollection newState = current.copyWith(PerReplicaStates.fetch(collectionPath, zkClient, null));
+ if (updateWatchedCollection(collection, newState)) {
+ constructState(Collections.singleton(collection));
+ }
+ return;
+ } else {
+ return;
+ }
+ } else {
+ return;
+ }
+ }
+ }
log.debug("Forcing refresh of watched collection state for {}", collection);
DocCollection newState = fetchCollectionState(collection, null, null);
if (updateWatchedCollection(collection, newState)) {
@@ -1856,6 +1878,48 @@ public class ZkStateReader implements SolrCloseable {
}
}
+ static ThreadLocal<CurrentShardData> CCD = new ThreadLocal<>();
+ /**
+  * Snapshot of one collection's state, captured once at construction time, together with a
+  * cached count of the NRT + TLOG replicas of a shard.
+  */
+ public static class CurrentShardData {
+
+   /** Collection state captured at construction; may be {@code null} (looked up via getCollectionOrNull). */
+   DocCollection coll;
+   /** Cached NRT + TLOG replica count; -1 means "not computed yet". */
+   int nrtPlusTLOG = -1;
+   /** Name of the shard that {@link #nrtPlusTLOG} was computed for. */
+   private String countedShard;
+
+   CurrentShardData(ZkStateReader zkStateReader, String coll) {
+     this.coll = zkStateReader.getClusterState().getCollectionOrNull(coll);
+   }
+
+   /**
+    * @return the collection state captured when this snapshot was created, possibly {@code null}
+    */
+   public DocCollection getColl() {
+     return coll;
+   }
+
+   /**
+    * Counts the NRT and TLOG replicas of a shard in the captured collection state.
+    * The count is cached, but only reused when asked for the same shard again.
+    *
+    * @param shard name of the shard to inspect
+    * @return number of NRT + TLOG replicas, or 0 when the collection or the shard is absent
+    */
+   public int getNrtPlusTLOG(String shard) {
+     // the field used to default to 0, which made the "> -1" check short-circuit every call
+     if (nrtPlusTLOG > -1 && shard != null && shard.equals(countedShard)) return nrtPlusTLOG;
+     Slice sl = coll == null ? null : coll.getSlice(shard);
+     if (sl == null) return 0; // nothing to count; do not cache so a later lookup can retry
+     nrtPlusTLOG = sl.getReplicas(EnumSet.of(Replica.Type.NRT, Replica.Type.TLOG)).size();
+     countedShard = shard;
+     return nrtPlusTLOG;
+   }
+ }
+
+
+
+ /**
+  * Returns the calling thread's snapshot if it holds state for the requested collection;
+  * otherwise builds a fresh snapshot, which is NOT stored in the thread-local.
+  *
+  * @param zksr reader used to build a fresh snapshot when none is usable
+  * @param coll name of the collection the caller is interested in
+  * @return a snapshot for {@code coll}; its {@code getColl()} may still be {@code null}
+  */
+ public static CurrentShardData getCurrent(ZkStateReader zksr, String coll) {
+   CurrentShardData ccd = CCD.get();
+   DocCollection cached = ccd == null ? null : ccd.getColl();
+   // A snapshot of another collection, or one taken before the collection existed,
+   // must not be handed back for this request.
+   if (cached == null || !cached.getName().equals(coll)) {
+     ccd = new CurrentShardData(zksr, coll);
+   }
+   return ccd;
+ }
+ /**
+  * Installs a snapshot for the calling thread unless one is already present.
+  *
+  * @param zksr reader used to build the snapshot
+  * @param coll name of the collection to capture
+  * @return the snapshot already held by this thread, or the newly installed one
+  */
+ public static CurrentShardData setCurrent(ZkStateReader zksr, String coll) {
+   CurrentShardData existing = CCD.get();
+   if (existing != null) {
+     return existing;
+   }
+   CurrentShardData created = new CurrentShardData(zksr, coll);
+   CCD.set(created);
+   return created;
+ }
+
+ /**
+  * Drops the calling thread's snapshot, if any.
+  */
+ public static void removeCurrentColl() {
+   CCD.remove();
+ }
/**
* Block until a Predicate returns true, or the wait times out
*