You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by ma...@apache.org on 2020/09/03 15:21:04 UTC
[lucene-solr] branch reference_impl_dev updated: @720 UpdateLog
executor usage fix/improve.
This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch reference_impl_dev
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/reference_impl_dev by this push:
new c80d097 @720 UpdateLog executor usage fix/improve.
c80d097 is described below
commit c80d097ef572cd3048121f748d0d7b6e0eaf2115
Author: markrmiller@gmail.com <ma...@gmail.com>
AuthorDate: Thu Sep 3 10:20:12 2020 -0500
@720 UpdateLog executor usage fix/improve.
---
.../src/java/org/apache/solr/update/UpdateLog.java | 52 ++++++++++------------
1 file changed, 23 insertions(+), 29 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
index a9e319e..e5c9069 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java
@@ -553,12 +553,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public String[] getLogList(File directory) {
final String prefix = TLOG_NAME+'.';
- String[] names = directory.list(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith(prefix);
- }
- });
+ String[] names = directory.list(new MyFilenameFilter(prefix));
if (names == null) {
throw new RuntimeException(new FileNotFoundException(directory.getAbsolutePath()));
}
@@ -1010,7 +1005,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// This can happen when trying to deserialize the entry at position lookupPointer,
// but from a different tlog than the one containing the desired entry.
// Just ignore the exception, so as to proceed to the next tlog.
- log.debug("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
+ log.info("Exception reading the log (this is expected, don't worry)={}, for version={}. This can be ignored"
, lookupLog, lookupVersion);
}
@@ -1165,7 +1160,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
if (recoverLogs.isEmpty()) return null;
- ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
LogReplayer replayer = new LogReplayer(recoverLogs, false);
versionInfo.blockUpdates();
@@ -1201,7 +1196,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
recoveryInfo = new RecoveryInfo();
tlog.incref();
- ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
versionInfo.blockUpdates();
@@ -1403,7 +1398,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
public void close(boolean committed, boolean deleteOnClose) {
this.isClosed = true;
- recoveryExecutor.shutdown(); // no new tasks
synchronized (this) {
@@ -1428,13 +1422,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
- try {
- ExecutorUtil.shutdownAndAwaitTermination(recoveryExecutor);
- } catch (Exception e) {
- ParWork.propegateInterrupt(e);
- SolrException.log(log, e);
- }
-
ObjectReleaseTracker.release(this);
}
@@ -1456,17 +1443,32 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
}
- public class RecentUpdates implements Closeable {
+ private static class MyFilenameFilter implements FilenameFilter {
+ private final String prefix;
+
+ public MyFilenameFilter(String prefix) {
+ this.prefix = prefix;
+ }
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith(prefix);
+ }
+ }
+
+ public static class RecentUpdates implements Closeable {
final Deque<TransactionLog> logList; // newest first
+ private final int numRecordsToKeep;
List<List<Update>> updateList;
HashMap<Long, Update> updates;
List<Update> deleteByQueryList;
List<DeleteUpdate> deleteList;
Set<Long> bufferUpdates = new HashSet<>();
- public RecentUpdates(Deque<TransactionLog> logList) {
+ public RecentUpdates(Deque<TransactionLog> logList, int numRecordsToKeep) {
this.logList = logList;
+ this.numRecordsToKeep = numRecordsToKeep;
boolean success = false;
try {
update();
@@ -1648,7 +1650,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
// TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
// one of the updates I originally handed out fell off the list). Over-request?
- return new RecentUpdates(logList);
+ return new RecentUpdates(logList, numRecordsToKeep);
}
@@ -1737,10 +1739,7 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
versionInfo.unblockUpdates();
}
- if (recoveryExecutor.isShutdown()) {
- throw new RuntimeException("executor is not running...");
- }
- ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
+ ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(ParWork.getRootSharedExecutor());
LogReplayer replayer = new LogReplayer(Collections.singletonList(bufferTlog), true);
return cs.submit(() -> {
replayer.run();
@@ -2144,11 +2143,6 @@ public class UpdateLog implements PluginInfoInitialized, SolrMetricProducer {
}
return cmd;
}
-
- ThreadPoolExecutor recoveryExecutor = new ExecutorUtil.MDCAwareThreadPoolExecutor(0,
- Integer.MAX_VALUE, 1, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new SolrNamedThreadFactory("recoveryExecutor"));
-
public static void deleteFile(File file) {
boolean success = false;