You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/30 13:44:01 UTC

[lucene-solr] 04/04: @470 More working out dist updates.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit f78f8c0466b2285420fb888d25c9f12f1e40873b
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jul 30 08:42:06 2020 -0500

    @470 More working out dist updates.
---
 .../org/apache/solr/update/SolrCmdDistributor.java |  5 +-
 .../processor/DistributedUpdateProcessor.java      | 56 ++++++++++------------
 .../DistributedUpdateProcessorFactory.java         |  5 +-
 .../processor/DistributedZkUpdateProcessor.java    | 27 ++++++-----
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   | 11 +++--
 .../org/apache/solr/handler/JsonLoaderTest.java    |  6 +--
 .../src/java/org/apache/solr/common/ParWork.java   | 14 +++++-
 .../src/resources/logconf/log4j2-std-debug.xml     |  4 +-
 8 files changed, 68 insertions(+), 60 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index c7ed8fd..da12d4c 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -57,7 +57,7 @@ public class SolrCmdDistributor implements Closeable {
   private static final int MAX_RETRIES_ON_FORWARD = 3;
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private boolean finished = false; // see finish()
+  private volatile boolean finished = false; // see finish()
 
   private int maxRetriesOnForward = MAX_RETRIES_ON_FORWARD;
 
@@ -208,7 +208,8 @@ public class SolrCmdDistributor implements Closeable {
   }
 
   public void blockAndDoRetries() {
-    phaser.arriveAndAwaitAdvance();
+    //phaser.arriveAndAwaitAdvance();
+    solrClient.waitForOutstandingRequests();
   }
   
   void addCommit(UpdateRequest ureq, CommitUpdateCommand cmd) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index f1e9a11..4401302 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -92,11 +92,11 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
    * Requests from leader to it's followers will be retried this amount of times by default
    */
   static final int MAX_RETRIES_TO_FOLLOWERS_DEFAULT = Integer.getInteger("solr.retries.to.followers", 3);
-  private long versionOnUpdate;
-  private VersionBucket bucket;
-  private boolean isReplayOrPeersync;
-  private boolean leaderLogic;
-  private boolean forwardedFromCollection;
+  private volatile long versionOnUpdate;
+  private volatile VersionBucket bucket;
+  private volatile boolean isReplayOrPeersync;
+  private volatile boolean leaderLogic;
+  private volatile boolean forwardedFromCollection;
 
   /**
    * Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
@@ -125,7 +125,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   public static final String LOG_REPLAY = "log_replay";
 
   // used to assert we don't call finish more than once, see finish()
-  private boolean finished = false;
+  private volatile boolean finished = false;
 
   protected final SolrQueryRequest req;
   protected final SolrQueryResponse rsp;
@@ -135,7 +135,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
   @VisibleForTesting
   VersionInfo vinfo;
   private final boolean versionsStored;
-  private boolean returnVersions;
+  private volatile boolean returnVersions;
 
   private NamedList<Object> addsResponse = null;
   private NamedList<Object> deleteResponse = null;
@@ -146,9 +146,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
   // these are setup at the start of each request processing
   // method in this update processor
-  protected boolean isLeader = true;
-  protected boolean forwardToLeader = false;
-  protected boolean isSubShardLeader = false;
+  protected volatile boolean isLeader = true;
+  protected volatile boolean forwardToLeader = false;
+  protected volatile boolean isSubShardLeader = false;
   protected volatile boolean isIndexChanged = false;
 
   /**
@@ -160,7 +160,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
    */
   protected final int maxRetriesToFollowers = MAX_RETRIES_TO_FOLLOWERS_DEFAULT;
 
-  protected UpdateCommand updateCommand;  // the current command this processor is working on.
+  protected volatile UpdateCommand updateCommand;  // the current command this processor is working on.
 
   protected final Replica.Type replicaType;
 
@@ -233,23 +233,14 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
     if (dropCmd) {
       // TODO: do we need to add anything to the response?
+      log.info("Dropping update {}", cmd.getPrintableId());
       return;
     }
 
     SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
-    AddUpdateCommand cmd2 = null;
+
     if (clonedDoc != null) {
-      cmd2 = new AddUpdateCommand(cmd.getReq());
-      cmd2.commitWithin = cmd.commitWithin;
-      cmd2.isNested = cmd.isNested;
-      cmd2.overwrite = cmd.overwrite;
-      cmd2.prevVersion = cmd.prevVersion;
-      cmd2.updateTerm = cmd.updateTerm;
-      cmd2.isLastDocInBatch = cmd.isLastDocInBatch;
-      cmd2.solrDoc = clonedDoc;
-      cmd2.setVersion((long) cmd.solrDoc.getFieldValue(CommonParams.VERSION_FIELD));
-      cmd2.setFlags(cmd.getFlags());
-      cmd2.setRoute(cmd.getRoute());
+      cmd.solrDoc = clonedDoc;
     }
     try (ParWork worker = new ParWork(this)) {
       if (!forwardToLeader) {
@@ -278,17 +269,17 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           }
         });
       }
