You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/08/11 16:08:49 UTC

[lucene-solr] branch reference_impl_dev updated: @508 Dist update work.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new 91aba24  @508 Dist update work.
91aba24 is described below

commit 91aba2446964653eabdba77112f02ec51168898f
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Tue Aug 11 11:06:54 2020 -0500

    @508 Dist update work.
---
 .../apache/solr/handler/RequestHandlerBase.java    |  2 +-
 .../component/SolrExecutorCompletionService.java   |  2 +-
 .../org/apache/solr/servlet/SolrQoSFilter.java     |  2 +-
 .../org/apache/solr/update/SolrCmdDistributor.java |  2 +-
 .../processor/DistributedUpdateProcessor.java      | 18 ++++++++------
 .../processor/DistributedZkUpdateProcessor.java    | 29 +++++++++++++++++-----
 .../solr/cloud/FullSolrCloudDistribCmdsTest.java   | 10 +++++---
 .../solr/client/solrj/impl/Http2SolrClient.java    |  1 -
 .../solr/client/solrj/impl/HttpSolrClient.java     |  5 ++--
 .../org/apache/solr/common/ParWorkExecService.java |  7 +++---
 .../org/apache/solr/common/ParWorkExecutor.java    |  2 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java | 15 -----------
 .../apache/solr/common/cloud/ZkStateReader.java    |  6 ++---
 13 files changed, 54 insertions(+), 47 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
