You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/07/30 16:00:41 UTC

[lucene-solr] 05/06: @471 Give the recovery strategy some retry sleep time, reinstate some thread-management tactics that were briefly commented out, and relax the auto hard commit interval, which was a bit aggressive considering how much RAM machines have these days.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git

commit 3071b12939f1cc7a98280067e6579b589a86624c
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Jul 30 10:04:22 2020 -0500

    @471 Give the recovery strategy some retry sleep time, reinstate some thread-management tactics that were briefly commented out, and relax the auto hard commit interval, which was a bit aggressive considering how much RAM machines have these days.
---
 .../org/apache/solr/cloud/RecoveryStrategy.java    |   5 +-
 .../org/apache/solr/update/SolrCmdDistributor.java |   2 +-
 .../processor/DistributedZkUpdateProcessor.java    |   6 ++
 .../solr/configsets/_default/conf/solrconfig.xml   |   2 +-
 .../solr/client/solrj/impl/Http2SolrClient.java    |   2 +-
 .../org/apache/solr/common/ParWorkExecService.java | 114 ++++++++-------------
 .../src/java/org/apache/solr/SolrTestCase.java     |   6 +-
 7 files changed, 57 insertions(+), 80 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
index ced5bf5..d335428 100644
--- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
+++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
@@ -72,6 +72,7 @@ import org.apache.solr.update.PeerSyncWithLeader;
 import org.apache.solr.update.UpdateLog;
 import org.apache.solr.update.UpdateLog.RecoveryInfo;
 import org.apache.solr.update.UpdateShardHandlerConfig;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
 import org.apache.solr.util.RefCounted;
 import org.apache.solr.util.SolrPluginUtils;
 import org.apache.solr.util.plugin.NamedListInitializedPlugin;
@@ -114,7 +115,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
       .getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 0);
   private volatile int maxRetries = 500;
   private volatile int startingRecoveryDelayMilliSeconds = Integer