-      if (req.getCore().getCoreContainer().isZooKeeperAware()) {
-        AddUpdateCommand finalCmd;
-        if (cmd2 == null) {
-          finalCmd = cmd;
-        } else {
-          finalCmd = cmd2;
-        }
+      boolean zkAware = req.getCore().getCoreContainer().isZooKeeperAware();
+      log.info("Is zk aware {}", zkAware);
+      if (zkAware) {
+
+        log.info("Collect distrib add");
         worker.collect(() -> {
+          log.info("Run distrib add collection");
           try {
-            doDistribAdd(finalCmd);
-          } catch (Exception e) {
+            DistributedUpdateProcessor.this.doDistribAdd(cmd);
+            log.info("after distrib add collection");
+          } catch (Throwable e) {
             ParWork.propegateInterrupt(e);
             throw new SolrException(ErrorCode.SERVER_ERROR, e);
           }
@@ -319,6 +310,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
 
   protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
     // no-op for derived classes to implement
+    log.info("in dist add");
   }
 
   // must be synchronized by bucket
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
index 93c1bf2..19093a4 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java
@@ -57,8 +57,9 @@ public class DistributedUpdateProcessorFactory
             new DistributedZkUpdateProcessor(req, rsp, next) :
             new DistributedUpdateProcessor(req, rsp, next);
     // note: will sometimes return DURP (no overhead) instead of wrapping
-    return RoutedAliasUpdateProcessor.wrap(req,
-        distribUpdateProcessor);
+    UpdateRequestProcessor proc = RoutedAliasUpdateProcessor
+        .wrap(req, distribUpdateProcessor);
+    return proc;
   }
   
 }
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 52277a8..c68390b 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -78,7 +78,6 @@ import org.slf4j.LoggerFactory;
 import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
 
 public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
-
   private final CloudDescriptor cloudDesc;
   private final ZkController zkController;
   private final SolrCmdDistributor cmdDistrib;
@@ -227,15 +226,18 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
             params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
                     zkController.getBaseUrl(), req.getCore().getName()));
 
-            List<SolrCmdDistributor.Node> finalUseNodes = useNodes;
-            ParWork.getExecutor().submit(() -> cmdDistrib.distribCommit(cmd, finalUseNodes, params));
-
+            cmdDistrib.distribCommit(cmd, useNodes, params);
           }
         }
         if (isLeader) {
 
           log.info("Do a local commit on NRT endpoint for leader");
-          doLocalCommit(cmd);
+          try {
+            doLocalCommit(cmd);
+          } catch (Exception e) {
+            log.error("Error on local commit");
+            throw new SolrException(ErrorCode.SERVER_ERROR, e);
+          }
 
           params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
 
@@ -251,9 +253,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
             params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
                     zkController.getBaseUrl(), req.getCore().getName()));
 
-
-            List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
-            cmdDistrib.distribCommit(cmd, finalUseNodes1, params);
+            cmdDistrib.distribCommit(cmd, useNodes, params);
 
           }
 
@@ -287,6 +287,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
+    log.info("in zk dist add");
     log.info("Distribute add cmd {} to {} {}", cmd, nodes, isLeader);
     if (isLeader && !isSubShardLeader)  {
       DocCollection coll = clusterState.getCollection(collection);
@@ -318,8 +319,10 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
         return;
 
       }
+    } else {
+      log.info("Not a shard or sub shard leader");
     }
