You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the original archive page) was not preserved in this plain-text copy.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/03/01 03:58:01 UTC
[lucene-solr] 02/02: @1417 Continue a bit on the state updates,
some tweaks and play around the fallout of that. HEY MARK,
COME BACK AND REVIEW A BIT SOON.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit 2b53d6727f951ec012473eb99b9393522df29c58
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sun Feb 28 21:31:00 2021 -0600
@1417 Continue a bit on the state updates, some tweaks and play around the fallout of that. HEY MARK, COME BACK AND REVIEW A BIT SOON.
Took 3 hours 7 minutes
---
.../client/solrj/embedded/JettySolrRunner.java | 47 ++++----
.../src/java/org/apache/solr/cloud/Overseer.java | 21 +---
.../apache/solr/cloud/OverseerElectionContext.java | 5 +-
.../solr/cloud/ShardLeaderElectionContextBase.java | 4 +-
.../java/org/apache/solr/cloud/StatePublisher.java | 18 ++--
.../java/org/apache/solr/cloud/ZkController.java | 119 +++++++--------------
.../apache/solr/cloud/overseer/ZkStateWriter.java | 22 +---
.../java/org/apache/solr/core/CoreContainer.java | 68 ++++++------
.../src/java/org/apache/solr/core/SolrCore.java | 10 +-
.../apache/solr/update/DirectUpdateHandler2.java | 11 +-
.../org/apache/solr/cloud/TestCloudRecovery2.java | 2 -
.../org/apache/solr/common/cloud/ClusterState.java | 11 ++
.../java/org/apache/solr/common/cloud/Replica.java | 9 +-
13 files changed, 143 insertions(+), 204 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
index 21f5607..0784133 100644
--- a/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
+++ b/solr/core/src/java/org/apache/solr/client/solrj/embedded/JettySolrRunner.java
@@ -84,6 +84,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
@@ -706,29 +707,29 @@ public class JettySolrRunner implements Closeable {
throw new RuntimeException(e);
}
-// if (wait && coreContainer != null && coreContainer
-// .isZooKeeperAware()) {
-// log.info("waitForJettyToStop: {}", getLocalPort());
-// String nodeName = getNodeName();
-// if (nodeName == null) {
-// log.info("Cannot wait for Jetty with null node name");
-// } else {
-//
-// log.info("waitForNode: {}", getNodeName());
-//
-// ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
-//
-// try {
-// if (!reader.isClosed() && reader.getZkClient().isConnected()) {
-// reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
-// }
-// } catch (InterruptedException e) {
-// ParWork.propagateInterrupt(e);
-// } catch (TimeoutException e) {
-// log.error("Timeout waiting for live node");
-// }
-// }
-// }
+ if (wait && coreContainer != null && coreContainer
+ .isZooKeeperAware()) {
+ log.info("waitForJettyToStop: {}", getLocalPort());
+ String nodeName = getNodeName();
+ if (nodeName == null) {
+ log.info("Cannot wait for Jetty with null node name");
+ } else {
+
+ log.info("waitForNode: {}", getNodeName());
+
+ ZkStateReader reader = coreContainer.getZkController().getZkStateReader();
+
+ try {
+ if (!reader.isClosed() && reader.getZkClient().isConnected()) {
+ reader.waitForLiveNodes(10, TimeUnit.SECONDS, (n) -> !n.contains(nodeName));
+ }
+ } catch (InterruptedException e) {
+ ParWork.propagateInterrupt(e);
+ } catch (TimeoutException e) {
+ log.error("Timeout waiting for live node");
+ }
+ }
+ }
} catch (Exception e) {
SolrZkClient.checkInterrupted(e);
diff --git a/solr/core/src/java/org/apache/solr/cloud/Overseer.java b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
index 2d42625..1a45047 100644
--- a/solr/core/src/java/org/apache/solr/cloud/Overseer.java
+++ b/solr/core/src/java/org/apache/solr/cloud/Overseer.java
@@ -823,18 +823,13 @@ public class Overseer implements SolrCloseable {
@Override
public void close() {
- ourLock.lock();
- try {
- this.closed = true;
- closeWatcher();
- } finally {
- ourLock.unlock();
- }
+ this.closed = true;
+ closeWatcher();
}
private void closeWatcher() {
try {
- zkController.getZkClient().removeWatches(path, this, WatcherType.Data, true);
+ zkController.getZkClient().removeWatches(path, this, WatcherType.Any, true);
} catch (KeeperException.NoWatcherException e) {
} catch (Exception e) {
@@ -883,16 +878,6 @@ public class Overseer implements SolrCloseable {
if (zkController.getZkClient().isAlive()) {
try {
zkController.getZkClient().delete(fullPaths, true);
- } catch (KeeperException.SessionExpiredException | KeeperException.ConnectionLossException e) {
- if (zkController.getZkClient().isAlive()) {
- try {
- zkController.getZkClient().delete(fullPaths, true);
- } catch (KeeperException keeperException) {
- log.warn("Failed deleting processed items", e);
- }
- } else {
- log.warn("Failed deleting processed items", e);
- }
} catch (Exception e) {
log.warn("Failed deleting processed items", e);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
index 5657efe..d294266 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerElectionContext.java
@@ -38,15 +38,14 @@ final class OverseerElectionContext extends ShardLeaderElectionContextBase {
private final Overseer overseer;
public OverseerElectionContext(final String zkNodeName, SolrZkClient zkClient, Overseer overseer) {
- super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("-1", getIDMap(zkNodeName, overseer), "overseer", "overseer", overseer.getZkStateReader()), null, zkClient);
+ super(zkNodeName, Overseer.OVERSEER_ELECT, Overseer.OVERSEER_ELECT + "/leader", new Replica("overseer:" + overseer.getZkController().getNodeName(), getIDMap(zkNodeName, overseer), "overseer", "overseer", overseer.getZkStateReader()), null, zkClient);
this.overseer = overseer;
this.zkClient = zkClient;
}
private static Map<String,Object> getIDMap(String zkNodeName, Overseer overseer) {
Map<String,Object> idMap = new HashMap<>(2);
- idMap.put("id", "-1");
- idMap.put("zknode", zkNodeName);
+ idMap.put("id", zkNodeName);
idMap.put(ZkStateReader.NODE_NAME_PROP, overseer.getZkController().getNodeName());
return idMap;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
index ab1515a..f8c5752 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContextBase.java
@@ -66,7 +66,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
// return;
// }
super.cancelElection();
- if (zkClient.isAlive()) {
+ // if (zkClient.isAlive()) {
try {
if (leaderZkNodeParentVersion != null) {
try {
@@ -165,7 +165,7 @@ class ShardLeaderElectionContextBase extends ElectionContext {
log.info("Exception trying to cancel election {} {}", e.getClass().getName(), e.getMessage());
}
leaderZkNodeParentVersion = null;
- }
+ // }
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index 22a1e1e..845b1aa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -156,25 +156,25 @@ public class StatePublisher implements Closeable {
}
private void clearStatesForNode(ZkNodeProps bulkMessage, String nodeName) {
- Set<String> removeCores = new HashSet<>();
- Set<String> cores = bulkMessage.getProperties().keySet();
- for (String core : cores) {
- if (core.equals(OverseerAction.DOWNNODE.toLower()) || core.equals(OverseerAction.RECOVERYNODE.toLower())) {
+ Set<String> removeIds = new HashSet<>();
+ Set<String> ids = bulkMessage.getProperties().keySet();
+ for (String id : ids) {
+ if (id.equals(OverseerAction.DOWNNODE.toLower()) || id.equals(OverseerAction.RECOVERYNODE.toLower())) {
continue;
}
Collection<DocCollection> collections = zkStateReader.getClusterState().getCollectionsMap().values();
for (DocCollection collection : collections) {
- Replica replica = collection.getReplica(core);
+ Replica replica = collection.getReplicaById(id);
if (replica != null) {
if (replica.getNodeName().equals(nodeName)) {
- removeCores.add(core);
+ removeIds.add(id);
}
}
}
}
- for (String core : removeCores) {
- bulkMessage.getProperties().remove(core);
+ for (String id : removeIds) {
+ bulkMessage.getProperties().remove(id);
}
}
@@ -206,7 +206,7 @@ public class StatePublisher implements Closeable {
if (coll != null) {
Replica replica = coll.getReplica(core);
id = replica.getId();
- String lastState = stateCache.get(core);
+ String lastState = stateCache.get(id);
if (collection != null && replica != null && !state.equals(Replica.State.ACTIVE) && state.equals(lastState) && replica.getState().toString().equals(state)) {
log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
return;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index fbf8a73..f74c2f8 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -23,7 +23,6 @@ import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
import org.apache.solr.client.solrj.impl.SolrClientCloudManager;
import org.apache.solr.cloud.overseer.OverseerAction;
-import org.apache.solr.cloud.overseer.SliceMutator;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
@@ -362,6 +361,7 @@ public class ZkController implements Closeable, Runnable {
zkController.register(descriptor.getName(), descriptor, afterExpiration);
} catch (Exception e) {
log.error("Error registering core name={} afterExpireation={}", descriptor.getName(), afterExpiration);
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
return descriptor;
@@ -487,8 +487,6 @@ public class ZkController implements Closeable, Runnable {
removeEphemeralLiveNode();
- publishNodeAs(getNodeName(), OverseerAction.RECOVERYNODE);
-
// recreate our watchers first so that they exist even on any problems below
zkStateReader.createClusterStateWatchersAndUpdate();
@@ -1304,72 +1302,50 @@ public class ZkController implements Closeable, Runnable {
getZkStateReader().registerCore(cloudDesc.getCollectionName());
}
- try {
- log.info("Waiting to see our entry in state.json {}", desc.getName());
- zkStateReader.waitForState(collection, Integer.getInteger("solr.zkregister.leaderwait", 30000), TimeUnit.MILLISECONDS, (l, c) -> { // MRM TODO: timeout
- if (c == null) {
- return false;
- }
- coll.set(c);
- Replica r = c.getReplica(coreName);
- if (r != null) {
- replicaRef.set(r);
- return true;
- }
- return false;
- });
- } catch (TimeoutException e) {
- log.warn("Timeout waiting to see core " + coreName + " \ncollection=" + collection + " " + coll.get());
- }
-
- Replica replica = replicaRef.get();
-
- if (replica == null) {
- throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica=" + coreName + " is removed from clusterstate \n"
- + coll.get());
- }
-
- log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
+ log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, cloudDesc.getReplicaType());
log.info("Register terms for replica {}", coreName);
registerShardTerms(collection, cloudDesc.getShardId(), coreName);
log.info("Create leader elector for replica {}", coreName);
- leaderElector = leaderElectors.get(replica.getName());
+ leaderElector = leaderElectors.get(coreName);
if (leaderElector == null) {
leaderElector = new LeaderElector(this);
- LeaderElector oldElector = leaderElectors.putIfAbsent(replica.getName(), leaderElector);
+ LeaderElector oldElector = leaderElectors.putIfAbsent(coreName, leaderElector);
if (oldElector != null) {
IOUtils.closeQuietly(leaderElector);
}
- if (cc.isShutDown()) {
- IOUtils.closeQuietly(leaderElector);
- IOUtils.closeQuietly(oldElector);
- IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
- throw new AlreadyClosedException();
- }
+// if (cc.isShutDown()) {
+// IOUtils.closeQuietly(leaderElector);
+// IOUtils.closeQuietly(oldElector);
+// IOUtils.closeQuietly(getShardTermsOrNull(collection, shardId));
+// throw new AlreadyClosedException();
+// }
}
// If we're a preferred leader, insert ourselves at the head of the queue
- boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
- if (replica.getType() != Type.PULL) {
+ boolean joinAtHead = false; //replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
+
+ if (cloudDesc.getReplicaType() != Type.PULL) {
//getCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
// MRM TODO: review joinAtHead
joinElection(desc, joinAtHead);
}
log.info("Wait to see leader for {}, {}", collection, shardId);
- Replica leader = null;
+ String leaderName = null;
+
for (int i = 0; i < 60; i++) {
if (leaderElector.isLeader()) {
- leader = replica;
+ leaderName = coreName;
break;
}
try {
- leader = zkStateReader.getLeaderRetry(collection, shardId, 3000, true);
+ Replica leader = zkStateReader.getLeaderRetry(collection, shardId, 3000, true);
+ leaderName = leader.getName();
} catch (TimeoutException timeoutException) {
if (isClosed() || isDcCalled() || cc.isShutDown()) {
@@ -1380,15 +1356,15 @@ public class ZkController implements Closeable, Runnable {
}
}
- if (leader == null) {
+ if (leaderName == null) {
log.error("No leader found while trying to register " + coreName + " with zookeeper");
throw new SolrException(ErrorCode.SERVER_ERROR, "No leader found while trying to register " + coreName + " with zookeeper");
}
- String ourUrl = replica.getCoreUrl();
- boolean isLeader = leader.getName().equals(coreName);
- log.info("We are {} and leader is {} isLeader={}", ourUrl, leader.getCoreUrl(), isLeader);
+ boolean isLeader = leaderName.equals(coreName);
+
+ log.info("We are {} and leader is {} isLeader={}", coreName, leaderName, isLeader);
log.info("Check if we should recover isLeader={}", isLeader);
//assert !(isLeader && replica.getType() == Type.PULL) : "Pull replica became leader!";
@@ -1404,7 +1380,7 @@ public class ZkController implements Closeable, Runnable {
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
- boolean isTlogReplicaAndNotLeader = replica.getType() == Replica.Type.TLOG && !isLeader;
+ boolean isTlogReplicaAndNotLeader = cloudDesc.getReplicaType() == Replica.Type.TLOG && !isLeader;
if (isTlogReplicaAndNotLeader) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
@@ -1415,7 +1391,7 @@ public class ZkController implements Closeable, Runnable {
if (!afterExpiration && !core.isReloaded() && ulog != null && !isTlogReplicaAndNotLeader) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getCollection(collection).getSlice(shardId);
- if (slice.getState() != Slice.State.CONSTRUCTION || (slice.getState() == Slice.State.CONSTRUCTION && !isLeader)) {
+ if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
Future<UpdateLog.RecoveryInfo> recoveryFuture = core.getUpdateHandler().getUpdateLog().recoverFromLog();
if (recoveryFuture != null) {
log.info("Replaying tlog for {} during startup... NOTE: This can take a while.", core);
@@ -1431,20 +1407,17 @@ public class ZkController implements Closeable, Runnable {
}
}
- if (replica.getType() != Type.PULL) {
- checkRecovery(isLeader, core, cc);
- }
-
- if (isTlogReplicaAndNotLeader) {
+ if (cloudDesc.getReplicaType() != Type.PULL && !isLeader) {
+ checkRecovery(core, cc);
+ } else if (isTlogReplicaAndNotLeader) {
startReplicationFromLeader(coreName, true);
}
- if (replica.getType() == Type.PULL) {
+ if (cloudDesc.getReplicaType() == Type.PULL) {
startReplicationFromLeader(coreName, false);
- publish(desc, Replica.State.ACTIVE);
}
- if (replica.getType() != Type.PULL) {
+ if (cloudDesc.getReplicaType() != Type.PULL) {
shardTerms = getShardTerms(collection, cloudDesc.getShardId());
// the watcher is added to a set so multiple calls of this method will left only one watcher
if (log.isDebugEnabled()) log.debug("add shard terms listener for {}", coreName);
@@ -1590,19 +1563,11 @@ public class ZkController implements Closeable, Runnable {
/**
* Returns whether or not a recovery was started
*/
- private void checkRecovery(final boolean isLeader, SolrCore core, CoreContainer cc) {
-
- if (!isLeader) {
-
+ private void checkRecovery(SolrCore core, CoreContainer cc) {
if (log.isInfoEnabled()) {
log.info("Core needs to recover:{}", core.getName());
}
core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
-
- } else {
- log.info("I am the leader, no recovery necessary");
- }
-
}
@@ -1958,8 +1923,6 @@ public class ZkController implements Closeable, Runnable {
}
try {
overseerElector.retryElection(joinAtHead);
- } catch (AlreadyClosedException e) {
- return;
} catch (Exception e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
@@ -2119,14 +2082,13 @@ public class ZkController implements Closeable, Runnable {
if (confListeners.confDirListeners.isEmpty()) {
// no more listeners for this confDir, remove it from the map
if (log.isDebugEnabled()) log.debug("No more listeners for config directory [{}]", confDir);
- zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true, (rc, path, ctx) -> {
- if (rc != 0) {
- KeeperException ex = KeeperException.create(KeeperException.Code.get(rc), path);
- if (!(ex instanceof KeeperException.NoWatcherException)) {
- log.error("Exception removing watch for " + path, ex);
- }
- }
- }, "confWatcher");
+ try {
+ zkClient.removeWatches(COLLECTIONS_ZKNODE, confListeners.watcher, Watcher.WatcherType.Any, true);
+ } catch (KeeperException.NoWatcherException e) {
+
+ } catch (Exception e) {
+ log.info("could not remove watch {} {}", e.getClass().getSimpleName(), e.getMessage());
+ }
confDirectoryListeners.remove(confDir);
}
}
@@ -2239,12 +2201,7 @@ public class ZkController implements Closeable, Runnable {
private static void setConfWatcher(String zkDir, Watcher watcher, Stat stat, CoreContainer cc, Map<String, ConfListeners> confDirectoryListeners, SolrZkClient zkClient) {
try {
- zkClient.addWatch(zkDir, watcher, AddWatchMode.PERSISTENT, (rc, path, ctx) -> {
- if (rc != 0) {
- KeeperException ex = KeeperException.create(KeeperException.Code.get(rc), path);
- log.error("Exception creating watch for " + path, ex);
- }
- }, "confWatcher");
+ zkClient.addWatch(zkDir, watcher, AddWatchMode.PERSISTENT);
Stat newStat = zkClient.exists(zkDir, null);
if (stat != null && newStat.getVersion() > stat.getVersion()) {
//a race condition where a we missed an event fired
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index de321da..475ab1a 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -142,7 +142,6 @@ public class ZkStateWriter {
if (latestColl == null) {
//log.info("no node exists, using version 0");
trackVersions.remove(collection.getName());
- idToCollection.remove(collection.getId());
} else {
cs.getCollectionStates().put(latestColl.getName(), new ClusterState.CollectionRef(latestColl));
//log.info("got version from zk {}", existsStat.getVersion());
@@ -152,13 +151,8 @@ public class ZkStateWriter {
}
}
- DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
- if (currentCollection != null) {
- idToCollection.put(currentCollection.getId(), currentCollection.getName());
- } else {
- idToCollection.put(collection.getId(), collection.getName());
- }
+ DocCollection currentCollection = cs.getCollectionOrNull(collection.getName());
collection.getProperties().remove("pullReplicas");
collection.getProperties().remove("replicationFactor");
collection.getProperties().remove("maxShardsPerNode");
@@ -234,18 +228,10 @@ public class ZkStateWriter {
}
long collectionId = Long.parseLong(id.split("-")[0]);
- String collection = idToCollection.get(collectionId);
+ String collection = reader.getClusterState().getCollection(collectionId);
if (collection == null) {
- String[] cname = new String[1];
- this.cs.forEachCollection( col -> {if (col.getId() == collectionId) cname[0]=col.getName();});
-
- if (cname[0] != null) {
- collection = cname[0];
- }
- if (collection == null) {
- continue;
- }
+ continue;
}
String setState = Replica.State.shortStateToState(stateString).toString();
@@ -381,8 +367,6 @@ public class ZkStateWriter {
// ver = docColl.getZNodeVersion();
if (ver == null) {
ver = 0;
- } else {
-
}
}
updates.getProperties().put("_cs_ver_", ver.toString());
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 725e1f1..ddd0bd2 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -883,40 +883,40 @@ public class CoreContainer implements Closeable {
status |= CORE_DISCOVERY_COMPLETE;
startedLoadingCores = true;
- if (isZooKeeperAware()) {
-
- log.info("Waiting to see RECOVERY states for node on startup ...");
- for (final CoreDescriptor cd : cds) {
- String collection = cd.getCollectionName();
- try {
- getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
- if (c == null) {
- if (log.isDebugEnabled()) log.debug("Found incorrect state c={}", c);
- return false;
- }
- String nodeName = getZkController().getNodeName();
- List<Replica> replicas = c.getReplicas();
- for (Replica replica : replicas) {
- if (replica.getNodeName().equals(nodeName)) {
- if (!replica.getState().equals(Replica.State.RECOVERING)) {
- if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
- return false;
- }
- } else {
- if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
- }
- }
-
- return true;
- });
- } catch (InterruptedException e) {
- ParWork.propagateInterrupt(e);
- return;
- } catch (TimeoutException e) {
- log.error("Timeout", e);
- }
- }
- }
+// if (isZooKeeperAware()) {
+//
+// log.info("Waiting to see RECOVERY states for node on startup ...");
+// for (final CoreDescriptor cd : cds) {
+// String collection = cd.getCollectionName();
+// try {
+// getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (n, c) -> {
+// if (c == null) {
+// if (log.isDebugEnabled()) log.debug("Found incorrect state c={}", c);
+// return false;
+// }
+// String nodeName = getZkController().getNodeName();
+// List<Replica> replicas = c.getReplicas();
+// for (Replica replica : replicas) {
+// if (replica.getNodeName().equals(nodeName)) {
+// if (!replica.getState().equals(Replica.State.RECOVERING)) {
+// if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+// return false;
+// }
+// } else {
+// if (log.isDebugEnabled()) log.debug("Found incorrect state {} {} ourNodeName={}", replica.getState(), replica.getNodeName(), nodeName);
+// }
+// }
+//
+// return true;
+// });
+// } catch (InterruptedException e) {
+// ParWork.propagateInterrupt(e);
+// return;
+// } catch (TimeoutException e) {
+// log.error("Timeout", e);
+// }
+// }
+// }
for (final CoreDescriptor cd : cds) {
if (!cd.isTransient() && cd.isLoadOnStartup()) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 407af18..faab326 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -734,7 +734,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
CoreDescriptor cd = getCoreDescriptor();
cd.loadExtraProperties(); //Reload the extra properties
// coreMetricManager.close();
- if (coreContainer.isShutDown()) {
+ if (coreContainer.isShutDown() || closing) {
throw new AlreadyClosedException();
}
core = new SolrCore(coreContainer, getName(), coreConfig, cd, getDataDir(), updateHandler, solrDelPolicy, currentCore, true);
@@ -2387,7 +2387,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
SolrIndexSearcher tmp = null;
RefCounted<SolrIndexSearcher> newestSearcher = null;
boolean success = false;
- if (coreContainer.isShutDown()) {
+ if (coreContainer.isShutDown() || closing) {
throw new AlreadyClosedException();
}
try {
@@ -2410,7 +2410,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
searcherLock.lock();
try {
- if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+ if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
throw new SolrCoreState.CoreIsClosedException();
}
@@ -2472,7 +2472,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
// (caches take a little while to instantiate)
final boolean useCaches = !realtime;
final String newName = realtime ? "realtime" : "main";
- if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+ if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
throw new SolrCoreState.CoreIsClosedException();
}
@@ -2586,7 +2586,7 @@ public final class SolrCore implements SolrInfoBean, Closeable {
*/
public RefCounted<SolrIndexSearcher> getSearcher ( boolean forceNew, boolean returnSearcher, @SuppressWarnings({"rawtypes"}) final Future[] waitSearcher,
boolean updateHandlerReopens){
- if (coreContainer.isShutDown()) { // if we start new searchers after close we won't close them
+ if (coreContainer.isShutDown() || closing) { // if we start new searchers after close we won't close them
throw new SolrCoreState.CoreIsClosedException();
}
diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
index 6a5cd7d..3094740 100644
--- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
+++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java
@@ -50,13 +50,12 @@ import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrConfig.UpdateHandlerInfo;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.core.SolrInfoBean;
-import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -835,10 +834,10 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
if (log.isDebugEnabled()) {
log.debug("closing {}", this);
}
- try (ParWork closer = new ParWork(this, true, false)) {
- closer.collect(commitTracker);
- closer.collect(softCommitTracker);
- }
+
+ IOUtils.closeQuietly(commitTracker);
+ IOUtils.closeQuietly(softCommitTracker);
+
super.close();
numDocsPending.reset();
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
index 8c5ae74..8da3dc9 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery2.java
@@ -69,8 +69,6 @@ public class TestCloudRecovery2 extends SolrCloudTestCase {
node2.stop();
- cluster.waitForActiveCollection(COLLECTION, 1, 1, true);
-
cluster.getSolrClient().getZkStateReader().waitForLiveNodes(5, TimeUnit.SECONDS, (newLiveNodes) -> newLiveNodes.size() == 1);
UpdateRequest req = new UpdateRequest();
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index eae1d10..a29e497 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -24,6 +24,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -339,6 +340,16 @@ public class ClusterState implements JSONWriter.Writable {
return highest[0];
}
+ /** Returns the name of the collection whose id equals {@code id}, or {@code null} if no known collection matches. */
+ public String getCollection(long id) {
+ Set<Entry<String,CollectionRef>> entries = collectionStates.entrySet();
+ for (Entry<String,CollectionRef> entry : entries) {
+ // NOTE(review): CollectionRef.get() can presumably yield null (e.g. a lazy ref to a
+ // since-deleted collection); resolve it once and skip null entries instead of throwing an NPE.
+ DocCollection coll = entry.getValue().get();
+ if (coll != null && coll.getId() == id) {
+ return entry.getKey();
+ }
+ }
+ return null;
+ }
+
public static class CollectionRef {
protected final AtomicInteger gets = new AtomicInteger();
private final DocCollection coll;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
index c3d37fd..43aeb9f 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java
@@ -25,7 +25,7 @@ import org.apache.solr.common.util.Utils;
public class Replica extends ZkNodeProps {
- final Long id;
+ Long id;
final Long collId;
public String getId() {
@@ -163,7 +163,12 @@ public class Replica extends ZkNodeProps {
this.name = name;
this.nodeName = (String) propMap.get(ZkStateReader.NODE_NAME_PROP);
- this.id = propMap.containsKey("id") ? Long.parseLong((String) propMap.get("id")) : null;
+
+ String rawId = (String) propMap.get("id");
+ if (rawId != null && !rawId.contains(":")) {
+ this.id = Long.parseLong(rawId);
+ }
+
this.collId = propMap.containsKey("collId") ? Long.parseLong((String) propMap.get("collId")) : null;
this.baseUrl = nodeNameToBaseUrl.getBaseUrlForNodeName(this.nodeName);
type = Type.get((String) propMap.get(ZkStateReader.REPLICA_TYPE));