index 8ec3de3..f892677 100644
--- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java
@@ -233,7 +233,7 @@ public abstract class RequestHandlerBase implements SolrRequestHandler, SolrInfo
         if (isTragic) {
           if (e instanceof SolrException) {
             // Tragic exceptions should always throw a server error
-            assert ((SolrException) e).code() == 500;
+            //assert ((SolrException) e).code() == 500;
           } else {
             // wrap it in a solr exception
             e = new SolrException(SolrException.ErrorCode.SERVER_ERROR, e.getMessage(), e);
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
index a4c6998..fef2b9e 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/SolrExecutorCompletionService.java
@@ -61,7 +61,7 @@ public class SolrExecutorCompletionService<V> implements CompletionService<V> {
       throw new NullPointerException();
     } else {
       RunnableFuture<V> f = this.newTaskFor(task, result);
-      this.executor.execute(new SolrExecutorCompletionService.QueueingFuture(f, this.completionQueue), true);
+      this.executor.doSubmit(new SolrExecutorCompletionService.QueueingFuture(f, this.completionQueue), true);
       return f;
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
index 9035239..8a1c2b4 100644
--- a/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
+++ b/solr/core/src/java/org/apache/solr/servlet/SolrQoSFilter.java
@@ -60,7 +60,7 @@ public class SolrQoSFilter extends QoSFilter {
       throws IOException, ServletException {
     HttpServletRequest req = (HttpServletRequest) request;
     String source = req.getHeader(QoSParams.REQUEST_SOURCE);
-    if (source == null || !source.equals(QoSParams.INTERNAL)) {
+    if (!req.getPathInfo().startsWith("/img/") && (source == null || !source.equals(QoSParams.INTERNAL))) {
 
       // TODO - we don't need to call this *every* request
       double ourLoad = sysStats.getAvarageUsagePerCPU();
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index f7da47b..78b1f20 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -192,7 +192,7 @@ public class SolrCmdDistributor implements Closeable {
     Set<CountDownLatch> latches = new HashSet<>(nodes.size());
 
     // we need to do any retries before commit...
-  //  blockAndDoRetries();
+    blockAndDoRetries();
     if (log.isDebugEnabled()) log.debug("Distrib commit to: {} params: {}", nodes, params);
 
     for (Node node : nodes) {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index b5991dc..38d3b04 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -242,9 +242,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     }
 
     SolrInputDocument clonedDoc = shouldCloneCmdDoc() ? cmd.solrDoc.deepCopy(): null;
-
+    AddUpdateCommand cloneCmd = null;
     if (clonedDoc != null) {
-      cmd.solrDoc = clonedDoc;
+      cloneCmd = (AddUpdateCommand) cmd.clone();
+      cloneCmd.solrDoc = clonedDoc;
     }
     try (ParWork worker = new ParWork(this)) {
       if (!forwardToLeader) {
@@ -253,7 +254,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
           try {
 
             // TODO: possibly set checkDeleteByQueries as a flag on the command?
-            log.info("Local add cmd");
+            if (log.isDebugEnabled()) log.debug("Local add cmd");
             doLocalAdd(cmd);
 
             // if the update updates a doc that is part of a nested structure,
@@ -274,15 +275,16 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
         });
       }
       boolean zkAware = req.getCore().getCoreContainer().isZooKeeperAware();
-      log.info("Is zk aware {}", zkAware);
+      if (log.isDebugEnabled()) log.debug("Is zk aware {}", zkAware);
       if (zkAware && hasNodes()) {
 
-        log.info("Collect distrib add");
+        if (log.isDebugEnabled()) log.debug("Collect distrib add");
+        AddUpdateCommand finalCloneCmd = cloneCmd == null ? cmd : cloneCmd;
         worker.collect(() -> {
-          log.info("Run distrib add collection");
+          if (log.isDebugEnabled()) log.debug("Run distrib add collection");
           try {
-            doDistribAdd(cmd);
-            log.info("after distrib add collection");
+            doDistribAdd(finalCloneCmd);
+            if (log.isDebugEnabled()) log.debug("after distrib add collection");
           } catch (Throwable e) {
             ParWork.propegateInterrupt(e);
             throw new SolrException(ErrorCode.SERVER_ERROR, e);
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index 969659b..ff7d2c0 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -177,15 +177,13 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
       Replica leaderReplica = null;
       zkCheck();
 
-      nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
-
       if (req.getParams().get(COMMIT_END_POINT, "").equals("terminal")) {
         log.info("Do a local commit on single replica directly");
         doLocalCommit(cmd);
         return;
       }
 
-
+      nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
 
       if (nodes != null) {
         nodes.removeIf((node) -> node.getNodeProps().getCoreNodeName().equals(
@@ -230,8 +228,28 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
             log.info("send commit to leaders nodes={}", useNodes);
             params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
                     zkController.getBaseUrl(), req.getCore().getName()));
+            try (ParWork worker = new ParWork(this, false, true)) {
+              List<SolrCmdDistributor.Node> finalUseNodes1 = useNodes;
+              worker.collect(() -> {
+                cmdDistrib.distribCommit(cmd, finalUseNodes1, params);
+              });
 
-            cmdDistrib.distribCommit(cmd, useNodes, params);
+              if (isLeader) {
+
+                log.info("Do a local commit on NRT endpoint for leader");
+                worker.collect(() -> {
+                  try {
+                    doLocalCommit(cmd);
+                  } catch (IOException e) {
+                    log.error("Error on local commit");
+                    throw new SolrException(ErrorCode.SERVER_ERROR, e);
+                  }
+                });
+
+              }
+              worker.addCollect("commitToLeaders");
+            }
+            return;
           }
         }
         if (isLeader) {
@@ -241,7 +259,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
               log.info("Do a local commit on NRT endpoint for leader");
               try {
                 doLocalCommit(cmd);
-              } catch (Exception e) {
+              } catch (IOException e) {
                 log.error("Error on local commit");
                 throw new SolrException(ErrorCode.SERVER_ERROR, e);
               }
@@ -302,7 +320,6 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
   @Override
   protected void doDistribAdd(AddUpdateCommand cmd) throws IOException {
-    log.info("in zk dist add");
     log.info("Distribute add cmd {} to {} {}", cmd, nodes, isLeader);
     if (isLeader && !isSubShardLeader)  {
       DocCollection coll = clusterState.getCollection(collection);
diff --git a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
index 8638ef1..550f943 100644
--- a/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/FullSolrCloudDistribCmdsTest.java
@@ -63,6 +63,7 @@ import org.slf4j.LoggerFactory;
  * Super basic testing, no shard restarting or anything.
  */
 @Slow
+@Ignore
 public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
   private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static final AtomicInteger NAME_COUNTER = new AtomicInteger(1);
@@ -114,7 +115,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
   public static String createAndSetNewDefaultCollection() throws Exception {
     final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
     final String name = "test_collection_" + NAME_COUNTER.getAndIncrement();
-    CollectionAdminRequest.createCollection(name, "_default", 2, 2).setMaxShardsPerNode(10)
+    CollectionAdminRequest.createCollection(name, "_default", 3, 4).setMaxShardsPerNode(10)
                  .process(cloudClient);
     cloudClient.setDefaultCollection(name);
     return name;
@@ -465,7 +466,7 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
     final CloudHttp2SolrClient cloudClient = cluster.getSolrClient();
     final String collectionName = createAndSetNewDefaultCollection();
 
-    final int numDocs = TEST_NIGHTLY ? atLeast(150) : 55;
+    final int numDocs = 5000;//TEST_NIGHTLY ? atLeast(500) : 5000;
     final JettySolrRunner nodeToUpdate = cluster.getRandomJetty(random());
     try (ConcurrentUpdateSolrClient indexClient
          = getConcurrentUpdateSolrClient(nodeToUpdate.getBaseUrl() + "/" + collectionName, 10, 2)) {
@@ -475,10 +476,13 @@ public class FullSolrCloudDistribCmdsTest extends SolrCloudTestCase {
         indexClient.add(sdoc("id", i, "text_t",
                              TestUtil.randomRealisticUnicodeString(random(), 200)));
       }
-      indexClient.blockUntilFinished();
+     // indexClient.blockUntilFinished();
       assertEquals(0, indexClient.commit().getStatus());
       indexClient.blockUntilFinished();
     }
+
+    cluster.waitForActiveCollection(collectionName, 3, 12);
+
     assertEquals(numDocs, cloudClient.query(params("q","*:*")).getResults().getNumFound());
 
     checkShardConsistency(params("q","*:*", "rows", ""+(1 + numDocs),"_trace","addAll"));
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index 02044e5..cc61151 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -260,7 +260,6 @@ public class Http2SolrClient extends SolrClient {
           }
         });
       }
-      closer.collect(httpClientExecutor);
       closer.collect(() -> {
 
         try {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
index 4273109..f146b58 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/HttpSolrClient.java
@@ -50,6 +50,7 @@ import org.apache.solr.client.solrj.V2RequestSupport;
 import org.apache.solr.client.solrj.request.RequestWriter;
 import org.apache.solr.client.solrj.request.V2Request;
 import org.apache.solr.common.ParWork;
+import org.apache.solr.common.ParWorkExecService;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
@@ -314,13 +315,11 @@ public class HttpSolrClient extends BaseHttpSolrClient {
   public HttpUriRequestResponse httpUriRequest(final SolrRequest request, final ResponseParser processor) throws SolrServerException, IOException {
     HttpUriRequestResponse mrr = new HttpUriRequestResponse();
     final HttpRequestBase method = createMethod(request, null);
-    ExecutorService pool = ExecutorUtil.newMDCAwareFixedThreadPool(1, new SolrNamedThreadFactory("httpUriRequest"));
     try {
       MDC.put("HttpSolrClient.url", baseUrl);
-      mrr.future = pool.submit(() -> executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request)));
+      mrr.future = ((ParWorkExecService) ParWork.getExecutor()).doSubmit(() -> executeMethod(method, request.getUserPrincipal(), processor, isV2ApiRequest(request)), true);
  
     } finally {
-      pool.shutdown();
       MDC.remove("HttpSolrClient.url");
     }
     assert method != null;
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 1baf365..5902aad 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -225,7 +225,7 @@ public class ParWorkExecService implements ExecutorService {
             throws InterruptedException, ExecutionException, TimeoutException {
           Object ret;
           try {
-            ret = future.get();
+            ret = future.get(l, timeUnit);
           } finally {
             available.release();
           }
@@ -326,11 +326,12 @@ public class ParWorkExecService implements ExecutorService {
     }
 
     double ourLoad = ParWork.getSysStats().getAvarageUsagePerCPU();
-    if (ourLoad > 1) {
+
+    if (ourLoad > 99.0D) {
       return false;
     } else {
       double sLoad = load / (double) ParWork.PROC_COUNT;
-      if (sLoad > 1.0D) {
+      if (sLoad > ParWork.PROC_COUNT) {
         return false;
       }
       if (log.isDebugEnabled()) log.debug("ParWork, load:" + sLoad);
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
index 311b1b7c..9c3adb5 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecutor.java
@@ -50,7 +50,7 @@ public class ParWorkExecutor extends ExecutorUtil.MDCAwareThreadPoolExecutor {
                 Thread t = new Thread(group, r, name + threadNumber.getAndIncrement(), 0) {
                     public void run() {
                         try {
-                            super.run();
+                            r.run();
                         } finally {
                             ParWork.closeExecutor();
                         }
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 10b3234..141affb 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -171,26 +171,11 @@ public class SolrZkClient implements Closeable {
       connManager.start();
       connManager.waitForConnected(this.zkClientConnectTimeout);
     } catch (TimeoutException e) {
-      try (ParWork worker = new ParWork(this, true)) {
-        worker.add("zkCallbackExecutor", zkCallbackExecutor);
-        worker.add("connectionManager", connManager);
-        worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
-      }
       throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
     } catch (InterruptedException e) {
       ParWork.propegateInterrupt(e);
-      try (ParWork worker = new ParWork(this, true)) {
-        worker.add("zkCallbackExecutor", zkCallbackExecutor);
-        worker.add("connectionManager", connManager);
-        worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
-      }
       throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
     } catch (IOException e) {
-      try (ParWork worker = new ParWork(this, true)) {
-        worker.add("zkCallbackExecutor", zkCallbackExecutor);
-        worker.add("connectionManager", connManager);
-        worker.add("zkCallbackExecutor", zkConnManagerCallbackExecutor);
-      }
       throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
     }
     return this;
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 6265a59..bdcd0686 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -762,8 +762,8 @@ public class ZkStateReader implements SolrCloseable {
 
   private class LazyCollectionRef extends ClusterState.CollectionRef {
     private final String collName;
-    private long lastUpdateTime;
-    private DocCollection cachedDocCollection;
+    private volatile long lastUpdateTime;
+    private volatile DocCollection cachedDocCollection;
 
     public LazyCollectionRef(String collName) {
       super(null);
@@ -772,7 +772,7 @@ public class ZkStateReader implements SolrCloseable {
     }
 
     @Override
-    public synchronized DocCollection get(boolean allowCached) {
+    public DocCollection get(boolean allowCached) {
       gets.incrementAndGet();
       if (!allowCached || lastUpdateTime < 0 || System.nanoTime() - lastUpdateTime > LAZY_CACHE_TIME) {
         boolean shouldFetch = true;