You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by at...@apache.org on 2020/04/01 15:12:38 UTC

[lucene-solr] branch master updated: LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)

This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/lucene-solr.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ed71a6  LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)
9ed71a6 is described below

commit 9ed71a6efe2d11fd184de53d4191753b2183b73e
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Wed Apr 1 20:42:26 2020 +0530

    LUCENE-9074: Slice Allocation Control Plane For Concurrent Searches (#1294)
    
    This commit introduces a mechanism to control the allocation of threads to the slices planned for a query.
    The default implementation uses the size of the executor's backlog queue to determine whether a slice should be allocated a new thread.
---
 .../org/apache/lucene/search/IndexSearcher.java    | 58 +++++++------
 .../lucene/search/QueueSizeBasedExecutor.java      | 60 ++++++++++++++
 .../org/apache/lucene/search/SliceExecutor.java    | 80 ++++++++++++++++++
 .../apache/lucene/search/TestIndexSearcher.java    | 94 +++++++++++++++++++++-
 4 files changed, 267 insertions(+), 25 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
index f658de0..44fba0a 100644
--- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
+++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java
@@ -26,12 +26,11 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.DirectoryReader;
@@ -123,6 +122,9 @@ public class IndexSearcher {
   // These are only used for multi-threaded search
   private final Executor executor;
 
+  // Used internally for load balancing threads executing for the query
+  private final SliceExecutor sliceExecutor;
+
   // the default Similarity
   private static final Similarity defaultSimilarity = new BM25Similarity();
 
@@ -208,9 +210,17 @@ public class IndexSearcher {
    * @lucene.experimental
    */
   public IndexSearcher(IndexReaderContext context, Executor executor) {
+    this(context, executor, getSliceExecutionControlPlane(executor));
+  }
+
+  // Package private for testing
+  IndexSearcher(IndexReaderContext context, Executor executor, SliceExecutor sliceExecutor) {
     assert context.isTopLevel: "IndexSearcher's ReaderContext must be topLevel for reader" + context.reader();
+    assert (sliceExecutor == null) == (executor==null);
+
     reader = context.reader();
     this.executor = executor;
+    this.sliceExecutor = sliceExecutor;
     this.readerContext = context;
     leafContexts = context.leaves();
     this.leafSlices = executor == null ? null : slices(leafContexts);
@@ -662,36 +672,21 @@ public class IndexSearcher {
       }
       query = rewrite(query);
       final Weight weight = createWeight(query, scoreMode, 1);
-      final List<Future<C>> topDocsFutures = new ArrayList<>(leafSlices.length);
-      for (int i = 0; i < leafSlices.length - 1; ++i) {
+      final List<FutureTask<C>> listTasks = new ArrayList<>();
+      for (int i = 0; i < leafSlices.length; ++i) {
         final LeafReaderContext[] leaves = leafSlices[i].leaves;
         final C collector = collectors.get(i);
         FutureTask<C> task = new FutureTask<>(() -> {
           search(Arrays.asList(leaves), weight, collector);
           return collector;
         });
-        boolean executedOnCallerThread = false;
-        try {
-          executor.execute(task);
-        } catch (RejectedExecutionException e) {
-          // Execute on caller thread
-          search(Arrays.asList(leaves), weight, collector);
-          topDocsFutures.add(CompletableFuture.completedFuture(collector));
-          executedOnCallerThread = true;
-        }
 
-        // Do not add the task's future if it was not used
-        if (executedOnCallerThread == false) {
-          topDocsFutures.add(task);
-        }
+        listTasks.add(task);
       }
-      final LeafReaderContext[] leaves = leafSlices[leafSlices.length - 1].leaves;
-      final C collector = collectors.get(leafSlices.length - 1);
-      // execute the last on the caller thread
-      search(Arrays.asList(leaves), weight, collector);
-      topDocsFutures.add(CompletableFuture.completedFuture(collector));
+
+      sliceExecutor.invokeAll(listTasks);
       final List<C> collectedCollectors = new ArrayList<>();
-      for (Future<C> future : topDocsFutures) {
+      for (Future<C> future : listTasks) {
         try {
           collectedCollectors.add(future.get());
         } catch (InterruptedException e) {
@@ -878,7 +873,7 @@ public class IndexSearcher {
 
   @Override
   public String toString() {
-    return "IndexSearcher(" + reader + "; executor=" + executor + ")";
+    return "IndexSearcher(" + reader + "; executor=" + executor + "; sliceExecutionControlPlane " + sliceExecutor + ")";
   }
   
   /**
@@ -943,4 +938,19 @@ public class IndexSearcher {
       super("maxClauseCount is set to " + maxClauseCount);
     }
   }
+
+  /**
+   * Selects the {@link SliceExecutor} for this IndexSearcher instance: null when no executor is
+   * given, a {@link QueueSizeBasedExecutor} for a ThreadPoolExecutor, a plain one otherwise
+   */
+  private static SliceExecutor getSliceExecutionControlPlane(Executor executor) {
+    if (executor == null) {
+      // Mirrors the constructor assertion: no executor implies no slice executor
+      return null;
+    }
+
+    // QueueSizeBasedExecutor reads the pool's queue, so it is only used for a ThreadPoolExecutor
+    final boolean isThreadPool = executor instanceof ThreadPoolExecutor;
+    return isThreadPool ? new QueueSizeBasedExecutor((ThreadPoolExecutor) executor) : new SliceExecutor(executor);
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java b/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java
new file mode 100644
index 0000000..e579a5a
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/search/QueueSizeBasedExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Derivative of SliceExecutor that limits how many slices of a single query are handed to the
+ * thread pool. A slice is offered to the pool only while the pool's backlog queue holds fewer
+ * than (maximum pool size of the executor * LIMITING_FACTOR) tasks. Once that limit is reached,
+ * further slices are searched on the caller thread, as is always the case for the last slice
+ */
+class QueueSizeBasedExecutor extends SliceExecutor {
+  // Queued tasks tolerated per unit of maximum pool size before falling back to the caller thread
+  private static final double LIMITING_FACTOR = 1.5;
+
+  private final ThreadPoolExecutor threadPoolExecutor;
+
+  public QueueSizeBasedExecutor(ThreadPoolExecutor threadPoolExecutor) {
+    super(threadPoolExecutor);
+    this.threadPoolExecutor = threadPoolExecutor;
+  }
+
+  /**
+   * Dispatches each task, in iteration order, to either the thread pool or the caller thread
+   *
+   * @throws IllegalArgumentException if tasks is null, consistent with SliceExecutor#invokeAll
+   */
+  @Override
+  public void invokeAll(Collection<? extends Runnable> tasks) {
+    if (tasks == null) {
+      throw new IllegalArgumentException("Tasks is null");
+    }
+    final int lastIndex = tasks.size() - 1;
+    int i = 0;
+    for (Runnable task : tasks) {
+      // The queue size is re-read per task; the last task always stays on the caller thread
+      boolean backlogExceeded = threadPoolExecutor.getQueue().size() >=
+          (threadPoolExecutor.getMaximumPoolSize() * LIMITING_FACTOR);
+      processTask(task, i == lastIndex || backlogExceeded);
+      ++i;
+    }
+  }
+}
diff --git a/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java b/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java
new file mode 100644
index 0000000..680c16b
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/search/SliceExecutor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Executor which is responsible for execution of the slices of a query. This base
+ * implementation offers every task but the last to the wrapped executor and runs the last
+ * one on the caller thread; it does not inspect system load itself
+ */
+class SliceExecutor {
+  private final Executor executor;
+
+  public SliceExecutor(Executor executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Dispatches all tasks in iteration order, keeping the last one on the caller thread
+   *
+   * @throws IllegalArgumentException if tasks is null or the wrapped executor is null
+   */
+  public void invokeAll(Collection<? extends Runnable> tasks) {
+    if (tasks == null) {
+      throw new IllegalArgumentException("Tasks is null");
+    }
+    if (executor == null) {
+      throw new IllegalArgumentException("Executor is null");
+    }
+
+    final int lastIndex = tasks.size() - 1;
+    int i = 0;
+    for (Runnable task : tasks) {
+      // Execute last task on caller thread
+      processTask(task, i == lastIndex);
+      ++i;
+    }
+  }
+
+  /**
+   * Runs one task on the executor when allowed and accepted, otherwise on the caller thread
+   *
+   * @throws IllegalArgumentException if task is null
+   */
+  protected void processTask(final Runnable task,
+                             final boolean shouldExecuteOnCallerThread) {
+    if (task == null) {
+      throw new IllegalArgumentException("Input is null");
+    }
+
+    if (!shouldExecuteOnCallerThread) {
+      try {
+        executor.execute(task);
+        return;
+      } catch (RejectedExecutionException e) {
+        // Deliberately not rethrown: a rejected task falls through and runs on the caller thread
+      }
+    }
+
+    task.run();
+  }
+}
diff --git a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
index 6617c60..aafd883 100644
--- a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
+++ b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java
@@ -22,8 +22,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -34,8 +36,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.SortedDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.IndexReader;
@@ -61,12 +63,17 @@ public class TestIndexSearcher extends LuceneTestCase {
     super.setUp();
     dir = newDirectory();
     RandomIndexWriter iw = new RandomIndexWriter(random(), dir);
+    Random random = random();
     for (int i = 0; i < 100; i++) {
       Document doc = new Document();
       doc.add(newStringField("field", Integer.toString(i), Field.Store.NO));
       doc.add(newStringField("field2", Boolean.toString(i % 2 == 0), Field.Store.NO));
       doc.add(new SortedDocValuesField("field2", new BytesRef(Boolean.toString(i % 2 == 0))));
       iw.addDocument(doc);
+
+      if (random.nextBoolean()) {
+        iw.commit();
+      }
     }
     reader = iw.getReader();
     iw.close();
@@ -347,4 +354,89 @@ public class TestIndexSearcher extends LuceneTestCase {
       throw new RejectedExecutionException();
     }
   }
+
+  /** Runs the shared slice executor scenario against a QueueSizeBasedExecutor */
+  public void testQueueSizeBasedSliceExecutor() throws Exception {
+    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+    final NamedThreadFactory threadFactory = new NamedThreadFactory("TestIndexSearcher");
+    final ThreadPoolExecutor pool =
+        new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+    runSliceExecutorTest(pool, false);
+    TestUtil.shutdownExecutorService(pool);
+  }
+
+  /** Runs the shared slice executor scenario against a RandomBlockingSliceExecutor */
+  public void testRandomBlockingSliceExecutor() throws Exception {
+    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
+    final NamedThreadFactory threadFactory = new NamedThreadFactory("TestIndexSearcher");
+    final ThreadPoolExecutor pool =
+        new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+    runSliceExecutorTest(pool, true);
+    TestUtil.shutdownExecutorService(pool);
+  }
+
+  /**
+   * Builds an IndexSearcher over the test reader that uses the requested SliceExecutor and runs
+   * every after/query/sort combination through the search and searchAfter variants
+   */
+  private void runSliceExecutorTest(ThreadPoolExecutor service, boolean useRandomSliceExecutor) throws Exception {
+    final SliceExecutor sliceExecutor;
+    if (useRandomSliceExecutor) {
+      sliceExecutor = new RandomBlockingSliceExecutor(service);
+    } else {
+      sliceExecutor = new QueueSizeBasedExecutor(service);
+    }
+
+    final IndexSearcher searcher = new IndexSearcher(reader.getContext(), service, sliceExecutor);
+
+    final Query[] queries = { new MatchAllDocsQuery(), new TermQuery(new Term("field", "1")) };
+    final Sort[] sorts = { null, new Sort(new SortField("field2", SortField.Type.STRING)) };
+    final ScoreDoc[] afters = { null, new FieldDoc(0, 0f, new Object[] { new BytesRef("boo!") }) };
+    final int n = Integer.MAX_VALUE;
+
+    for (ScoreDoc after : afters) {
+      for (Query query : queries) {
+        for (Sort sort : sorts) {
+          // Unsorted searches only have to complete; their results are not inspected
+          searcher.search(query, n);
+          searcher.searchAfter(after, query, n);
+          if (sort == null) {
+            continue;
+          }
+
+          // Sorted variants must report at least one hit
+          TopDocs hits = searcher.search(query, n, sort);
+          assertTrue(hits.totalHits.value > 0);
+          hits = searcher.search(query, n, sort, true);
+          assertTrue(hits.totalHits.value > 0);
+          hits = searcher.search(query, n, sort, false);
+          assertTrue(hits.totalHits.value > 0);
+
+          hits = searcher.searchAfter(after, query, n, sort);
+          assertTrue(hits.totalHits.value > 0);
+          hits = searcher.searchAfter(after, query, n, sort, true);
+          assertTrue(hits.totalHits.value > 0);
+          hits = searcher.searchAfter(after, query, n, sort, false);
+          assertTrue(hits.totalHits.value > 0);
+        }
+      }
+    }
+  }
+
+  /**
+   * SliceExecutor for tests that decides at random, task by task, whether a task is run on
+   * the caller thread or offered to the executor
+   */
+  private class RandomBlockingSliceExecutor extends SliceExecutor {
+
+    public RandomBlockingSliceExecutor(Executor executor) {
+      super(executor);
+    }
+
+    @Override
+    public void invokeAll(Collection<? extends Runnable> tasks) {
+      // One fresh random boolean per task decides where that task runs
+      tasks.forEach(task -> processTask(task, random().nextBoolean()));
+    }
+  }
 }