You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 15:21:04 UTC

[lucene-solr] branch reference_impl_dev updated: @720 UpdateLog executor usage fix/improve.

This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/reference_impl_dev by this push:
     new c80d097  @720 UpdateLog executor usage fix/improve.
c80d097 is described below

commit c80d097ef572cd3048121f748d0d7b6e0eaf2115
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 10:20:12 2020 -0500

    @720 UpdateLog executor usage fix/improve.
---
 .../src/java/org/apache/solr/update/UpdateLog.java | 52 ++++++++++------------
 1 file changed, 23 insertions(+), 29 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index a9e319e..e5c9069 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -553,12 +553,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   public String[] getLogList(File directory) {
     final String prefix = TLOG_NAME+'.';
-    String[] names = directory.list(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.startsWith(prefix);
-      }
-    });
+    String[] names = directory.list(new MyFilenameFilter(prefix));
     if (names == null) {
       throw new RuntimeException(new FileNotFoundException(directory.getAbsolutePath()));
     }
@@ -1010,7 +1005,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
             // This can happen when trying to deserialize the entry at position lookupPointer,
             // but from a different tlog than the one containing the desired entry.
             // Just ignore the exception, so as to proceed to the next tlog.
-            log.debug("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
+            log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
                 , lookupLog, lookupVersion);
           }
 
@@ -1165,7 +1160,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
     if (recoverLogs.isEmpty()) return null;
 
-    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
     LogReplayer replayer = new LogReplayer(recoverLogs, false);
 
     versionInfo.blockUpdates();
@@ -1201,7 +1196,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     recoveryInfo = new RecoveryInfo();
     tlog.incref();
 
-    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
     LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
 
     versionInfo.blockUpdates();
@@ -1403,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
   public void close(boolean committed, boolean deleteOnClose) {
     this.isClosed = true;
-    recoveryExecutor.shutdown(); // no new tasks
 
     synchronized (this) {
 
@@ -1428,13 +1422,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
     }
 
-    try {
-      ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
-    } catch (Exception e) {
-      ParWork.propegateInterrupt(e);
-      SolrException.log(log, e);
-    }
-
     ObjectReleaseTracker.release(this);
   }
 
@@ -1456,17 +1443,32 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     }
   }
 
-  public class RecentUpdates implements Closeable {
+  private static class MyFilenameFilter implements FilenameFilter {
+    private final String prefix;
+
+    public MyFilenameFilter(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(prefix);
+    }
+  }
+
+  public static class RecentUpdates implements Closeable {
 
     final Deque<TransactionLog> logList;    // newest first
+    private final int numRecordsToKeep;
     List<List<Update>> updateList;
     HashMap<Long, Update> updates;
     List<Update> deleteByQueryList;
     List<DeleteUpdate> deleteList;
     Set<Long> bufferUpdates = new HashSet<>();
 
-    public RecentUpdates(Deque<TransactionLog> logList) {
+    public RecentUpdates(Deque<TransactionLog> logList, int numRecordsToKeep) {
       this.logList = logList;
+      this.numRecordsToKeep = numRecordsToKeep;
       boolean success = false;
       try {
         update();
@@ -1648,7 +1650,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
 
     // TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
     // one of the updates I originally handed out fell off the list).  Over-request?
-    return new RecentUpdates(logList);
+    return new RecentUpdates(logList, numRecordsToKeep);
 
   }
 
@@ -1737,10 +1739,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
       versionInfo.unblockUpdates();
     }
 
-    if (recoveryExecutor.isShutdown()) {
-      throw new RuntimeException("executor is not running...");
-    }
-    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+    ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
     LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
     return cs.submit(() -> {
       replayer.run();
@@ -2144,11 +2143,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
     }
     return cmd;
   }
-  
-  ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
-      Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-      new SolrNamedThreadFactory("recoveryExecutor"));
-
 
   public static void deleteFile(File file) {
     boolean success = false;