You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/07 20:17:30 UTC
[lucene-solr] branch reference_impl_dev updated: @781 Make a few things better.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new e1ac128 @781 Make a few things better.
e1ac128 is described below
commit e1ac128065add74bfb157e8acbce9c99a87e68a6
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Mon Sep 7 15:16:54 2020 -0500
@781 Make a few things better.
SolrCore ref counts and close.
Ensure server has at least cloud clients cluster state version or above for requests.
---
.../org/apache/solr/cloud/RecoveryStrategy.java | 40 ++---
.../java/org/apache/solr/cloud/ZkController.java | 2 +-
.../apache/solr/cloud/overseer/ZkStateWriter.java | 106 ++++++-------
.../java/org/apache/solr/core/CoreContainer.java | 164 +++++++++++----------
.../src/java/org/apache/solr/core/SolrCore.java | 39 ++---
.../src/java/org/apache/solr/core/SolrCores.java | 8 +-
.../java/org/apache/solr/servlet/HttpSolrCall.java | 34 ++++-
.../processor/DistributedUpdateProcessor.java | 5 +-
.../org/apache/solr/cloud/CollectionPropsTest.java | 3 +-
.../test/org/apache/solr/update/PeerSyncTest.java | 5 +
.../apache/solr/common/cloud/ZkStateReader.java | 4 +
11 files changed, 221 insertions(+), 189 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index a584da1..a39615c 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -132,12 +132,18 @@ public class RecoveryStrategy implements Runnable, Closeable {
private volatile Replica.Type replicaType;
private volatile CoreDescriptor coreDescriptor;
+ private volatile SolrCore core;
+
private final CoreContainer cc;
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
// ObjectReleaseTracker.track(this);
this.cc = cc;
this.coreName = cd.getName();
+ this.core = cc.getCore(coreName, false);
+ if (core == null) {
+ throw new IllegalStateException("SolrCore is null");
+ }
this.recoveryListener = recoveryListener;
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
@@ -194,17 +200,15 @@ public class RecoveryStrategy implements Runnable, Closeable {
@Override
final public void close() {
close = true;
- try (ParWork closer = new ParWork(this, true)) {
- closer.collect("prevSendPreRecoveryHttpUriRequestAbort", () -> {
- try {
- prevSendPreRecoveryHttpUriRequest.abort();
- } catch (NullPointerException e) {
- // expected
- }
- });
-
-
- try (SolrCore core = cc.getCore(coreName)) {
+ try {
+ try (ParWork closer = new ParWork(this, true)) {
+ closer.collect("prevSendPreRecoveryHttpUriRequestAbort", () -> {
+ try {
+ prevSendPreRecoveryHttpUriRequest.abort();
+ } catch (NullPointerException e) {
+ // expected
+ }
+ });
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
@@ -214,16 +218,16 @@ public class RecoveryStrategy implements Runnable, Closeable {
ReplicationHandler replicationHandler = (ReplicationHandler) handler;
if (replicationHandler == null) {
- throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
- "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
+ throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
}
closer.collect("abortFetch", () -> {
replicationHandler.abortFetch();
});
}
+ } finally {
+ core = null;
}
-
log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
//ObjectReleaseTracker.release(this);
}
@@ -351,7 +355,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
return;
}
// set request info for logging
- try (SolrCore core = cc.getCore(coreName)) {
+
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
@@ -370,7 +374,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
}
- }
+
} finally {
close();
}
@@ -771,8 +775,8 @@ public class RecoveryStrategy implements Runnable, Closeable {
zkController.publish(this.coreDescriptor, Replica.State.ACTIVE);
} catch (InterruptedException e) {
ParWork.propegateInterrupt(e);
- return;
- }catch (Exception e) {
+ close = true;
+ } catch (Exception e) {
log.error("Could not publish as ACTIVE after succesful recovery", e);
successfulRecovery = false;
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index 64fd3d1..fd39654 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -2114,7 +2114,7 @@ public class ZkController implements Closeable {
return coreNodeName;
}
- public void preRegister(CoreDescriptor cd, boolean publishState) {
+ public void preRegister(CoreDescriptor cd) {
log.info("PreRegister SolrCore, collection={}, shard={} coreNodeName={}", cd.getCloudDescriptor().getCollectionName(), cd.getCloudDescriptor().getShardId());
CloudDescriptor cloudDesc = cd.getCloudDescriptor();
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index 4aefae5..7f521e7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -124,75 +124,63 @@ public class ZkStateWriter {
String name = entry.getKey();
String path = ZkStateReader.getCollectionPath(name);
- while (true) {
- try {
+ try {
- if (c == null) {
- // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
- if (log.isDebugEnabled()) {
- log.debug("going to delete state.json {}", path);
- }
- reader.getZkClient().clean(path);
- updatesToWrite.remove(name);
- } else if (updatesToWrite.get(name) != null || prevState.getCollectionOrNull(name) != null) {
- if (log.isDebugEnabled()) {
- log.debug(
- "enqueueUpdate() - going to update_collection {} version: {}",
- path, c.getZNodeVersion());
- }
+ if (c == null) {
+ // let's clean up the state.json of this collection only, the rest should be clean by delete collection cmd
+ if (log.isDebugEnabled()) {
+ log.debug("going to delete state.json {}", path);
+ }
+ reader.getZkClient().clean(path);
+ updatesToWrite.remove(name);
+ } else if (updatesToWrite.get(name) != null || prevState.getCollectionOrNull(name) != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("enqueueUpdate() - going to update_collection {} version: {}", path, c.getZNodeVersion());
+ }
- // assert c.getStateFormat() > 1;
- // stat = reader.getZkClient().getCurator().checkExists().forPath(path);
- DocCollection coll = updatesToWrite.get(name);
- if (coll == null) {
- coll = prevState.getCollectionOrNull(name);
- }
+ // assert c.getStateFormat() > 1;
+ // stat = reader.getZkClient().getCurator().checkExists().forPath(path);
+ DocCollection coll = updatesToWrite.get(name);
+ if (coll == null) {
+ coll = prevState.getCollectionOrNull(name);
+ }
- if (log.isDebugEnabled()) {
- log.debug("The new collection {}", c);
- }
- updatesToWrite.put(name, c);
- LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionStates());
- collStates.put(name, new ClusterState.CollectionRef(c));
- prevState = new ClusterState(prevState.getLiveNodes(),
- collStates, prevState.getZNodeVersion());
- } else {
- if (log.isDebugEnabled()) {
- log.debug(
- "enqueueUpdate() - going to create_collection {}",
- path);
- }
- // assert c.getStateFormat() > 1;
- DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(),
- 0, path);
-
- LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionStates());
- collStates.put(name, new ClusterState.CollectionRef(newCollection));
- prevState = new ClusterState(prevState.getLiveNodes(),
- collStates, prevState.getZNodeVersion());
- updatesToWrite.put(name, newCollection);
+ if (log.isDebugEnabled()) {
+ log.debug("The new collection {}", c);
}
+ updatesToWrite.put(name, c);
+ LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionStates());
+ collStates.put(name, new ClusterState.CollectionRef(c));
+ prevState = new ClusterState(prevState.getLiveNodes(), collStates, prevState.getZNodeVersion());
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("enqueueUpdate() - going to create_collection {}", path);
+ }
+ // assert c.getStateFormat() > 1;
+ DocCollection newCollection = new DocCollection(name, c.getSlicesMap(), c.getProperties(), c.getRouter(), 0, path);
- break;
- } catch (InterruptedException | AlreadyClosedException e) {
- ParWork.propegateInterrupt(e);
- throw e;
- } catch (KeeperException.SessionExpiredException e) {
+ LinkedHashMap collStates = new LinkedHashMap<>(prevState.getCollectionStates());
+ collStates.put(name, new ClusterState.CollectionRef(newCollection));
+ prevState = new ClusterState(prevState.getLiveNodes(), collStates, prevState.getZNodeVersion());
+ updatesToWrite.put(name, newCollection);
+ }
+ } catch (InterruptedException | AlreadyClosedException e) {
+ ParWork.propegateInterrupt(e);
+ throw e;
+ } catch (KeeperException.SessionExpiredException e) {
+ throw e;
+ } catch (Exception e) {
+ ParWork.propegateInterrupt(e);
+ if (e instanceof KeeperException.BadVersionException) {
+ log.warn("Tried to update the cluster state using but we where rejected, currently at {}", c == null ? "null" : c.getZNodeVersion(), e);
throw e;
- } catch (Exception e) {
- ParWork.propegateInterrupt(e);
- if (e instanceof KeeperException.BadVersionException) {
- log.warn(
- "Tried to update the cluster state using but we where rejected, currently at {}", c == null ? "null" : c.getZNodeVersion(), e);
- throw e;
- }
- ParWork.propegateInterrupt(e);
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
- "Failed processing update=" + c + "\n" + prevState, e);
}
+ ParWork.propegateInterrupt(e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed processing update=" + c + "\n" + prevState, e);
}
}
+
if (log.isDebugEnabled()) {
log.debug("enqueueUpdate(ClusterState, List<ZkWriteCommand>, ZkWriteCallback) - end");
}
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 80fada3..595c8d8 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1365,7 +1365,7 @@ public class CoreContainer implements Closeable {
MDCLoggingContext.setCoreDescriptor(this, dcore);
SolrIdentifierValidator.validateCoreName(dcore.getName());
if (isZooKeeperAware()) {
- zkSys.getZkController().preRegister(dcore, publishState);
+ zkSys.getZkController().preRegister(dcore);
}
ConfigSet coreConfig = coreConfigService.loadConfigSet(dcore);
@@ -1445,9 +1445,11 @@ public class CoreContainer implements Closeable {
// Traverse full chain since CIE may not be root exception
Throwable cause = original;
- while ((cause = cause.getCause()) != null) {
- if (cause instanceof CorruptIndexException) {
- break;
+ if (!(cause instanceof CorruptIndexException)) {
+ while ((cause = cause.getCause()) != null) {
+ if (cause instanceof CorruptIndexException) {
+ break;
+ }
}
}
@@ -1639,93 +1641,93 @@ public class CoreContainer implements Closeable {
throw new AlreadyClosedException();
}
SolrCore newCore = null;
- SolrCore core = solrCores.getCoreFromAnyList(name, false);
- if (core != null) {
+ try (SolrCore core = solrCores.getCoreFromAnyList(name, true)) {
+ if (core != null) {
- // The underlying core properties files may have changed, we don't really know. So we have a (perhaps) stale
- // CoreDescriptor and we need to reload it from the disk files
- CoreDescriptor cd;
- if (core.getDirectoryFactory().isPersistent()) {
- cd = reloadCoreDescriptor(core.getCoreDescriptor());
- } else {
- cd = core.getCoreDescriptor();
- }
- solrCores.addCoreDescriptor(cd);
- Closeable oldCore = null;
- boolean success = false;
- try {
- solrCores.waitForLoadingCoreToFinish(cd.getName(), 15000);
- ConfigSet coreConfig = coreConfigService.loadConfigSet(cd);
- log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
- newCore = core.reload(coreConfig);
-
- DocCollection docCollection = null;
- if (getZkController() != null) {
- docCollection = getZkController().getClusterState().getCollection(cd.getCollectionName());
- // turn off indexing now, before the new core is registered
- if (docCollection.getBool(ZkStateReader.READ_ONLY, false)) {
- newCore.readOnly = true;
- }
+ // The underlying core properties files may have changed, we don't really know. So we have a (perhaps) stale
+ // CoreDescriptor and we need to reload it from the disk files
+ CoreDescriptor cd;
+ if (core.getDirectoryFactory().isPersistent()) {
+ cd = reloadCoreDescriptor(core.getCoreDescriptor());
+ } else {
+ cd = core.getCoreDescriptor();
}
+ solrCores.addCoreDescriptor(cd);
+ Closeable oldCore = null;
+ boolean success = false;
+ try {
+ solrCores.waitForLoadingCoreToFinish(cd.getName(), 15000);
+ ConfigSet coreConfig = coreConfigService.loadConfigSet(cd);
+ log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
+ newCore = core.reload(coreConfig);
+
+ DocCollection docCollection = null;
+ if (getZkController() != null) {
+ docCollection = getZkController().getClusterState().getCollection(cd.getCollectionName());
+ // turn off indexing now, before the new core is registered
+ if (docCollection.getBool(ZkStateReader.READ_ONLY, false)) {
+ newCore.readOnly = true;
+ }
+ }
- registerCore(cd, newCore, false, false);
-
- // force commit on old core if the new one is readOnly and prevent any new updates
- if (newCore.readOnly) {
- RefCounted<IndexWriter> iwRef = core.getSolrCoreState().getIndexWriter(null);
- if (iwRef != null) {
- IndexWriter iw = iwRef.get();
- // switch old core to readOnly
- core.readOnly = true;
- try {
- if (iw != null) {
- iw.commit();
+ registerCore(cd, newCore, false, false);
+
+ // force commit on old core if the new one is readOnly and prevent any new updates
+ if (newCore.readOnly) {
+ RefCounted<IndexWriter> iwRef = core.getSolrCoreState().getIndexWriter(null);
+ if (iwRef != null) {
+ IndexWriter iw = iwRef.get();
+ // switch old core to readOnly
+ core.readOnly = true;
+ try {
+ if (iw != null) {
+ iw.commit();
+ }
+ } finally {
+ iwRef.decref();
}
- } finally {
- iwRef.decref();
}
}
- }
+ if (docCollection != null) {
+ Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
+ assert replica != null;
+ if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here?
+ getZkController().stopReplicationFromLeader(core.getName());
+ if (!cd.getCloudDescriptor().isLeader()) {
+ getZkController().startReplicationFromLeader(newCore.getName(), true);
+ }
- if (docCollection != null) {
- Replica replica = docCollection.getReplica(cd.getCloudDescriptor().getCoreNodeName());
- assert replica != null;
- if (replica.getType() == Replica.Type.TLOG) { // TODO: needed here?
- getZkController().stopReplicationFromLeader(core.getName());
- if (!cd.getCloudDescriptor().isLeader()) {
- getZkController().startReplicationFromLeader(newCore.getName(), true);
+ } else if (replica.getType() == Replica.Type.PULL) {
+ getZkController().startReplicationFromLeader(newCore.getName(), false);
}
-
- } else if (replica.getType() == Replica.Type.PULL) {
- getZkController().startReplicationFromLeader(newCore.getName(), false);
}
- }
- success = true;
- } catch (SolrCoreState.CoreIsClosedException e) {
- throw e;
- } catch (Exception e) {
- ParWork.propegateInterrupt("Exception reloading SolrCore", e);
- SolrException exp = new SolrException(ErrorCode.SERVER_ERROR, "Unable to reload core [" + cd.getName() + "]", e);
- try {
- coreInitFailures.put(cd.getName(), new CoreLoadFailure(cd, e));
+ success = true;
+ } catch (SolrCoreState.CoreIsClosedException e) {
+ throw e;
+ } catch (Exception e) {
+ ParWork.propegateInterrupt("Exception reloading SolrCore", e);
+ SolrException exp = new SolrException(ErrorCode.SERVER_ERROR, "Unable to reload core [" + cd.getName() + "]", e);
+ try {
+ coreInitFailures.put(cd.getName(), new CoreLoadFailure(cd, e));
- } catch (Exception e1) {
- ParWork.propegateInterrupt(e1);
- exp.addSuppressed(e1);
- }
- throw exp;
- } finally {
- if (!success) {
- ParWork.close(newCore);
+ } catch (Exception e1) {
+ ParWork.propegateInterrupt(e1);
+ exp.addSuppressed(e1);
+ }
+ throw exp;
+ } finally {
+ if (!success) {
+ ParWork.close(newCore);
+ }
}
- }
- } else {
- CoreLoadFailure clf = coreInitFailures.get(name);
- if (clf != null) {
- createFromDescriptor(clf.cd, true, false);
} else {
- throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + name);
+ CoreLoadFailure clf = coreInitFailures.get(name);
+ if (clf != null) {
+ createFromDescriptor(clf.cd, true, false);
+ } else {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + name);
+ }
}
}
}
@@ -1888,7 +1890,7 @@ public class CoreContainer implements Closeable {
* @see SolrCore#close()
*/
public SolrCore getCore(String name) {
- return getCore(name, false);
+ return getCore(name, true);
}
/**
@@ -1899,10 +1901,10 @@ public class CoreContainer implements Closeable {
* @throws SolrCoreInitializationException if a SolrCore with this name failed to be initialized
* @see SolrCore#close()
*/
- public SolrCore getCore(String name, boolean forClose) {
+ public SolrCore getCore(String name, boolean incRefCount) {
// Do this in two phases since we don't want to lock access to the cores over a load.
- SolrCore core = solrCores.getCoreFromAnyList(name, true, true);
+ SolrCore core = solrCores.getCoreFromAnyList(name, incRefCount, true);
// If a core is loaded, we're done just return it.
if (core != null) {
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 7c14937..1cadb10 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -194,6 +194,8 @@ public final class SolrCore implements SolrInfoBean, Closeable {
private volatile String name;
private String logid; // used to show what name is set
+ private final Object closeAndWait = new Object();
+
private volatile boolean isReloaded = false;
private final SolrConfig solrConfig;
@@ -1413,17 +1415,12 @@ public final class SolrCore implements SolrInfoBean, Closeable {
public void closeAndWait() {
close();
while (!isClosed()) {
- final long milliSleep = 250;
- if (log.isDebugEnabled()) {
- log.debug("Core {} is not yet closed, waiting {} ms before checking again.", getName(), milliSleep);
- }
- try {
- Thread.sleep(milliSleep);
- } catch (InterruptedException e) {
- ParWork.propegateInterrupt(e);
- throw new SolrException(ErrorCode.SERVER_ERROR,
- "Caught InterruptedException whilst waiting for core " + getName() + " to close: "
- + e.getMessage(), e);
+ synchronized (closeAndWait) {
+ try {
+ closeAndWait.wait(500);
+ } catch (InterruptedException e) {
+ ParWork.propegateInterrupt(e);
+ }
}
}
}
@@ -1573,9 +1570,12 @@ public final class SolrCore implements SolrInfoBean, Closeable {
*/
@Override
public void close() {
- int count = refCount.decrementAndGet();
- if (count > 0) return; // close is called often, and only actually closes if nothing is using it.
- if (count < 0) {
+ int count = refCount.get();
+ if (count - 1 > 0) {
+ refCount.decrementAndGet();
+ return; // close is called often, and only actually closes if nothing is using it.
+ }
+ if (count - 1 < 0) {
log.error("Too many close [count:{}] on {}. Please report this exception to solr-user@lucene.apache.org", count, this);
throw new SolrException(ErrorCode.SERVER_ERROR, "Too many closes on SolrCore");
}
@@ -1729,13 +1729,16 @@ public final class SolrCore implements SolrInfoBean, Closeable {
} finally {
+ infoRegistry.clear();
+
+ //areAllSearcherReferencesEmpty();
+ refCount.set(-1);
+ synchronized (closeAndWait) {
+ closeAndWait.notifyAll();
+ }
assert ObjectReleaseTracker.release(this);
}
- infoRegistry.clear();
- //areAllSearcherReferencesEmpty();
- refCount.set(-1);
- ObjectReleaseTracker.release(this);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCores.java b/solr/core/src/java/org/apache/solr/core/SolrCores.java
index 79d5a51..c24d985 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCores.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCores.java
@@ -228,9 +228,9 @@ class SolrCores implements Closeable {
return set;
}
- SolrCore getCore(String name) {
- return cores.get(name);
- }
+// SolrCore getCore(String name) {
+// return cores.get(name);
+// }
protected void swap(String n0, String n1) {
if (isClosed()) {
@@ -289,7 +289,7 @@ class SolrCores implements Closeable {
/* If you don't increment the reference count, someone could close the core before you use it. */
SolrCore getCoreFromAnyList(String name, boolean incRefCount, boolean onClose) {
- if (!onClose && closed) {
+ if (closed) {
throw new AlreadyClosedException("SolrCores has been closed");
}
SolrCore core = cores.get(name);
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index b1d7814..44a561d 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import io.opentracing.Span;
import org.apache.commons.io.IOUtils;
@@ -185,7 +186,7 @@ public class HttpSolrCall {
protected Action action;
protected String coreUrl;
protected SolrConfig config;
- protected Map<String, Integer> invalidStates;
+ protected volatile Map<String, Integer> invalidStates;
//The states of client that is invalid in this request
protected String origCorename; // What's in the URL path; might reference a collection/alias or a Solr core name
@@ -260,9 +261,7 @@ public class HttpSolrCall {
solrReq.getContext().put(CoreContainer.class.getName(), cores);
requestType = RequestType.ADMIN;
action = ADMIN;
- if (cores.isZooKeeperAware()) {
- invalidStates = checkStateVersionsAreValid(solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
- }
+ ensureStatesAreAtLeastAtClient();
return;
}
@@ -347,6 +346,8 @@ public class HttpSolrCall {
solrReq = parser.parse(core, path, req);
}
+ ensureStatesAreAtLeastAtClient();
+
invalidStates = checkStateVersionsAreValid(solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
addCollectionParamIfNeeded(getCollectionsList());
@@ -360,6 +361,31 @@ public class HttpSolrCall {
action = PASSTHROUGH;
}
+ private void ensureStatesAreAtLeastAtClient() throws InterruptedException, TimeoutException {
+ if (cores.isZooKeeperAware()) {
+ invalidStates = checkStateVersionsAreValid(solrReq.getParams().get(CloudSolrClient.STATE_VERSION));
+ if (invalidStates != null) {
+ Set<Map.Entry<String,Integer>> entries = invalidStates.entrySet();
+ for (Map.Entry<String,Integer> entry : entries) {
+ String collection = entry.getKey();
+ Integer version = entry.getValue();
+
+ if (cores.getZkController().getZkStateReader().watched(collection)) {
+ cores.getZkController().getZkStateReader().waitForState(collection, 5, TimeUnit.SECONDS, (liveNodes, collectionState) -> {
+ if (collectionState == null) {
+ return false;
+ }
+ if (collectionState.getZNodeVersion() < version) {
+ return false;
+ }
+ return true;
+ });
+ }
+ }
+ }
+ }
+ }
+
protected void autoCreateSystemColl(String corename) throws Exception {
if (core == null &&
SYSTEM_COLL.equals(corename) &&
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 5f5da9f..1fbe7e9 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -585,7 +585,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private long waitForDependentUpdates(AddUpdateCommand cmd, long versionOnUpdate,
boolean isReplayOrPeersync, VersionBucket bucket) throws IOException {
long lastFoundVersion = 0;
- TimeOut waitTimeout = new TimeOut(Integer.getInteger("solr.dependentupdate.timeout", 3) , TimeUnit.SECONDS, TimeSource.NANO_TIME);
+ int wait = Integer.getInteger("solr.dependentupdate.timeout", 5);
+ TimeOut waitTimeout = new TimeOut(wait, TimeUnit.SECONDS, TimeSource.NANO_TIME);
vinfo.lockForUpdate();
try {
@@ -684,7 +685,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String leaderUrl = getLeaderUrl(id);
- if(leaderUrl == null) {
+ if (leaderUrl == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Can't find document with id=" + id);
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
index a73cd35..fb13824 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionPropsTest.java
@@ -146,7 +146,7 @@ public class CollectionPropsTest extends SolrCloudTestCase {
return;
}
lastValueSeen = value;
- Thread.sleep(250);
+ Thread.sleep(50);
}
String collectionpropsInZk = null;
try {
@@ -177,7 +177,6 @@ public class CollectionPropsTest extends SolrCloudTestCase {
// Trigger a new znode event
log.info("setting value1");
collectionProps.setCollectionProperty(collectionName, "property", "value1");
- Thread.sleep(1000);
assertEquals(1, watcher.waitForTrigger());
assertEquals("value1", watcher.getProps().get("property"));
diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
index af8197d..9f92f10 100644
--- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
+++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java
@@ -50,6 +50,11 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
params(DISTRIB_UPDATE_PARAM, FROM_LEADER);
public PeerSyncTest() {
+ try {
+ useFactory(null);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
stress = 0;
// TODO: a better way to do this?
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 57d20d1..f15fa11 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -1565,6 +1565,10 @@ public class ZkStateReader implements SolrCloseable {
}
}
+ public boolean watched(String collection) {
+ return collectionWatches.contains(collection);
+ }
+
/**
* Notify this reader that a local core that is a member of a collection has been closed.
* <p>