-
+    log.info("Using nodes {}", nodes);
     if (nodes != null) {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       params.set(DISTRIB_UPDATE_PARAM,
@@ -351,9 +354,9 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
           }
 
       } else {
-        if (!isLeader && params.get(DISTRIB_UPDATE_PARAM).equals(DistribPhase.FROMLEADER.toString())) {
-          throw new IllegalStateException();
-        }
+//        if (!isLeader && params.get(DISTRIB_UPDATE_PARAM).equals(DistribPhase.FROMLEADER.toString())) {
+//          throw new IllegalStateException();
+//        }
         try {
           cmdDistrib
               .distribAdd(cmd, nodes, params, false, rollupReplicationTracker,
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index a1ff80c..5c929f4 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -466,22 +466,23 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
     final String collectionName = createAndSetNewDefaultCollection();
 
-    final int numDocs = 3;//atLeast(50);
+    final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
     final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
     try (ConcurrentUpdateSolrClient indexClient
          = getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
       
       for (int i = 0; i < numDocs; i++) {
+        log.info("add doc {}", i);
         indexClient.add(sdoc("id", i, "text_t",
                              TestUtil.randomRealisticUnicodeString(random(), 200)));
       }
       indexClient.blockUntilFinished();
-      
       assertEquals(0, indexClient.commit().getStatus());
-      assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
-
-      checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
+      indexClient.blockUntilFinished();
     }
+    assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
+
+    checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
   }
   
   /**
diff --git a/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java b/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
index 49bf463..ccd832c 100644
--- a/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/JsonLoaderTest.java
@@ -647,7 +647,7 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
 
     ignoreException("big_integer_t");
 
-    SolrException ex = expectThrows(SolrException.class, () -> {
+    Exception ex = expectThrows(Exception.class, () -> {
       updateJ(json( "[{'id':'1','big_integer_tl':12345678901234567890}]" ), null);
     });
     // nocommit
@@ -655,10 +655,10 @@ public class JsonLoaderTest extends SolrTestCaseJ4 {
 
     // Adding a BigInteger to an integer field should fail
     // BigInteger.intValue() returns only the low-order 32 bits.
-    ex = expectThrows(SolrException.class, () -> {
+    ex = expectThrows(Exception.class, () -> {
       updateJ(json( "[{'id':'1','big_integer_ti':12345678901234567890}]" ), null);
     });
-    assertTrue(ex.getCause() instanceof NumberFormatException);
+    assertTrue(ex.getCause().getCause() instanceof NumberFormatException);
 
     unIgnoreException("big_integer_t");
   }
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWork.java b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
index 39c4535..88bf51c 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWork.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWork.java
@@ -530,7 +530,15 @@ public class ParWork implements Closeable {
                 continue;
 
               closeCalls.add(() -> {
-                handleObject(workUnit.label, exception, workUnitTracker, object);
+                try {
+                  handleObject(workUnit.label, exception, workUnitTracker,
+                      object);
+                } catch (Throwable t) {
+                  log.error(RAN_INTO_AN_ERROR_WHILE_DOING_WORK, t);
+                  if (exception.get() == null) {
+                    exception.set(t);
+                  }
+                }
                 return object;
               });
 
@@ -569,7 +577,9 @@ public class ParWork implements Closeable {
     } catch (Throwable t) {
       log.error(RAN_INTO_AN_ERROR_WHILE_DOING_WORK, t);
 
-      exception.set(t);
+      if (exception.get() == null) {
+        exception.set(t);
+      }
     } finally {
 
       tracker.doneClose();
diff --git a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
index 53c117b..ac820b2 100644
--- a/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
+++ b/solr/test-framework/src/resources/logconf/log4j2-std-debug.xml
@@ -37,7 +37,6 @@
         <AsyncLogger name="org.apache.solr.handler.admin.CollectionsHandler" level="INFO"/>
         <AsyncLogger name="org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler" level="INFO"/>
         <AsyncLogger name="org.apache.solr.cloud.api.collections.CreateCollectionCmd" level="INFO"/>
-        <!--  <AsyncLogger name="org.apache.solr.common.patterns.DW" level="DEBUG"/> -->
         <AsyncLogger name="org.apache.solr.cloud.overseer.ZkStateWriter" level="INFO"/>
         <AsyncLogger name="org.apache.solr.cloud.Overseer" level="INFO"/>
         <AsyncLogger name="org.apache.solr.cloud.OverseerTaskProcessor" level="INFO"/>
@@ -47,7 +46,8 @@
         <AsyncLogger name="org.apache.solr.client.solrj.impl.LBSolrClient" level="INFO"/>
         <AsyncLogger name="org.apache.solr.cloud.ZkController" level="INFO"/>
         <AsyncLogger name="org.apache.solr.common.cloud.ZkMaintenanceUtils" level="INFO"/>
-        <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="INFO"/>
+        <AsyncLogger name="org.apache.solr.update.processor.DistributedZkUpdateProcessor" level="DEBUG"/>
+        <AsyncLogger name="org.apache.solr.update.processor.DistributedUpdateProcessor" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.update.SolrCmdDistributor" level="DEBUG"/>
         <AsyncLogger name="org.apache.solr.update.processor.LogUpdateProcessorFactory" level="DEBUG"/>