You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/12/12 17:28:33 UTC
[lucene-solr] 06/06: @1242 WIP
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
commit d860f539896096498e76420544df041297db0570
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Sat Dec 12 11:28:00 2020 -0600
@1242 WIP
---
.../org/apache/solr/cloud/ZkCollectionTerms.java | 7 ++-
.../java/org/apache/solr/cloud/ZkController.java | 7 +--
.../java/org/apache/solr/cloud/ZkShardTerms.java | 56 +++++++++---------
.../java/org/apache/solr/core/CoreContainer.java | 8 ++-
.../solr/handler/admin/CollectionsHandler.java | 9 ++-
.../processor/DistributedZkUpdateProcessor.java | 67 +++++++++++++---------
.../org/apache/solr/cloud/ZkShardTermsTest.java | 23 +++++---
7 files changed, 104 insertions(+), 73 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
index 54d762d..42311fc 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkCollectionTerms.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.core.CoreDescriptor;
+import org.apache.zookeeper.KeeperException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,7 +44,7 @@ class ZkCollectionTerms implements AutoCloseable {
assert ObjectReleaseTracker.track(this);
}
- ZkShardTerms getShard(String shardId) {
+ ZkShardTerms getShard(String shardId) throws Exception {
collectionToTermsLock.lock();
try {
if (!terms.containsKey(shardId)) {
@@ -65,11 +66,11 @@ class ZkCollectionTerms implements AutoCloseable {
}
}
- public void register(String shardId, String coreNodeName) {
+ public void register(String shardId, String coreNodeName) throws Exception {
getShard(shardId).registerTerm(coreNodeName);
}
- public void remove(String shardId, CoreDescriptor coreDescriptor) {
+ public void remove(String shardId, CoreDescriptor coreDescriptor) throws KeeperException, InterruptedException {
collectionToTermsLock.lock();
try {
ZkShardTerms zterms = getShardOrNull(shardId);
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index e2c094d..f5ce3a9 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -28,7 +28,6 @@ import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
-import org.apache.solr.common.cloud.BeforeReconnect;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.ConnectionManager;
import org.apache.solr.common.cloud.DefaultZkACLProvider;
@@ -1674,7 +1673,7 @@ public class ZkController implements Closeable, Runnable {
*/
private boolean checkRecovery(final boolean isLeader,
final String collection, String coreZkNodeName, String shardId,
- SolrCore core, CoreContainer cc) {
+ SolrCore core, CoreContainer cc) throws Exception {
boolean doRecovery = true;
if (!isLeader) {
@@ -1800,7 +1799,7 @@ public class ZkController implements Closeable, Runnable {
statePublisher.submitState(message);
}
- public ZkShardTerms getShardTerms(String collection, String shardId) {
+ public ZkShardTerms getShardTerms(String collection, String shardId) throws Exception {
ZkCollectionTerms ct = getCollectionTerms(collection);
if (ct == null) {
throw new AlreadyClosedException();
@@ -1854,7 +1853,7 @@ public class ZkController implements Closeable, Runnable {
}
}
- public void unregister(String coreName, CoreDescriptor cd) {
+ public void unregister(String coreName, CoreDescriptor cd) throws KeeperException, InterruptedException {
log.info("Unregister core from zookeeper {}", coreName);
final String collection = cd.getCloudDescriptor().getCollectionName();
try {
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
index 40eba00..ff16516 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkShardTerms.java
@@ -17,6 +17,8 @@
package org.apache.solr.cloud;
+import java.io.Closeable;
+import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
@@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.solr.client.solrj.cloud.ShardTerms;
-import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
@@ -63,7 +64,7 @@ import org.slf4j.LoggerFactory;
* </ul>
* This class should not be reused after {@link org.apache.zookeeper.Watcher.Event.KeeperState#Expired} event
*/
-public class ZkShardTerms implements AutoCloseable{
+public class ZkShardTerms implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -98,12 +99,16 @@ public class ZkShardTerms implements AutoCloseable{
void close();
}
- public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) {
+ public ZkShardTerms(String collection, String shard, SolrZkClient zkClient) throws IOException {
this.znodePath = ZkStateReader.COLLECTIONS_ZKNODE + "/" + collection + "/terms/" + shard;
this.collection = collection;
this.shard = shard;
this.zkClient = zkClient;
- refreshTerms();
+ try {
+ refreshTerms();
+ } catch (KeeperException e) {
+ throw new IOException(e);
+ }
retryRegisterWatcher();
assert ObjectReleaseTracker.track(this);
}
@@ -113,7 +118,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param leader coreNodeName of leader
* @param replicasNeedingRecovery set of replicas in which their terms should be lower than leader's term
*/
- public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) {
+ public void ensureTermsIsHigher(String leader, Set<String> replicasNeedingRecovery) throws KeeperException, InterruptedException {
if (log.isDebugEnabled()) log.debug("ensureTermsIsHigher leader={} replicasNeedingRecvoery={}", leader, replicasNeedingRecovery);
if (replicasNeedingRecovery.isEmpty()) return;
@@ -185,7 +190,7 @@ public class ZkShardTerms implements AutoCloseable{
* Remove the coreNodeName from terms map and also remove any expired listeners
* @return Return true if this object should not be reused
*/
- boolean removeTerm(CoreDescriptor cd) {
+ boolean removeTerm(CoreDescriptor cd) throws KeeperException, InterruptedException {
int numListeners;
// solrcore already closed
listeners.removeIf(coreTermWatcher -> !coreTermWatcher.onTermChanged(terms.get()));
@@ -196,7 +201,7 @@ public class ZkShardTerms implements AutoCloseable{
// package private for testing, only used by tests
// return true if this object should not be reused
- boolean removeTerm(String coreNodeName) {
+ boolean removeTerm(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
int tries = 0;
while ( (newTerms = terms.get().removeTerm(coreNodeName)) != null) {
@@ -219,7 +224,7 @@ public class ZkShardTerms implements AutoCloseable{
* If a term is already associate with this replica do nothing
* @param coreNodeName of the replica
*/
- void registerTerm(String coreNodeName) {
+ void registerTerm(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().registerTerm(coreNodeName)) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -231,14 +236,14 @@ public class ZkShardTerms implements AutoCloseable{
* This call should only be used by {@link org.apache.solr.common.params.CollectionParams.CollectionAction#FORCELEADER}
* @param coreNodeName of the replica
*/
- public void setTermEqualsToLeader(String coreNodeName) {
+ public void setTermEqualsToLeader(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().setTermEqualsToLeader(coreNodeName)) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
}
}
- public void setTermToZero(String coreNodeName) {
+ public void setTermToZero(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().setTermToZero(coreNodeName)) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -248,7 +253,7 @@ public class ZkShardTerms implements AutoCloseable{
/**
* Mark {@code coreNodeName} as recovering
*/
- public void startRecovering(String coreNodeName) {
+ public void startRecovering(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().startRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -258,7 +263,7 @@ public class ZkShardTerms implements AutoCloseable{
/**
* Mark {@code coreNodeName} as finished recovering
*/
- public void doneRecovering(String coreNodeName) {
+ public void doneRecovering(String coreNodeName) throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().doneRecovering(coreNodeName)) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -273,7 +278,7 @@ public class ZkShardTerms implements AutoCloseable{
* When first updates come in, all replicas have some data now,
* so we must switch from term 0 (registered) to 1 (have some data)
*/
- public void ensureHighestTermsAreNotZero() {
+ public void ensureHighestTermsAreNotZero() throws KeeperException, InterruptedException {
ShardTerms newTerms;
while ( (newTerms = terms.get().ensureHighestTermsAreNotZero()) != null) {
if (forceSaveTerms(newTerms) || isClosed.get()) break;
@@ -300,7 +305,7 @@ public class ZkShardTerms implements AutoCloseable{
* @param newTerms to be set
* @return true if terms is saved successfully to ZK, false if otherwise
*/
- private boolean forceSaveTerms(ShardTerms newTerms) {
+ private boolean forceSaveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
try {
return saveTerms(newTerms);
} catch (KeeperException.NoNodeException e) {
@@ -315,7 +320,7 @@ public class ZkShardTerms implements AutoCloseable{
* @return true if terms is saved successfully to ZK, false if otherwise
* @throws KeeperException.NoNodeException correspond ZK term node is not created
*/
- private boolean saveTerms(ShardTerms newTerms) throws KeeperException.NoNodeException {
+ private boolean saveTerms(ShardTerms newTerms) throws KeeperException, InterruptedException {
byte[] znodeData = Utils.toJSON(newTerms);
try {
Stat stat = zkClient.setData(znodePath, znodeData, newTerms.getVersion(), true);
@@ -325,11 +330,6 @@ public class ZkShardTerms implements AutoCloseable{
} catch (KeeperException.BadVersionException e) {
log.info("Failed to save terms, version is not a match, retrying version={}", newTerms.getVersion());
refreshTerms();
- } catch (KeeperException.NoNodeException e) {
- return true;
- } catch (Exception e) {
- ParWork.propagateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error while saving shard term for collection: " + collection, e);
}
return false;
}
@@ -337,7 +337,7 @@ public class ZkShardTerms implements AutoCloseable{
/**
* Fetch latest terms from ZK
*/
- public void refreshTerms() {
+ public void refreshTerms() throws KeeperException {
ShardTerms newTerms;
try {
Stat stat = new Stat();
@@ -352,8 +352,6 @@ public class ZkShardTerms implements AutoCloseable{
} catch (InterruptedException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
- } catch (KeeperException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error updating shard term for collection: " + collection, e);
}
setNewTerms(newTerms);
@@ -393,9 +391,15 @@ public class ZkShardTerms implements AutoCloseable{
if (Watcher.Event.EventType.None == event.getType()) {
return;
}
- retryRegisterWatcher();
- // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
- refreshTerms();
+ if (event.getType() == Watcher.Event.EventType.NodeCreated || event.getType() == Watcher.Event.EventType.NodeDataChanged) {
+ retryRegisterWatcher();
+ // Some events may be missed during register a watcher, so it is safer to refresh terms after registering watcher
+ try {
+ refreshTerms();
+ } catch (KeeperException e) {
+ log.warn("Could not refresh terms", e);
+ }
+ }
};
try {
// exists operation is faster than getData operation
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 42fd09d..c2e492a 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1546,9 +1546,13 @@ public class CoreContainer implements Closeable {
getZkController().getShardTerms(desc.getCollectionName(), desc.getShardId()).setTermToZero(dcore.getName());
return new SolrCore(this, dcore, coreConfig);
}
- } catch (SolrException se) {
+ } catch (Exception se) {
se.addSuppressed(original);
- throw se;
+ if (se instanceof SolrException) {
+ throw (SolrException) se;
+ } else {
+ throw new SolrException(ErrorCode.SERVER_ERROR, se);
+ }
}
}
if (original instanceof RuntimeException) {
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 7bff7f8..3c64ffd 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -76,7 +76,6 @@ import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.security.AuthorizationContext;
import org.apache.solr.security.PermissionNameProvider;
import org.apache.zookeeper.KeeperException;
-import org.eclipse.jetty.rewrite.handler.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1380,7 +1379,13 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
//TODO only increase terms of replicas less out-of-sync
liveReplicas.stream()
.filter(rep -> zkShardTerms.registered(rep.getName()))
- .forEach(rep -> zkShardTerms.setTermEqualsToLeader(rep.getName()));
+ .forEach(rep -> {
+ try {
+ zkShardTerms.setTermEqualsToLeader(rep.getName());
+ } catch (Exception e) {
+ log.error("Exception in shard terms", e);
+ }
+ });
}
// Wait till we have an active leader
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 28b1111..1a02637 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -762,7 +762,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
skippedCoreNodeNames = new HashSet<>();
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ ZkShardTerms zkShardTerms = null;
+ try {
+ zkShardTerms = zkController.getShardTerms(collection, shardId);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
for (Replica replica: replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
@@ -926,7 +931,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
List<SolrCmdDistributor.Node> nodes = new ArrayList<>(replicas.size());
skippedCoreNodeNames = new HashSet<>();
- ZkShardTerms zkShardTerms = zkController.getShardTerms(collection, shardId);
+ ZkShardTerms zkShardTerms = null;
+ try {
+ zkShardTerms = zkController.getShardTerms(collection, shardId);
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
for (Replica replica : replicas) {
String coreNodeName = replica.getName();
if (skipList != null && skipListSet.contains(replica.getCoreUrl())) {
@@ -1128,11 +1138,17 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
boolean shouldUpdateTerms = isLeader && isIndexChanged;
if (shouldUpdateTerms) {
- ZkShardTerms zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
- if (skippedCoreNodeNames != null) {
- zkShardTerms.ensureTermsIsHigher(desc.getName(), skippedCoreNodeNames);
+ ZkShardTerms zkShardTerms = null;
+ try {
+ zkShardTerms = zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId());
+
+ if (skippedCoreNodeNames != null) {
+ zkShardTerms.ensureTermsIsHigher(desc.getName(), skippedCoreNodeNames);
+ }
+ zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
- zkController.getShardTerms(collection, cloudDesc.getShardId()).ensureHighestTermsAreNotZero();
}
// TODO: if not a forward and replication req is not specified, we could
// send in a background thread
@@ -1168,22 +1184,19 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
// legit
DistribPhase phase = DistribPhase.parseParam(error.req.uReq.getParams().get(DISTRIB_UPDATE_PARAM));
- if (phase != DistribPhase.FROMLEADER)
- continue; // don't have non-leaders try to recovery other nodes
+ if (phase != DistribPhase.FROMLEADER) continue; // don't have non-leaders try to recovery other nodes
// commits are special -- they can run on any node irrespective of whether it is a leader or not
// we don't want to run recovery on a node which missed a commit command
- if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null)
- continue;
+ if (error.req.uReq.getParams().get(COMMIT_END_POINT) != null) continue;
final String replicaUrl = error.req.node.getUrl();
// if the remote replica failed the request because of leader change (SOLR-6511), then fail the request
- String cause = (error.t instanceof SolrException) ? ((SolrException)error.t).getMetadata("cause") : null;
+ String cause = (error.t instanceof SolrException) ? ((SolrException) error.t).getMetadata("cause") : null;
if ("LeaderChanged".equals(cause)) {
// let's just fail this request and let the client retry? or just call processAdd again?
- log.error("On {}, replica {} now thinks it is the leader! Failing the request to let the client retry!"
- , desc.getName(), replicaUrl, error.t);
+ log.error("On {}, replica {} now thinks it is the leader! Failing the request to let the client retry!", desc.getName(), replicaUrl, error.t);
errorsForClient.add(error);
continue;
}
@@ -1192,7 +1205,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
String shardId = null;
if (error.req.node instanceof SolrCmdDistributor.StdNode) {
- SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode)error.req.node;
+ SolrCmdDistributor.StdNode stdNode = (SolrCmdDistributor.StdNode) error.req.node;
collection = stdNode.getCollection();
shardId = stdNode.getShardId();
@@ -1209,16 +1222,15 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
getLeaderExc = exc;
}
if (leaderCoreNodeName == null) {
- log.warn("Failed to determine if {} is still the leader for collection={} shardId={} before putting {} into leader-initiated recovery",
- desc.getName(), collection, shardId, replicaUrl, getLeaderExc);
+ log.warn("Failed to determine if {} is still the leader for collection={} shardId={} before putting {} into leader-initiated recovery", desc.getName(), collection, shardId, replicaUrl,
+ getLeaderExc);
}
- List<Replica> myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
- cloudDesc.getShardId(), desc.getName());
+ List<Replica> myReplicas = zkController.getZkStateReader().getReplicaProps(collection, cloudDesc.getShardId(), desc.getName());
boolean foundErrorNodeInReplicaList = false;
if (myReplicas != null) {
for (Replica replicaProp : myReplicas) {
- if (((Replica) replicaProp).getName().equals(((Replica)stdNode.getNodeProps()).getName())) {
+ if (((Replica) replicaProp).getName().equals(((Replica) stdNode.getNodeProps()).getName())) {
foundErrorNodeInReplicaList = true;
break;
}
@@ -1238,29 +1250,30 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
} catch (Exception exc) {
SolrZkClient.checkInterrupted(exc);
Throwable setLirZnodeFailedCause = SolrException.getRootCause(exc);
- log.error("Leader failed to set replica {} state to DOWN due to: {}"
- , error.req.node.getUrl(), setLirZnodeFailedCause, setLirZnodeFailedCause);
+ log.error("Leader failed to set replica {} state to DOWN due to: {}", error.req.node.getUrl(), setLirZnodeFailedCause, setLirZnodeFailedCause);
}
} else {
// not the leader anymore maybe or the error'd node is not my replica?
if (!foundErrorNodeInReplicaList) {
- log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent!"
- , desc.getName(), collection, cloudDesc.getShardId(), stdNode.getNodeProps().getCoreUrl());
+ log.warn("Core {} belonging to {} {}, does not have error'd node {} as a replica. No request recovery command will be sent!", desc.getName(), collection, cloudDesc.getShardId(),
+ stdNode.getNodeProps().getCoreUrl());
if (!shardId.equals(cloudDesc.getShardId())) {
// some replicas on other shard did not receive the updates (ex: during splitshard),
// exception must be notified to clients
errorsForClient.add(error);
}
} else {
- log.warn("Core {} is no longer the leader for {} {} or we tried to put ourself into LIR, no request recovery command will be sent!"
- , desc.getName(), collection, shardId);
+ log.warn("Core {} is no longer the leader for {} {} or we tried to put ourself into LIR, no request recovery command will be sent!", desc.getName(), collection, shardId);
}
}
}
}
if (!replicasShouldBeInLowerTerms.isEmpty()) {
- zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId())
- .ensureTermsIsHigher(desc.getName(), replicasShouldBeInLowerTerms);
+ try {
+ zkController.getShardTerms(cloudDesc.getCollectionName(), cloudDesc.getShardId()).ensureTermsIsHigher(desc.getName(), replicasShouldBeInLowerTerms);
+ } catch (Exception e) {
+ throw new SolrException(ErrorCode.SERVER_ERROR, e);
+ }
}
handleReplicationFactor();
if (0 < errorsForClient.size()) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
index 9ed585d..c8cf1be 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkShardTermsTest.java
@@ -75,7 +75,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
@Test
- public void testRecoveringFlag() throws KeeperException, InterruptedException {
+ public void testRecoveringFlag() throws Exception {
cluster.getZkClient().makePath("/collections/recoveringFlag/terms/s1", ZkStateReader.emptyJson, false);
String collection = "recoveringFlag";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
@@ -129,7 +129,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
@Test
- public void testCoreRemovalWhileRecovering() throws KeeperException, InterruptedException {
+ public void testCoreRemovalWhileRecovering() throws Exception {
cluster.getZkClient().makePath("/collections/recoveringFlagRemoval/terms/s1", ZkStateReader.emptyJson, false);
String collection = "recoveringFlagRemoval";
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
@@ -151,7 +151,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
}
- public void testRegisterTerm() throws InterruptedException, KeeperException {
+ public void testRegisterTerm() throws Exception {
cluster.getZkClient().makePath("/collections/registerTerm/terms/s1", ZkStateReader.emptyJson, false);
String collection = "registerTerm";
ZkShardTerms rep1Terms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -195,7 +195,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
@Test
@Nightly
- public void testRaceConditionOnUpdates() throws InterruptedException {
+ public void testRaceConditionOnUpdates() throws Exception {
String collection = "raceConditionOnUpdates";
List<String> replicas = Arrays.asList("rep1", "rep2", "rep3", "rep4");
for (String replica : replicas) {
@@ -214,17 +214,22 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
for (int i = 0; i < failedReplicas.size(); i++) {
String replica = failedReplicas.get(i);
threads[i] = new Thread(() -> {
+
try (ZkShardTerms zkShardTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient())) {
while (!stop.get()) {
try {
Thread.sleep(LuceneTestCase.random().nextInt(TEST_NIGHTLY ? 200 : 50));
zkShardTerms.setTermEqualsToLeader(replica);
- } catch (InterruptedException e) {
+ } catch (InterruptedException | KeeperException e) {
ParWork.propagateInterrupt(e);
log.error("", e);
}
}
+ } catch (Exception e) {
+ ParWork.propagateInterrupt(e);
+ log.error("", e);
}
+
});
threads[i].start();
}
@@ -246,7 +251,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
}
}
- public void testCoreTermWatcher() throws InterruptedException, KeeperException {
+ public void testCoreTermWatcher() throws Exception {
cluster.getZkClient().makePath("/collections/coreTermWatcher/terms/s1", ZkStateReader.emptyJson, false);
String collection = "coreTermWatcher";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -287,7 +292,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
assertEquals(1L, terms.getTerm("leader").longValue());
}
- public void testSetTermToZero() throws KeeperException, InterruptedException {
+ public void testSetTermToZero() throws Exception {
cluster.getZkClient().makePath("/collections/setTermToZero/terms/s1", ZkStateReader.emptyJson, false);
String collection = "setTermToZero";
ZkShardTerms terms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -300,7 +305,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
terms.close();
}
- public void testReplicaCanBecomeLeader() throws InterruptedException, KeeperException {
+ public void testReplicaCanBecomeLeader() throws Exception {
cluster.getZkClient().makePath("/collections/replicaCanBecomeLeader/terms/s1", ZkStateReader.emptyJson, false);
String collection = "replicaCanBecomeLeader";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());
@@ -324,7 +329,7 @@ public class ZkShardTermsTest extends SolrCloudTestCase {
replicaTerms.close();
}
- public void testSetTermEqualsToLeader() throws InterruptedException, KeeperException {
+ public void testSetTermEqualsToLeader() throws Exception {
cluster.getZkClient().makePath("/collections/setTermEqualsToLeader/terms/s1", ZkStateReader.emptyJson, false);
String collection = "setTermEqualsToLeader";
ZkShardTerms leaderTerms = new ZkShardTerms(collection, "s1", cluster.getZkClient());