You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/17 04:31:51 UTC
[lucene-solr] 01/02: @1247 Cleanup.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit acaadd50c58e43b5fd701b683bd164521d84db12
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Wed Dec 16 19:07:33 2020 -0600
@1247 Cleanup.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 18 ++++--
.../solr/cloud/ShardLeaderElectionContext.java | 1 -
.../java/org/apache/solr/cloud/StatePublisher.java | 5 +-
.../org/apache/solr/cloud/ZkCollectionTerms.java | 8 +--
.../java/org/apache/solr/cloud/ZkController.java | 32 +++++-----
.../java/org/apache/solr/cloud/ZkShardTerms.java | 73 +++++++++++-----------
.../java/org/apache/solr/core/CoreContainer.java | 2 +-
.../src/java/org/apache/solr/core/SolrCores.java | 3 +-
.../apache/solr/update/DefaultSolrCoreState.java | 7 +--
.../apache/solr/client/solrj/cloud/ShardTerms.java | 2 +-
.../apache/solr/common/cloud/ZkStateReader.java | 4 +-
11 files changed, 82 insertions(+), 73 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index 49682e2..2f12363 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -344,11 +344,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (!isClosed()) {
try {
try {
- Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 5000);
+ Replica leader = zkController.getZkStateReader().getLeaderRetry(coreDescriptor.getCollectionName(), coreDescriptor.getCloudDescriptor().getShardId(), 1500);
if (leader != null && leader.getName().equals(coreName)) {
log.info("We are the leader, STOP recovery");
return;
}
+ if (core.isClosing() || core.getCoreContainer().isShutDown()) {
+ log.info("We are closing, STOP recovery");
+ return;
+ }
} catch (InterruptedException e) {
log.info("InterruptedException, won't do recovery", e);
throw new SolrException(ErrorCode.BAD_REQUEST, e);
@@ -392,12 +396,14 @@ public class RecoveryStrategy implements Runnable, Closeable {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
Replica leaderprops;
try {
- leaderprops = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
+ leaderprops = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
} catch (Exception e) {
log.error("Could not get leader for {} {} {}", cloudDesc.getCollectionName(), cloudDesc.getShardId(), zkStateReader.getClusterState().getCollectionOrNull(cloudDesc.getCollectionName()), e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
-
+ if (isClosed()) {
+ throw new AlreadyClosedException();
+ }
log.info("Starting Replication Recovery. [{}] leader is [{}] and I am [{}]", coreName, leaderprops.getName(), Replica.getCoreUrl(baseUrl, coreName));
try {
@@ -572,7 +578,11 @@ public class RecoveryStrategy implements Runnable, Closeable {
while (!successfulRecovery && !isClosed()) {
try {
CloudDescriptor cloudDesc = this.coreDescriptor.getCloudDescriptor();
- final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 5000);
+ final Replica leader = zkStateReader.getLeaderRetry(cloudDesc.getCollectionName(), cloudDesc.getShardId(), 1500);
+
+ if (isClosed()) {
+ throw new AlreadyClosedException();
+ }
log.info("Begin buffering updates. core=[{}]", coreName);
// recalling buffer updates will drop the old buffer tlog
diff --git a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
index e2af971..8d85470 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ShardLeaderElectionContext.java
@@ -99,7 +99,6 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
String coreName = leaderProps.getName();
log.info("Run leader process for shard [{}] election, first step is to try and sync with the shard core={}", context.leaderProps.getSlice(), coreName);
- cc.waitForLoadingCore(coreName, 15000);
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
log.error("No SolrCore found, cannot become leader {}", coreName);
diff --git a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
index a735498..57527a4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
+++ b/solr/core/src/java/org/apache/solr/cloud/StatePublisher.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
@@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
@@ -143,8 +145,9 @@ public class StatePublisher implements Closeable {
String lastState = stateCache.get(core);
- if (state.equals(lastState)) {
+ if (state.equals(lastState) && !Replica.State.ACTIVE.toString().toLowerCase(Locale.ROOT).equals(state)) {
log.info("Skipping publish state as {} for {}, because it was the last state published", state, core);
+ // nocommit
return;
}
if (core == null || state == null) {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index 62f5cd6..96c8dcf 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -17,7 +17,6 @@
package org.apache.solr.cloud;
-import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
@@ -52,11 +51,11 @@ class ZkCollectionTerms implements AutoCloseable {
try {
ZkShardTerms zkterms = null;
if (!terms.containsKey(shardId)) {
- if (closed) {
- throw new AlreadyClosedException();
- }
zkterms = new ZkShardTerms(collection, shardId, zkClient);
IOUtils.closeQuietly(terms.put(shardId, zkterms));
+ if (closed) {
+ IOUtils.closeQuietly(zkterms);
+ }
return zkterms;
}
return terms.get(shardId);
@@ -76,6 +75,7 @@ class ZkCollectionTerms implements AutoCloseable {
}
public void register(String shardId, String coreNodeName) throws Exception {
+ if (closed) return;
getShard(shardId).registerTerm(coreNodeName);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index c1009a6..7186fee 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -328,6 +328,9 @@ public class ZkController implements Closeable, Runnable {
log.info("Registering core {} afterExpiration? {}", descriptor.getName(), afterExpiration);
}
+ if (zkController.isDcCalled() || zkController.getCoreContainer().isShutDown()) {
+ return null;
+ }
zkController.register(descriptor.getName(), descriptor, afterExpiration);
return descriptor;
}
@@ -1328,7 +1331,7 @@ public class ZkController implements Closeable, Runnable {
throw new AlreadyClosedException();
}
MDCLoggingContext.setCoreDescriptor(cc, desc);
- ZkShardTerms shardTerms;
+ ZkShardTerms shardTerms = null;
try {
final String baseUrl = getBaseUrl();
final CloudDescriptor cloudDesc = desc.getCloudDescriptor();
@@ -1336,9 +1339,10 @@ public class ZkController implements Closeable, Runnable {
final String shardId = cloudDesc.getShardId();
log.info("Register terms for replica {}", coreName);
- createCollectionTerms(collection);
+ ZkCollectionTerms ct = createCollectionTerms(collection);
shardTerms = getShardTerms(collection, cloudDesc.getShardId());
+
// the watcher is added to a set so multiple calls of this method will left only one watcher
getZkStateReader().registerCore(cloudDesc.getCollectionName());
@@ -1371,14 +1375,12 @@ public class ZkController implements Closeable, Runnable {
throw new SolrException(ErrorCode.SERVER_ERROR, "Error registering SolrCore, replica is removed from clusterstate \n" + zkStateReader.getClusterState().getCollectionOrNull(collection));
}
}
- if (replica.getType() != Type.PULL) {
- getCollectionTerms(collection).register(cloudDesc.getShardId(), coreName);
- }
log.info("Register replica - core:{} address:{} collection:{} shard:{} type={}", coreName, baseUrl, collection, shardId, replica.getType());
if (isDcCalled() || isClosed) {
throw new AlreadyClosedException();
}
+
LeaderElector leaderElector = leaderElectors.get(replica.getName());
if (leaderElector == null) {
ContextKey contextKey = new ContextKey(collection, coreName);
@@ -1392,6 +1394,7 @@ public class ZkController implements Closeable, Runnable {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = replica.getBool(SliceMutator.PREFERRED_LEADER_PROP, false);
if (replica.getType() != Type.PULL) {
+ ct.register(cloudDesc.getShardId(), coreName);
// nocommit review
joinElection(desc, joinAtHead);
}
@@ -1434,7 +1437,6 @@ public class ZkController implements Closeable, Runnable {
// TODO: should this be moved to another thread? To recoveryStrat?
// TODO: should this actually be done earlier, before (or as part of)
// leader election perhaps?
- cc.waitForLoadingCore(coreName, 15000);
try (SolrCore core = cc.getCore(coreName)) {
if (core == null || core.isClosing() || getCoreContainer().isShutDown()) {
throw new AlreadyClosedException();
@@ -1480,9 +1482,6 @@ public class ZkController implements Closeable, Runnable {
startReplicationFromLeader(coreName, false);
}
- // if (!isLeader) {
- // publish(desc, Replica.State.ACTIVE, true);
- // }
if (replica.getType() != Type.PULL && shardTerms != null) {
// the watcher is added to a set so multiple calls of this method will left only one watcher
@@ -1669,13 +1668,11 @@ public class ZkController implements Closeable, Runnable {
if (!isLeader) {
- if (!core.getUpdateHandler().getSolrCoreState().isRecoverying()) {
- if (log.isInfoEnabled()) {
- log.info("Core needs to recover:{}", core.getName());
- }
- core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
- return true;
+ if (log.isInfoEnabled()) {
+ log.info("Core needs to recover:{}", core.getName());
}
+ core.getUpdateHandler().getSolrCoreState().doRecovery(cc, core.getCoreDescriptor());
+ return true;
} else {
log.info("I am the leader, no recovery necessary");
@@ -1783,7 +1780,7 @@ public class ZkController implements Closeable, Runnable {
public ZkShardTerms getShardTerms(String collection, String shardId) throws Exception {
ZkCollectionTerms ct = getCollectionTerms(collection);
if (ct == null) {
- throw new AlreadyClosedException();
+ ct = createCollectionTerms(collection);
}
return ct.getShard(shardId);
}
@@ -1805,6 +1802,9 @@ public class ZkController implements Closeable, Runnable {
}
public ZkCollectionTerms createCollectionTerms(String collection) {
+// if (isClosed || dcCalled) {
+// throw new AlreadyClosedException();
+// }
ZkCollectionTerms ct = new ZkCollectionTerms(collection, zkClient);
IOUtils.closeQuietly(collectionToTerms.put(collection, ct));
return ct;
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index ff16516..cba0e4d 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -124,7 +125,7 @@ public class ZkShardTerms implements Closeable {
ShardTerms newTerms;
while( (newTerms = terms.get().increaseTerms(leader, replicasNeedingRecovery)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) return;
+ if (forceSaveTerms(newTerms)) return;
}
}
@@ -166,7 +167,7 @@ public class ZkShardTerms implements Closeable {
public void close() {
// no watcher will be registered
- isClosed.set(true);
+ //isClosed.set(true);
ParWork.close(listeners);
listeners.clear();
@@ -211,7 +212,7 @@ public class ZkShardTerms implements Closeable {
return true;
}
tries++;
- if (tries > 60 || isClosed.get()) {
+ if (tries > 60) {
log.warn("Could not save terms to zk within " + tries + " tries");
return true;
}
@@ -227,7 +228,7 @@ public class ZkShardTerms implements Closeable {
void registerTerm(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -239,14 +240,14 @@ public class ZkShardTerms implements Closeable {
public void setTermEqualsToLeader(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
public void setTermToZero(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -256,7 +257,7 @@ public class ZkShardTerms implements Closeable {
public void startRecovering(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -266,7 +267,7 @@ public class ZkShardTerms implements Closeable {
public void doneRecovering(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -281,7 +282,7 @@ public class ZkShardTerms implements Closeable {
public void ensureHighestTermsAreNotZero() throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) {
- if (forceSaveTerms(newTerms) || isClosed.get()) break;
+ if (forceSaveTerms(newTerms)) break;
}
}
@@ -340,8 +341,23 @@ public class ZkShardTerms implements Closeable {
public void refreshTerms() throws KeeperException {
ShardTerms newTerms;
try {
+ Watcher watcher = event -> {
+ // session events are not change events, and do not remove the watcher
+ if (Watcher.Event.EventType.None == event.getType()) {
+ return;
+ }
+ if (event.getType() == Watcher.Event.EventType.NodeCreated || event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+ retryRegisterWatcher();
+ // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+ try {
+ refreshTerms();
+ } catch (KeeperException e) {
+ log.warn("Could not refresh terms", e);
+ }
+ }
+ };
Stat stat = new Stat();
- byte[] data = zkClient.getData(znodePath, null, stat, true);
+ byte[] data = zkClient.getData(znodePath, watcher, stat, true);
ConcurrentHashMap<String,Long> values = new ConcurrentHashMap<>((Map<String,Long>) Utils.fromJSON(data));
log.info("refresh shard terms to zk version {}", stat.getVersion());
newTerms = new ShardTerms(values, stat.getVersion());
@@ -365,19 +381,19 @@ public class ZkShardTerms implements Closeable {
try {
registerWatcher();
return;
- } catch (KeeperException.SessionExpiredException | KeeperException.AuthFailedException e) {
+ } catch (KeeperException.AuthFailedException e) {
isClosed.set(true);
log.error("Failed watching shard term for collection: {} due to unrecoverable exception", collection, e);
return;
} catch (KeeperException e) {
log.warn("Failed watching shard term for collection: {}, retrying!", collection, e);
-// try {
-// zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
-// } catch (TimeoutException | InterruptedException te) {
-// if (Thread.interrupted()) {
-// throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, te);
-// }
-// }
+ try {
+ zkClient.getConnectionManager().waitForConnected(zkClient.getZkClientTimeout());
+ } catch (TimeoutException | InterruptedException te) {
+ if (Thread.interrupted()) {
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, te);
+ }
+ }
}
}
}
@@ -386,24 +402,10 @@ public class ZkShardTerms implements Closeable {
* Register a watcher to the correspond ZK term node
*/
private void registerWatcher() throws KeeperException {
- Watcher watcher = event -> {
- // session events are not change events, and do not remove the watcher
- if (Watcher.Event.EventType.None == event.getType()) {
- return;
- }
- if (event.getType() == Watcher.Event.EventType.NodeCreated || event.getType() == Watcher.Event.EventType.NodeDataChanged) {
- retryRegisterWatcher();
- // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
- try {
- refreshTerms();
- } catch (KeeperException e) {
- log.warn("Could not refresh terms", e);
- }
- }
- };
+
try {
// exists operation is faster than getData operation
- zkClient.exists(znodePath, watcher, true);
+ zkClient.exists(znodePath, null, true);
} catch (InterruptedException e) {
Thread.interrupted();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error watching shard term for collection: " + collection, e);
@@ -429,9 +431,6 @@ public class ZkShardTerms implements Closeable {
} else {
break;
}
- if (isClosed.get()) {
- break;
- }
}
} finally {
termsLock.unlock();
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 093cb2d..faa68d0 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1729,7 +1729,7 @@ public class CoreContainer implements Closeable {
SolrCore oldCore = null;
boolean success = false;
try {
- // solrCores.waitForLoadingCoreToFinish(name, 15000);
+
ConfigSet coreConfig = coreConfigService.loadConfigSet(cd);
log.info("Reloading SolrCore '{}' using configuration from {}", name, coreConfig.getName());
DocCollection docCollection = null;
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index b0b4069..0aa5d0e 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -280,7 +280,7 @@ class SolrCores implements Closeable {
/* If you don't increment the reference count, someone could close the core before you use it. */
SolrCore getCoreFromAnyList(String name) {
-
+ waitForLoadingCoreToFinish(name, 15000);
CoreDescriptor cd = residentDesciptors.get(name);
SolrCore core = cores.get(name);
@@ -336,6 +336,7 @@ class SolrCores implements Closeable {
public CoreDescriptor getCoreDescriptor(String coreName) {
if (coreName == null) return null;
+ waitForLoadingCoreToFinish(coreName, 15000);
CoreDescriptor cd = residentDesciptors.get(coreName);
if (cd != null) {
diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
index 2cc419b..4b43174 100644
--- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
+++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java
@@ -318,11 +318,9 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
@Override
public void doRecovery(SolrCore core) {
log.info("Do recovery for core {}", core.getName());
- recoverying = true;
CoreContainer corecontainer = core.getCoreContainer();
CoreDescriptor coreDescriptor = core.getCoreDescriptor();
Runnable recoveryTask = () -> {
- boolean success = false;
try {
if (SKIP_AUTO_RECOVERY) {
log.warn("Skipping recovery according to sys prop solrcloud.skip.autorecovery");
@@ -347,7 +345,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
log.warn("Skipping recovery because Solr is shutdown");
return;
}
- recoverying = true;
+
// if we can't get the lock, another recovery is running
// we check to see if there is already one waiting to go
@@ -386,6 +384,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
log.info("Skipping recovery due to being closed");
return;
}
+ recoverying = true;
recoveryThrottle.minimumWaitBetweenActions();
recoveryThrottle.markAttemptingAction();
@@ -394,7 +393,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
log.info("Running recovery");
- success = true;
+
recoveryStrat.run();
} catch (AlreadyClosedException e) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
index a6b4453..2c612a8 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/cloud/ShardTerms.java
@@ -126,7 +126,7 @@ public class ShardTerms implements MapWriter {
public ShardTerms ensureHighestTermsAreNotZero() {
if (maxTerm > 0) return null;
else {
- Map<String, Long> newValues = new ConcurrentHashMap<>(values);
+ Map<String, Long> newValues = new ConcurrentHashMap<String, Long>(32, 0.75F, 32);
for (String replica : values.keySet()) {
newValues.put(replica, 1L);
}
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 553782f..fd773a1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -902,9 +902,7 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
- if (notifications != null) {
- notifications.shutdown();
- }
+//;
stateWatchersMap.forEach((s, stateWatcher) -> IOUtils.closeQuietly(stateWatcher));
stateWatchersMap.clear();