-          .getInteger("solr.cloud.starting-recovery-delay-milli-seconds", 0);
+          .getInteger("solr.cloud.starting-recovery-delay-milli-seconds", 1000);
 
   public static interface RecoveryListener {
     public void recovered();
@@ -341,7 +342,7 @@ public class RecoveryStrategy implements Runnable, Closeable {
     try (HttpSolrClient client = buildRecoverySolrClient(leaderUrl)) {
       UpdateRequest ureq = new UpdateRequest();
       ureq.setParams(new ModifiableSolrParams());
-      // ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
+      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, "terminal");
       // ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);// Why do we need to open searcher if
       // "onlyLeaderIndexes"?
       ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
diff --git a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
index da12d4c..5adebab 100644
--- a/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
+++ b/solr/core/src/java/org/apache/solr/update/SolrCmdDistributor.java
@@ -78,7 +78,7 @@ public class SolrCmdDistributor implements Closeable {
 
   public SolrCmdDistributor(UpdateShardHandler updateShardHandler) {
     assert ObjectReleaseTracker.track(this);
-    this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).build();
+    this.solrClient = new Http2SolrClient.Builder().markInternalRequest().withHttpClient(updateShardHandler.getUpdateOnlyHttpClient()).idleTimeout(60000).build();
   }
 
   public void finish() {
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index c68390b..6c325b7 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -174,6 +174,12 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
 
       nodes = getCollectionUrls(collection, EnumSet.of(Replica.Type.TLOG,Replica.Type.NRT), true);
 
+      if (req.getParams().get(COMMIT_END_POINT, "").equals("terminal")) {
+        log.info("Do a local commit on single replica directly");
+        doLocalCommit(cmd);
+        return;
+      }
+
 
 
       if (nodes != null) {
diff --git a/solr/server/solr/configsets/_default/conf/solrconfig.xml b/solr/server/solr/configsets/_default/conf/solrconfig.xml
index 44a024b..6fc8b0d 100644
--- a/solr/server/solr/configsets/_default/conf/solrconfig.xml
+++ b/solr/server/solr/configsets/_default/conf/solrconfig.xml
@@ -309,7 +309,7 @@
          have some sort of hard autoCommit to limit the log size.
       -->
     <autoCommit>
-      <maxTime>${solr.autoCommit.maxTime:15000}</maxTime>
+      <maxTime>${solr.autoCommit.maxTime:30000}</maxTime>
       <openSearcher>false</openSearcher>
     </autoCommit>
 
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
index d7d418a..b1ef811 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/Http2SolrClient.java
@@ -886,7 +886,7 @@ public class Http2SolrClient extends SolrClient {
   private class AsyncTracker {
 
     // nocommit - look at outstanding max again
-    private static final int MAX_OUTSTANDING_REQUESTS = 1000;
+    private static final int MAX_OUTSTANDING_REQUESTS = 100;
 
     private final Semaphore available;
 
diff --git a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
index 634216d..5e830cf 100644
--- a/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
+++ b/solr/solrj/src/java/org/apache/solr/common/ParWorkExecService.java
@@ -24,7 +24,7 @@ public class ParWorkExecService implements ExecutorService {
   private static final Logger log = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final int MAX_AVAILABLE = 500;//ParWork.PROC_COUNT;
+  private static final int MAX_AVAILABLE = ParWork.PROC_COUNT;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
 
   private final Phaser phaser = new Phaser(1) {
@@ -93,18 +93,17 @@ public class ParWorkExecService implements ExecutorService {
       throw new RejectedExecutionException();
     }
     try {
-//      if (!requiresAnotherThread) {
-//        boolean success = checkLoad();
-//        if (success) {
-//          success = available.tryAcquire();
-//        }
-//        if (!success) {
-//          available.acquire();
-//          return CompletableFuture.completedFuture(callable.call());
-//        }
-//      } else {
-        available.acquire();
-    //  }
+      if (!requiresAnotherThread) {
+        boolean success = checkLoad();
+        if (success) {
+          success = available.tryAcquire();
+        }
+        if (!success) {
+          return CompletableFuture.completedFuture(callable.call());
+        }
+      } else {
+        available.acquireUninterruptibly();
+      }
       Future<T> future = service.submit(callable);
       return new Future<T>() {
         @Override
@@ -158,23 +157,13 @@ public class ParWorkExecService implements ExecutorService {
       throw new RejectedExecutionException();
     }
     boolean success = checkLoad();
-    try {
-      available.acquire();
-    } catch (InterruptedException e) {
-      ParWork.propegateInterrupt(e);
+    if (success) {
+      success = available.tryAcquire();
+    }
+    if (!success) {
+      runnable.run();
+      return CompletableFuture.completedFuture(null);
     }
-    //    if (success) {
-//      success = available.tryAcquire();
-//    }
-//    if (!success) {
-//      try {
-//        awaitOutstanding(10, TimeUnit.SECONDS);
-//      } catch (InterruptedException e) {
-//        ParWork.propegateInterrupt(e);
-//      }
-//      runnable.run();
-//      return CompletableFuture.completedFuture(null);
-//    }
     return service.submit(new Runnable() {
       @Override
       public void run() {
@@ -197,31 +186,17 @@ public class ParWorkExecService implements ExecutorService {
     if (shutdown || terminated) {
       throw new RejectedExecutionException();
     }
-//    if (!requiresAnotherThread) {
-//      boolean success = checkLoad();
-//      if (success) {
-//        success = available.tryAcquire();
-//      }
-//      if (!success) {
-//        try {
-//          awaitOutstanding(10, TimeUnit.SECONDS);
-//        } catch (InterruptedException e) {
-//          ParWork.propegateInterrupt(e);
-//        }
-//        runnable.run();
-//        return CompletableFuture.completedFuture(null);
-//      }
-//    } else {
-////      try {
-////        available.acquire();
-////      } catch (InterruptedException e) {
-////        ParWork.propegateInterrupt(e);
-////      }
-//    }
-    try {
-      available.acquire();
-    } catch (InterruptedException e) {
-      ParWork.propegateInterrupt(e);
+    if (!requiresAnotherThread) {
+      boolean success = checkLoad();
+      if (success) {
+        success = available.tryAcquire();
+      }
+      if (!success) {
+        runnable.run();
+        return CompletableFuture.completedFuture(null);
+      }
+    } else {
+      available.acquireUninterruptibly();
     }
     Future<?> future = service.submit(runnable);
 
@@ -317,30 +292,25 @@ public class ParWorkExecService implements ExecutorService {
     if (shutdown || terminated) {
       throw new RejectedExecutionException();
     }
-//    boolean success = checkLoad();
-//    if (success) {
-//      success = available.tryAcquire();
-//    }
-//    if (!success) {
-//      try {
-//        awaitOutstanding(10, TimeUnit.SECONDS);
-//      } catch (InterruptedException e) {
-//        ParWork.propegateInterrupt(e);
-//      }
-//      try {
-//        runnable.run();
-//      } finally {
-//        available.release();
-//      }
-//      return;
-//    }
+    boolean success = checkLoad();
+    if (success) {
+      success = available.tryAcquire();
+    }
+    if (!success) {
+      try {
+        runnable.run();
+      } finally {
+        available.release();
+      }
+      return;
+    }
     service.execute(new Runnable() {
       @Override
       public void run() {
         try {
           runnable.run();
         } finally {
-         // available.release();
+          available.release();
         }
       }
     });
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
index 56cde50..c90e7f2 100644
--- a/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
+++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCase.java
@@ -283,14 +283,14 @@ public class SolrTestCase extends LuceneTestCase {
       System.setProperty("leaderVoteWait", "5000"); // this is also apparently controlling how long we wait for a leader on register nocommit
       System.setProperty("leaderConflictResolveWait", "10000");
 
-      System.setProperty("solr.recovery.recoveryThrottle", "250");
-      System.setProperty("solr.recovery.leaderThrottle", "50");
+      System.setProperty("solr.recovery.recoveryThrottle", "500");
+      System.setProperty("solr.recovery.leaderThrottle", "100");
 
       System.setProperty("bucketVersionLockTimeoutMs", "8000");
       System.setProperty("socketTimeout", "30000");
       System.setProperty("connTimeout", "10000");
       System.setProperty("solr.cloud.wait-for-updates-with-stale-state-pause", "0");
-      System.setProperty("solr.cloud.starting-recovery-delay-milli-seconds", "0");
+      System.setProperty("solr.cloud.starting-recovery-delay-milli-seconds", "500");
 
       System.setProperty("solr.waitForState", "5"); // secs