You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by at...@apache.org on 2020/04/01 15:12:38 UTC
[lucene-solr] branch master updated: LUCENE-9074: Slice Allocation
Control Plane For Concurrent Searches (#1294)
This is an automated email from the ASF dual-hosted git repository.
atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git
The following commit(s) were added to refs/heads/master by this push:
new 9ed71a6 LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)
9ed71a6 is described below
commit 9ed71a6efe2d11fd184de53d4191753b2183b73e
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Wed Apr 1 20:42:26 2020 +0530
LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)
This commit introduces a mechanism to control the allocation of threads to the slices planned for a query.
The default implementation uses the size of the executor's backlog queue to determine whether a slice should be allocated a new thread.
---
.../org/apache/lucene/search/IndexSearcher.java | 58 +++++++------
.../lucene/search/QueueSizeBasedExecutor.java | 60 ++++++++++++++
.../org/apache/lucene/search/SliceExecutor.java | 80 ++++++++++++++++++
.../apache/lucene/search/TestIndexSearcher.java | 94 +++++++++++++++++++++-
4 files changed, 267 insertions(+), 25 deletions(-)
diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index f658de0..44fba0a 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -26,12 +26,11 @@ import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
@@ -123,6 +122,9 @@ public class IndexSearcher {
// These are only used for multi-threaded search
private final Executor executor;
+ // Used internally for load balancing threads executing for the query
+ private final SliceExecutor sliceExecutor;
+
// the default Similarity
private static final Similarity defaultSimilarity = new BM25Similarity();
@@ -208,9 +210,17 @@ public class IndexSearcher {
* @lucene.experimental
*/
public IndexSearcher(IndexReaderContext context, Executor executor) {
+ this(context, executor, getSliceExecutionControlPlane(executor));
+ }
+
+ // Package private for testing
+ IndexSearcher(IndexReaderContext context, Executor executor, SliceExecutor sliceExecutor) {
assert context.isTopLevel: "IndexSearcher's ReaderContext must be topLevel for reader" + context.reader();
+ assert (sliceExecutor == null) == (executor==null);
+
reader = context.reader();
this.executor = executor;
+ this.sliceExecutor = sliceExecutor;
this.readerContext = context;
leafContexts = context.leaves();
this.leafSlices = executor == null ? null : slices(leafContexts);
@@ -662,36 +672,21 @@ public class IndexSearcher {
}
query = rewrite(query);
final Weight weight = createWeight(query, scoreMode, 1);
- final List<Future<C>> topDocsFutures = new ArrayList<>(leafSlices.length);
- for (int i = 0; i < leafSlices.length - 1; ++i) {
+ final List<FutureTask<C>> listTasks = new ArrayList<>();
+ for (int i = 0; i < leafSlices.length; ++i) {
final LeafReaderContext[] leaves = leafSlices[i].leaves;
final C collector = collectors.get(i);
FutureTask<C> task = new FutureTask<>(() -> {
search(Arrays.asList(leaves), weight, collector);
return collector;
});
- boolean executedOnCallerThread = false;
- try {
- executor.execute(task);
- } catch (RejectedExecutionException e) {
- // Execute on caller thread
- search(Arrays.asList(leaves), weight, collector);
- topDocsFutures.add(CompletableFuture.completedFuture(collector));
- executedOnCallerThread = true;
- }
- // Do not add the task's future if it was not used
- if (executedOnCallerThread == false) {
- topDocsFutures.add(task);
- }
+ listTasks.add(task);
}
- final LeafReaderContext[] leaves = leafSlices[leafSlices.length - 1].leaves;
- final C collector = collectors.get(leafSlices.length - 1);
- // execute the last on the caller thread
- search(Arrays.asList(leaves), weight, collector);
- topDocsFutures.add(CompletableFuture.completedFuture(collector));
+
+ sliceExecutor.invokeAll(listTasks);
final List<C> collectedCollectors = new ArrayList<>();
- for (Future<C> future : topDocsFutures) {
+ for (Future<C> future : listTasks) {
try {
collectedCollectors.add(future.get());
} catch (InterruptedException e) {
@@ -878,7 +873,7 @@ public class IndexSearcher {
@Override
public String toString() {
- return "IndexSearcher(" + reader + "; executor=" + executor + ")";
+ return "IndexSearcher(" + reader + "; executor=" + executor + "; sliceExecutionControlPlane " + sliceExecutor + ")";
}
/**
@@ -943,4 +938,19 @@ public class IndexSearcher {
super("maxClauseCount is set to " + maxClauseCount);
}
}
+
+ /**
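+ * Return the SliceExecutor instance to be used for this IndexSearcher instance: null for a null executor, a QueueSizeBasedExecutor for a ThreadPoolExecutor, and a plain SliceExecutor otherwise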
+ */
+ private static SliceExecutor getSliceExecutionControlPlane(Executor executor) {
+ if (executor == null) {
+ return null;
+ }
+
+ if (executor instanceof ThreadPoolExecutor) {
+ return new QueueSizeBasedExecutor((ThreadPoolExecutor) executor);
+ }
+
+ return new SliceExecutor(executor);
+ }
}
diff --git a/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java b/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java
new file mode 100644
index 0000000..e579a5a
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Derivative of SliceExecutor that uses the size of the executor's backlog queue to decide where each task runs.
+ * A task is handed to the executor only while the queue holds fewer than (maximum pool size of the executor * LIMITING_FACTOR)
+ * tasks. If the limit is reached, further segments are searched on the caller thread; the last task always runs on the caller thread.
+ */
+class QueueSizeBasedExecutor extends SliceExecutor {
+ private static final double LIMITING_FACTOR = 1.5;
+
+ private final ThreadPoolExecutor threadPoolExecutor;
+
+ public QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
+ super(threadPoolExecutor);
+ this.threadPoolExecutor = threadPoolExecutor;
+ }
+
+ @Override
+ public void invokeAll(Collection<? extends Runnable> tasks) {
+ int i = 0;
+
+ for (Runnable task : tasks) {
+ boolean shouldExecuteOnCallerThread = false;
+
+ // Execute last task on caller thread
+ if (i == tasks.size() - 1) {
+ shouldExecuteOnCallerThread = true;
+ }
+
+ if (threadPoolExecutor.getQueue().size() >=
+ (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR)) {
+ shouldExecuteOnCallerThread = true;
+ }
+
+ processTask(task, shouldExecuteOnCallerThread);
+
+ ++i;
+ }
+ }
+}
diff --git a/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java b/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java
new file mode 100644
index 0000000..680c16b
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
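+ * Executor which is responsible for execution of slices based on the current status
+ * of the system and current system load. This base implementation submits every task except the last
+ * to the wrapped executor; the last task, and any task the executor rejects, runs on the caller thread.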
+ */
+class SliceExecutor {
+ private final Executor executor;
+
+ public SliceExecutor(Executor executor) {
+ this.executor = executor;
+ }
+
+ public void invokeAll(Collection<? extends Runnable> tasks) {
+
+ if (tasks == null) {
+ throw new IllegalArgumentException("Tasks is null");
+ }
+
+ if (executor == null) {
+ throw new IllegalArgumentException("Executor is null");
+ }
+
+ int i = 0;
+
+ for (Runnable task : tasks) {
+ boolean shouldExecuteOnCallerThread = false;
+
+ // Execute last task on caller thread
+ if (i == tasks.size() - 1) {
+ shouldExecuteOnCallerThread = true;
+ }
+
+ processTask(task, shouldExecuteOnCallerThread);
+ ++i;
+ };
+ }
+
+ // Helper method to execute a single task
+ protected void processTask(final Runnable task,
+ final boolean shouldExecuteOnCallerThread) {
+ if (task == null) {
+ throw new IllegalArgumentException("Input is null");
+ }
+
+ if (!shouldExecuteOnCallerThread) {
+ try {
+ executor.execute(task);
+
+ return;
+ } catch (RejectedExecutionException e) {
+ // The executor rejected the task: fall through and execute on caller thread
+ }
+ }
+
+ task.run();
+ }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
index 6617c60..aafd883 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
@@ -22,8 +22,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -34,8 +36,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader;
@@ -61,12 +63,17 @@ public class TestIndexSearcher extends LuceneTestCase {
super.setUp();
dir = newDirectory();
RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+ Random random = random();
for (int i = 0; i < 100; i++) {
Document doc = new Document();
doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), Field.Store.NO));
doc.add(new SortedDocValuesField("field2", new BytesRef(Boolean.toString(i % 2 == 0))));
iw.addDocument(doc);
+
+ if (random.nextBoolean()) {
+ iw.commit();
+ }
}
reader = iw.getReader();
iw.close();
@@ -347,4 +354,89 @@ public class TestIndexSearcher extends LuceneTestCase {
throw new RejectedExecutionException();
}
}
+
+ public void testQueueSizeBasedSliceExecutor() throws Exception {
+ ThreadPoolExecutor service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("TestIndexSearcher"));
+
+ runSliceExecutorTest(service, false);
+
+ TestUtil.shutdownExecutorService(service);
+ }
+
+ public void testRandomBlockingSliceExecutor() throws Exception {
+ ThreadPoolExecutor service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("TestIndexSearcher"));
+
+ runSliceExecutorTest(service, true);
+
+ TestUtil.shutdownExecutorService(service);
+ }
+
+ private void runSliceExecutorTest(ThreadPoolExecutor service, boolean useRandomSliceExecutor) throws Exception {
+ SliceExecutor sliceExecutor = useRandomSliceExecutor == true ? new RandomBlockingSliceExecutor(service) :
+ new QueueSizeBasedExecutor(service);
+
+ IndexSearcher searcher = new IndexSearcher(reader.getContext(), service, sliceExecutor);
+
+ Query queries[] = new Query[] {
+ new MatchAllDocsQuery(),
+ new TermQuery(new Term("field", "1"))
+ };
+ Sort sorts[] = new Sort[] {
+ null,
+ new Sort(new SortField("field2", SortField.Type.STRING))
+ };
+ ScoreDoc afters[] = new ScoreDoc[] {
+ null,
+ new FieldDoc(0, 0f, new Object[] { new BytesRef("boo!") })
+ };
+
+ for (ScoreDoc after : afters) {
+ for (Query query : queries) {
+ for (Sort sort : sorts) {
+ searcher.search(query, Integer.MAX_VALUE);
+ searcher.searchAfter(after, query, Integer.MAX_VALUE);
+ if (sort != null) {
+ TopDocs topDocs = searcher.search(query, Integer.MAX_VALUE, sort);
+ assertTrue(topDocs.totalHits.value > 0);
+
+ topDocs = searcher.search(query, Integer.MAX_VALUE, sort, true);
+ assertTrue(topDocs.totalHits.value > 0);
+
+ topDocs = searcher.search(query, Integer.MAX_VALUE, sort, false);
+ assertTrue(topDocs.totalHits.value > 0);
+
+ topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort);
+ assertTrue(topDocs.totalHits.value > 0);
+
+ topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, true);
+ assertTrue(topDocs.totalHits.value > 0);
+
+ topDocs = searcher.searchAfter(after, query, Integer.MAX_VALUE, sort, false);
+ assertTrue(topDocs.totalHits.value > 0);
+ }
+ }
+ }
+ }
+ }
+
+ private class RandomBlockingSliceExecutor extends SliceExecutor {
+
+ public RandomBlockingSliceExecutor(Executor executor) {
+ super(executor);
+ }
+
+ @Override
+ public void invokeAll(Collection<? extends Runnable> tasks){
+
+ for (Runnable task : tasks) {
+ boolean shouldExecuteOnCallerThread = random().nextBoolean();
+
+ processTask(task, shouldExecuteOnCallerThread);
+ }
+ }
+ }
}