You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/03/18 01:01:33 UTC

[incubator-pinot] 01/01: Allow early termination for selection only queries

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch early_termination_for_select_only
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c98b96efcbcbc70fe11502291f934b7ca85afd11
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Mar 17 18:01:09 2020 -0700

    Allow early termination for selection only queries
---
 .../pinot/core/operator/CombineOperator.java       | 17 ++++++
 .../pinot/queries/BaseSingleValueQueriesTest.java  | 11 +++-
 .../queries/SelectionOnlyEarlyTerminationTest.java | 71 ++++++++++++++++++++++
 3 files changed, 97 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
index 14b4c49..64ed6ab 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineOperator.java
@@ -101,6 +101,9 @@ public class CombineOperator extends BaseOperator<IntermediateResultsBlock> {
 
             IntermediateResultsBlock mergedBlock = (IntermediateResultsBlock) _operators.get(index).nextBlock();
             for (int i = index + numThreads; i < numOperators; i += numThreads) {
+              if (allowEarlyTermination(_brokerRequest, mergedBlock)) {
+                break;
+              }
               IntermediateResultsBlock blockToMerge = (IntermediateResultsBlock) _operators.get(i).nextBlock();
               try {
                 CombineService.mergeTwoBlocks(_brokerRequest, mergedBlock, blockToMerge);
@@ -134,6 +137,9 @@ public class CombineOperator extends BaseOperator<IntermediateResultsBlock> {
             }
             int numMergedBlocks = 1;
             while (numMergedBlocks < numThreads) {
+              if (allowEarlyTermination(_brokerRequest, mergedBlock)) {
+                break;
+              }
               IntermediateResultsBlock blockToMerge =
                   blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
               if (blockToMerge == null) {
@@ -196,6 +202,17 @@ public class CombineOperator extends BaseOperator<IntermediateResultsBlock> {
     return mergedBlock;
   }
 
+  // Returns true when the merged IntermediateResultsBlock already satisfies the query, so the caller can stop merging further blocks.
+  private boolean allowEarlyTermination(BrokerRequest brokerRequest, IntermediateResultsBlock mergedBlock) {
+    // Selection-only query (selections set, no order-by): once the merged selection result holds at least getSelections().getSize() records, the remaining segments need not be scanned.
+    if ((brokerRequest.getSelections() != null) && (brokerRequest.getOrderBy() == null)) {
+      if ((mergedBlock.getSelectionResult() != null) && (mergedBlock.getSelectionResult().size() >= brokerRequest.getSelections().getSize())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Override
   public String getOperatorName() {
     return OPERATOR_NAME;
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index d54fea0..c17b771 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.queries;
 
 import java.io.File;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -121,8 +122,14 @@ public abstract class BaseSingleValueQueriesTest extends BaseQueriesTest {
       throws Exception {
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    _segmentDataManagers = Arrays
-        .asList(new ImmutableSegmentDataManager(immutableSegment), new ImmutableSegmentDataManager(immutableSegment));
+    _segmentDataManagers = new ArrayList<>();
+    for (int i = 0; i< getNumSegmentDataManagers(); i++) {
+      _segmentDataManagers.add(new ImmutableSegmentDataManager(immutableSegment));
+    }
+  }
+
+  protected int getNumSegmentDataManagers() {
+    return 2; // Default number of segment data managers built during setup; subclasses may override to use more.
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
new file mode 100644
index 0000000..96c7617
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.IOException;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests early termination for selection-only queries, and that selection ORDER BY queries still hit all segments.
+ */
+public class SelectionOnlyEarlyTerminationTest extends BaseSingleValueQueriesTest {
+  private int numSegmentDataManagers = Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)) * 2; // always even, between 2 and 20
+
+  protected int getNumSegmentDataManagers() {
+    return numSegmentDataManagers;
+  }
+
+  /**
+   * With early termination, a selection-only query is scheduled with {@code numSegmentDataManagers} threads,
+   * so the total number of segments processed is the same as the number of threads.
+   */
+  @Test
+  public void testSelectOnlyQuery() {
+    String query = "SELECT column1, column6 FROM testTable";
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query); // PQL: expects selection results, no result table
+    Assert.assertNotNull(brokerResponse.getSelectionResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+    Assert.assertTrue(brokerResponse.getNumSegmentsMatched() == numSegmentDataManagers);
+    Assert.assertTrue(brokerResponse.getNumSegmentsProcessed() == numSegmentDataManagers);
+
+    brokerResponse = getBrokerResponseForSqlQuery(query); // SQL: expects a result table, no selection results
+    Assert.assertNull(brokerResponse.getSelectionResults());
+    Assert.assertNotNull(brokerResponse.getResultTable());
+    Assert.assertTrue(brokerResponse.getNumSegmentsMatched() == numSegmentDataManagers);
+    Assert.assertTrue(brokerResponse.getNumSegmentsProcessed() == numSegmentDataManagers);
+  }
+
+  /**
+   * Without early termination, a selection ORDER BY query should hit all segments.
+   */
+  @Test
+  public void testSelectWithOrderByQuery() {
+    int numSegments = numSegmentDataManagers * 2; // NOTE(review): 2x presumably because the base test runs the segment list twice; confirm
+    String query = "SELECT column11, column1 FROM testTable ORDER BY column11";
+
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    Assert.assertNotNull(brokerResponse.getSelectionResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+    Assert.assertTrue(brokerResponse.getNumSegmentsMatched() == numSegments);
+    Assert.assertTrue(brokerResponse.getNumSegmentsProcessed() == numSegments);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org