You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/19 06:42:38 UTC
[lucene-solr] branch reference_impl_dev updated: @1355 Fix an issue with dist updates and a bit of surrounding test cleanup.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new 2cab3c1 @1355 Fix an issue with dist updates and a bit of surrounding test cleanup.
2cab3c1 is described below
commit 2cab3c1acc4264fe0dc29c0f676fcc65d59616ab
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Feb 19 00:42:10 2021 -0600
@1355 Fix an issue with dist updates and a bit of surrounding test cleanup.
---
.../processor/DistributedUpdateProcessor.java | 4 +--
.../processor/DistributedZkUpdateProcessor.java | 6 +++-
.../solr/cloud/FullSolrCloudDistribCmdsTest.java | 4 +--
.../handler/admin/MetricsHistoryHandlerTest.java | 8 ++---
.../solr/client/solrj/impl/Http2SolrClient.java | 7 ++--
.../org/apache/solr/common/cloud/SolrZkClient.java | 3 ++
.../apache/solr/common/cloud/ZkStateReader.java | 38 +++++++++++++++-------
7 files changed, 48 insertions(+), 22 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index f6e83a6..9ea8e6c 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -373,7 +373,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// realtime-get to work reliably.
// TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
// there may be other reasons in the future for a version on the commands
- boolean nodist = noDistrib();
+
AddUpdateCommand cloneCmd = null;
if (versionsStored) {
@@ -513,7 +513,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
AddUpdateCommand finalCloneCmd;
- if (!nodist) {
+ if (forwardToLeader || getNodes() != null && getNodes().size() > 0) {
if (cloneCmd != null) {
finalCloneCmd = cloneCmd;
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 8e58a3a..3e57a3b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -342,7 +342,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (log.isDebugEnabled()) log.debug("Using nodes {}", nodes);
if (nodes != null && nodes.size() > 0) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
- params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString()));
+ if (forwardToLeader) {
+ params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
+ } else {
+ params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString()));
+ }
params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 319da81..8247710 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
* Super basic testing, no shard restarting or anything.
*/
@Slow
-@LuceneTestCase.Nightly // nocommit flakey
+@LuceneTestCase.Nightly // nocommit flakey + using testConcurrentIndexing as custom test
public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -496,7 +496,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
cluster.stopJettyRunners();
cluster.startJettyRunners();
- cluster.waitForActiveCollection(collectionName, 3, 9);
+ // cluster.waitForActiveCollection(collectionName, 2, 4);
cluster.getSolrClient().getZkStateReader().checkShardConsistency(collectionName, false, true);
//checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
index cb93268..49f81d4 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
@@ -116,15 +116,15 @@ public class MetricsHistoryHandlerTest extends SolrCloudTestCase {
List<Pair<String, Long>> list = handler.getFactory().list(100);
if (list.size() == 0) {
- TimeOut timeout = new TimeOut(1000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
- while (!timeout.hasTimedOut() && list.size() == 0) {
+ TimeOut timeout = new TimeOut(3000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+ while (!timeout.hasTimedOut() && list.size() <= 1) {
Thread.sleep(10);
list = handler.getFactory().list(100);
}
}
// solr.jvm, solr.node, solr.collection..system
- // Ahem - replicas are disabled by default, nodes too, though I enabled - solr.jvm is not populated, we make this request handler ourselves.
- assertEquals(list.toString(), 1, list.size());
+
+ assertEquals(list.toString(), 2, list.size());
for (Pair<String, Long> p : list) {
RrdDb db = RrdDb.getBuilder().setPath(MetricsHistoryHandler.URI_PREFIX + p.first()).setReadOnly(true).setBackendFactory( handler.getFactory()).setUsePool(true).build();
int dsCount = db.getDsCount();
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 21d91fe..cb97695 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -1055,7 +1055,10 @@ public class Http2SolrClient extends SolrClient {
}
}
- public void waitForComplete() {
+ public synchronized void waitForComplete() {
+ if (phaser.getRegisteredParties() == 1) {
+ return;
+ }
if (log.isTraceEnabled()) log.trace("Before wait for outstanding requests registered: {} arrived: {}, {} {}", phaser.getRegisteredParties(), phaser.getArrivedParties(), phaser.getUnarrivedParties(), phaser);
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), idleTimeout, TimeUnit.MILLISECONDS);
@@ -1065,7 +1068,7 @@ public class Http2SolrClient extends SolrClient {
ParWork.propagateInterrupt(e);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (TimeoutException e) {
- throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout", e);
+ throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for outstanding async requests", e);
}
if (log.isTraceEnabled()) log.trace("After wait for outstanding requests {}", phaser);
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 434db40..ff7e929 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -787,6 +787,9 @@ public class SolrZkClient implements Closeable {
public void delete(Collection<String> paths, boolean wait) throws KeeperException {
if (log.isDebugEnabled()) log.debug("delete paths {} wait={}", paths, wait);
+ if (paths.size() == 0) {
+ return;
+ }
CountDownLatch latch = null;
if (wait) {
latch = new CountDownLatch(paths.size());
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7d85c4d..9b7384b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -2147,27 +2147,19 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
throws InterruptedException, TimeoutException {
DocCollection coll = clusterState.getCollectionOrNull(collection);
- if (predicate.matches(liveNodes, coll)) {
+ if (predicate.matches(getLiveNodes(), coll)) {
return;
}
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<DocCollection> docCollection = new AtomicReference<>();
- org.apache.solr.common.cloud.CollectionStateWatcher watcher = (n, c) -> {
- // if (isClosed()) return true;
- docCollection.set(c);
- boolean matches = predicate.matches(this.liveNodes, c);
- if (matches)
- latch.countDown();
-
- return matches;
- };
+ org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
registerCollectionStateWatcher(collection, watcher);
try {
// wait for the watcher predicate to return true, or time out
if (!latch.await(wait, unit)) {
coll = clusterState.getCollectionOrNull(collection);
- if (predicate.matches(liveNodes, coll)) {
+ if (predicate.matches(getLiveNodes(), coll)) {
return;
}
@@ -3063,4 +3055,28 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
}
}
}
+
+ private class PredicateMatcher {
+ private CollectionStatePredicate predicate;
+ private CountDownLatch latch;
+ private AtomicReference<DocCollection> docCollection;
+
+ public PredicateMatcher(CollectionStatePredicate predicate, CountDownLatch latch, AtomicReference<DocCollection> docCollection) {
+ this.predicate = predicate;
+ this.latch = latch;
+ this.docCollection = docCollection;
+ }
+
+ public org.apache.solr.common.cloud.CollectionStateWatcher invoke() {
+ return (n, c) -> {
+ // if (isClosed()) return true;
+ docCollection.set(c);
+ boolean matches = predicate.matches(getLiveNodes(), c);
+ if (matches)
+ latch.countDown();
+
+ return matches;
+ };
+ }
+ }
}