Posted to commits@lucene.apache.org by ma...@apache.org on 2021/02/19 06:42:38 UTC

[lucene-solr] branch reference_impl_dev updated: @1355 Fix an issue with dist updates and a bit of surrounding test cleanup.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 2cab3c1  @1355 Fix an issue with dist updates and a bit of surrounding test cleanup.
2cab3c1 is described below

commit 2cab3c1acc4264fe0dc29c0f676fcc65d59616ab
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Fri Feb 19 00:42:10 2021 -0600

    @1355 Fix an issue with dist updates and a bit of surrounding test cleanup.
---
 .../processor/DistributedUpdateProcessor.java      |  4 +--
 .../processor/DistributedZkUpdateProcessor.java    |  6 +++-
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   |  4 +--
 .../handler/admin/MetricsHistoryHandlerTest.java   |  8 ++---
 .../solr/client/solrj/impl/Http2SolrClient.java    |  7 ++--
 .../org/apache/solr/common/cloud/SolrZkClient.java |  3 ++
 .../apache/solr/common/cloud/ZkStateReader.java    | 38 +++++++++++++++-------
 7 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index f6e83a6..9ea8e6c 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -373,7 +373,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     // realtime-get to work reliably.
     // TODO: if versions aren't stored, do we need to set on the cmd anyway for some reason?
     // there may be other reasons in the future for a version on the commands
-    boolean nodist = noDistrib();
+
     AddUpdateCommand cloneCmd = null;
     if (versionsStored) {
 
@@ -513,7 +513,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
 
     AddUpdateCommand finalCloneCmd;
-    if (!nodist) {
+    if (forwardToLeader || getNodes() != null && getNodes().size() > 0) {
 
       if (cloneCmd != null) {
         finalCloneCmd = cloneCmd;
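
The two hunks above drop the cached nodist flag and instead check the forwarding state directly where the clone decision is made: the incoming AddUpdateCommand only needs to be cloned when it will actually leave this node, that is, when it is forwarded to the shard leader or there are replica nodes to distribute to. Checking inline, presumably, avoids relying on a flag computed before the node list and forwarding decision are settled. A minimal standalone sketch of that condition (the class and fields below are hypothetical stand-ins, not the actual DistributedUpdateProcessor code):

    import java.util.List;

    class CloneDecisionSketch {
        // Hypothetical stand-ins for DistributedUpdateProcessor state.
        private final boolean forwardToLeader;
        private final List<String> nodes;

        CloneDecisionSketch(boolean forwardToLeader, List<String> nodes) {
            this.forwardToLeader = forwardToLeader;
            this.nodes = nodes;
        }

        // Mirrors the new inline condition: clone only if the update is
        // forwarded to the leader or will be sent on to other replicas.
        boolean needsClone() {
            return forwardToLeader || (nodes != null && !nodes.isEmpty());
        }

        public static void main(String[] args) {
            System.out.println(new CloneDecisionSketch(false, List.of("replica1")).needsClone()); // true
            System.out.println(new CloneDecisionSketch(false, null).needsClone());                // false
        }
    }
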
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 8e58a3a..3e57a3b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -342,7 +342,11 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
     if (log.isDebugEnabled()) log.debug("Using nodes {}", nodes);
     if (nodes != null && nodes.size() > 0) {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
-      params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString()));
+      if (forwardToLeader) {
+        params.set(DISTRIB_UPDATE_PARAM,  DistribPhase.TOLEADER.toString());
+      } else {
+        params.set(DISTRIB_UPDATE_PARAM, (isLeader || isSubShardLeader ? DistribPhase.FROMLEADER.toString() : DistribPhase.TOLEADER.toString()));
+      }
       params.set(DISTRIB_FROM, Replica.getCoreUrl(zkController.getBaseUrl(), req.getCore().getName()));
 
       if (req.getParams().get(UpdateRequest.MIN_REPFACT) != null) {
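
With this change the distributed-update phase is explicit for the forwarding case: a node that is forwarding the update always stamps the request TOLEADER, while a leader (or sub-shard leader) fanning out to replicas stamps it FROMLEADER. A small sketch of that phase selection, with a local enum standing in for the processor's DistribPhase and three booleans mirroring the processor's state (names are illustrative):

    class PhaseSelectionSketch {
        // Local enum for illustration; the real code uses the processor's DistribPhase.
        enum DistribPhase { TOLEADER, FROMLEADER }

        static DistribPhase phaseFor(boolean forwardToLeader, boolean isLeader, boolean isSubShardLeader) {
            if (forwardToLeader) {
                // This node is not the leader for the document's shard; send it on.
                return DistribPhase.TOLEADER;
            }
            // A leader or sub-shard leader distributing to replicas marks the
            // request as coming from the leader; anything else still goes to the leader.
            return (isLeader || isSubShardLeader) ? DistribPhase.FROMLEADER : DistribPhase.TOLEADER;
        }

        public static void main(String[] args) {
            System.out.println(phaseFor(true, false, false));  // TOLEADER
            System.out.println(phaseFor(false, true, false));  // FROMLEADER
        }
    }
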
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 319da81..8247710 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
  * Super basic testing, no shard restarting or anything.
  */
 @Slow
-@LuceneTestCase.Nightly // nocommit flakey
+@LuceneTestCase.Nightly // nocommit flakey + using testConcurrentIndexing as custom test
 public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -496,7 +496,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     cluster.stopJettyRunners();
     cluster.startJettyRunners();
 
-    cluster.waitForActiveCollection(collectionName, 3, 9);
+ //   cluster.waitForActiveCollection(collectionName, 2, 4);
 
     cluster.getSolrClient().getZkStateReader().checkShardConsistency(collectionName, false, true);
     //checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
diff --git a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
index cb93268..49f81d4 100644
--- a/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/admin/MetricsHistoryHandlerTest.java
@@ -116,15 +116,15 @@ public class MetricsHistoryHandlerTest extends SolrCloudTestCase {
     List<Pair<String, Long>> list = handler.getFactory().list(100);
 
     if (list.size() == 0) {
-      TimeOut timeout = new TimeOut(1000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
-      while (!timeout.hasTimedOut() && list.size() == 0) {
+      TimeOut timeout = new TimeOut(3000, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
+      while (!timeout.hasTimedOut() && list.size() <= 1) {
         Thread.sleep(10);
         list = handler.getFactory().list(100);
       }
     }
     // solr.jvm, solr.node, solr.collection..system
-    // Ahem - replicas are disabled by default, nodes too, though I enabled - solr.jvm is not populated, we make this request handler ourselves.
-    assertEquals(list.toString(), 1, list.size());
+
+    assertEquals(list.toString(), 2, list.size());
     for (Pair<String, Long> p : list) {
       RrdDb db = RrdDb.getBuilder().setPath(MetricsHistoryHandler.URI_PREFIX + p.first()).setReadOnly(true).setBackendFactory( handler.getFactory()).setUsePool(true).build();
       int dsCount = db.getDsCount();
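
The test now waits up to 3000 ms instead of 1000 ms and keeps polling while fewer than two history entries are reported, matching the updated assertion of two entries. TimeOut/TimeSource is Solr's deadline helper; a generic equivalent of the same poll-until-deadline loop, using plain System.nanoTime() (class and method names are illustrative):

    import java.util.concurrent.TimeUnit;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    public class PollUntilSketch {
        // Poll the supplier until the condition holds or the deadline passes.
        static <T> T pollUntil(Supplier<T> supplier, Predicate<T> done,
                               long timeout, TimeUnit unit) throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            T value = supplier.get();
            while (!done.test(value) && System.nanoTime() - deadline < 0) {
                Thread.sleep(10);                 // same small back-off the test uses
                value = supplier.get();
            }
            return value;                         // caller asserts on the final value
        }

        public static void main(String[] args) throws InterruptedException {
            long start = System.nanoTime();
            Integer result = pollUntil(
                () -> (System.nanoTime() - start) > TimeUnit.MILLISECONDS.toNanos(50) ? 2 : 0,
                size -> size >= 2,
                3000, TimeUnit.MILLISECONDS);
            System.out.println("final size: " + result);
        }
    }
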
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 21d91fe..cb97695 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -1055,7 +1055,10 @@ public class Http2SolrClient extends SolrClient {
       }
     }
 
-    public void waitForComplete() {
+    public synchronized void waitForComplete() {
+      if (phaser.getRegisteredParties() == 1) {
+        return;
+      }
       if (log.isTraceEnabled()) log.trace("Before wait for outstanding requests registered: {} arrived: {}, {} {}", phaser.getRegisteredParties(), phaser.getArrivedParties(), phaser.getUnarrivedParties(), phaser);
       try {
         phaser.awaitAdvanceInterruptibly(phaser.arrive(), idleTimeout, TimeUnit.MILLISECONDS);
@@ -1065,7 +1068,7 @@ public class Http2SolrClient extends SolrClient {
         ParWork.propagateInterrupt(e);
         throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
       } catch (TimeoutException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout", e);
+        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Timeout waiting for outstanding async requests", e);
       }
 
       if (log.isTraceEnabled()) log.trace("After wait for outstanding requests {}", phaser);
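
waitForComplete() now returns immediately when only the waiter itself is registered with the Phaser, i.e. there are no outstanding async requests, and the timeout message says what actually timed out. For readers unfamiliar with the pattern, here is a self-contained sketch of waiting on in-flight work with a Phaser; the class and methods below are illustrative, not the Http2SolrClient internals:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class PhaserWaitSketch {
        // One party (the waiter) stays registered for the lifetime of the client.
        private final Phaser phaser = new Phaser(1);
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        void submitAsync(Runnable work) {
            phaser.register();                    // one extra party per in-flight request
            pool.execute(() -> {
                try {
                    work.run();
                } finally {
                    phaser.arriveAndDeregister(); // request finished
                }
            });
        }

        void waitForComplete(long timeoutMs) throws InterruptedException, TimeoutException {
            if (phaser.getRegisteredParties() == 1) {
                return;                           // only the waiter itself, nothing outstanding
            }
            // Arrive as the waiter and block until every in-flight request has
            // arrived (the phase advances) or the timeout elapses.
            phaser.awaitAdvanceInterruptibly(phaser.arrive(), timeoutMs, TimeUnit.MILLISECONDS);
        }

        public static void main(String[] args) throws Exception {
            PhaserWaitSketch client = new PhaserWaitSketch();
            for (int i = 0; i < 3; i++) {
                client.submitAsync(() -> {
                    try { Thread.sleep(100); } catch (InterruptedException ignored) { }
                });
            }
            client.waitForComplete(5000);
            System.out.println("all outstanding requests completed");
            client.pool.shutdown();
        }
    }
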
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 434db40..ff7e929 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -787,6 +787,9 @@ public class SolrZkClient implements Closeable {
 
   public void delete(Collection<String> paths, boolean wait) throws KeeperException {
     if (log.isDebugEnabled()) log.debug("delete paths {} wait={}", paths, wait);
+    if (paths.size() == 0) {
+      return;
+    }
     CountDownLatch latch = null;
     if (wait) {
       latch = new CountDownLatch(paths.size());
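
The new guard skips the work entirely for an empty path collection, which would otherwise build a zero-count latch and issue no deletes. The surrounding method follows a common shape: fire one asynchronous delete per path and, when wait=true, block on a CountDownLatch that each completion callback counts down. A generic sketch of that shape, using a plain executor rather than the ZooKeeper async API (names are illustrative):

    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class AsyncDeleteSketch {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);

        void delete(Collection<String> paths, boolean wait) throws InterruptedException {
            if (paths.isEmpty()) {
                return;                                   // nothing to do, no zero-count latch
            }
            CountDownLatch latch = wait ? new CountDownLatch(paths.size()) : null;
            for (String path : paths) {
                pool.execute(() -> {
                    try {
                        System.out.println("deleting " + path);   // real code issues the async delete here
                    } finally {
                        if (latch != null) latch.countDown();     // completion callback
                    }
                });
            }
            if (latch != null) {
                latch.await(30, TimeUnit.SECONDS);        // bounded wait for all callbacks
            }
        }

        public static void main(String[] args) throws InterruptedException {
            AsyncDeleteSketch sketch = new AsyncDeleteSketch();
            sketch.delete(List.of("/a", "/b"), true);
            sketch.delete(List.of(), true);               // returns immediately
            sketch.pool.shutdown();
            System.out.println("done");
        }
    }
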
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 7d85c4d..9b7384b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -2147,27 +2147,19 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       throws InterruptedException, TimeoutException {
 
     DocCollection coll = clusterState.getCollectionOrNull(collection);
-    if (predicate.matches(liveNodes, coll)) {
+    if (predicate.matches(getLiveNodes(), coll)) {
       return;
     }
     final CountDownLatch latch = new CountDownLatch(1);
     AtomicReference<DocCollection> docCollection = new AtomicReference<>();
-    org.apache.solr.common.cloud.CollectionStateWatcher watcher = (n, c) -> {
-      // if (isClosed()) return true;
-      docCollection.set(c);
-      boolean matches = predicate.matches(this.liveNodes, c);
-      if (matches)
-        latch.countDown();
-
-      return matches;
-    };
+    org.apache.solr.common.cloud.CollectionStateWatcher watcher = new PredicateMatcher(predicate, latch, docCollection).invoke();
     registerCollectionStateWatcher(collection, watcher);
     try {
 
       // wait for the watcher predicate to return true, or time out
       if (!latch.await(wait, unit)) {
         coll = clusterState.getCollectionOrNull(collection);
-        if (predicate.matches(liveNodes, coll)) {
+        if (predicate.matches(getLiveNodes(), coll)) {
           return;
         }
 
@@ -3063,4 +3055,28 @@ public class ZkStateReader implements SolrCloseable, Replica.NodeNameToBaseUrl {
       }
     }
   }
+
+  private class PredicateMatcher {
+    private CollectionStatePredicate predicate;
+    private CountDownLatch latch;
+    private AtomicReference<DocCollection> docCollection;
+
+    public PredicateMatcher(CollectionStatePredicate predicate, CountDownLatch latch, AtomicReference<DocCollection> docCollection) {
+      this.predicate = predicate;
+      this.latch = latch;
+      this.docCollection = docCollection;
+    }
+
+    public org.apache.solr.common.cloud.CollectionStateWatcher invoke() {
+      return (n, c) -> {
+        // if (isClosed()) return true;
+        docCollection.set(c);
+        boolean matches = predicate.matches(getLiveNodes(), c);
+        if (matches)
+          latch.countDown();
+
+        return matches;
+      };
+    }
+  }
 }
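
The ZkStateReader change pulls the inline lambda into a small PredicateMatcher helper and consistently uses getLiveNodes() rather than reading the liveNodes field directly; the wait-for-state flow itself is unchanged: check the predicate up front, register a watcher that records the latest DocCollection and counts down a latch when the predicate matches, await the latch with a timeout, and re-check once before giving up. A generic, self-contained sketch of that register/await/re-check pattern (simplified types, not the ZkStateReader API):

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Predicate;

    public class WaitForStateSketch<S> {
        // Watchers return true when they want to be removed, like CollectionStateWatcher.
        interface StateWatcher<S> { boolean onStateChanged(S newState); }

        private final AtomicReference<S> current = new AtomicReference<>();
        private final CopyOnWriteArrayList<StateWatcher<S>> watchers = new CopyOnWriteArrayList<>();

        void publish(S newState) {
            current.set(newState);
            watchers.removeIf(w -> w.onStateChanged(newState));  // notify and drop satisfied watchers
        }

        void waitForState(Predicate<S> predicate, long wait, TimeUnit unit)
                throws InterruptedException, TimeoutException {
            if (predicate.test(current.get())) {
                return;                                   // already satisfied, no watcher needed
            }
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference<S> seen = new AtomicReference<>();
            StateWatcher<S> watcher = s -> {              // the PredicateMatcher equivalent
                seen.set(s);
                boolean matches = predicate.test(s);
                if (matches) latch.countDown();
                return matches;                           // true removes the watcher
            };
            watchers.add(watcher);
            try {
                if (!latch.await(wait, unit)) {
                    // Re-check once before failing, in case the state changed between
                    // the initial check and watcher registration.
                    if (predicate.test(current.get())) return;
                    throw new TimeoutException("timeout waiting for state, last seen: " + seen.get());
                }
            } finally {
                watchers.remove(watcher);
            }
        }

        public static void main(String[] args) throws Exception {
            WaitForStateSketch<Integer> sketch = new WaitForStateSketch<>();
            new Thread(() -> {
                try { Thread.sleep(100); } catch (InterruptedException ignored) { }
                sketch.publish(42);
            }).start();
            sketch.waitForState(s -> s != null && s == 42, 5, TimeUnit.SECONDS);
            System.out.println("state reached");
        }
    }
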