Posted to commits@druid.apache.org by "cryptoe (via GitHub)" <gi...@apache.org> on 2023/03/20 12:35:05 UTC

[GitHub] [druid] cryptoe commented on a diff in pull request #13952: Limit the subquery results by memory usage

cryptoe commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1142025452


##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -53,14 +63,17 @@
 public class InlineDataSource implements DataSource
 {
   private final Iterable<Object[]> rows;
+  private final Frame frame;

Review Comment:
   Let's update the documentation of this class to mention that it can be backed by frames.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -190,15 +211,57 @@ public List<ColumnType> getColumnTypes()
   @JsonProperty("rows")
   public List<Object[]> getRowsAsList()
   {
+    Iterable<Object[]> rows = getRows();
     return rows instanceof List ? ((List<Object[]>) rows) : Lists.newArrayList(rows);
   }
 
+  public Frame getBackingFrame()
+  {
+    return frame;
+  }
+
+  public boolean isBackedByFrame()
+  {
+    return frame != null;
+  }
+
   /**
    * Returns rows as an Iterable.
    */
   @JsonIgnore
   public Iterable<Object[]> getRows()
   {
+    if (isBackedByFrame()) {
+      List<Object[]> frameRows = new ArrayList<>();
+      FrameReader frameReader = FrameReader.create(signature);
+      final Sequence<Cursor> cursorSequence = new FrameStorageAdapter(
+          frame,
+          frameReader,
+          Intervals.ETERNITY
+      ).makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null);
+      cursorSequence.accumulate(
+          null,
+          (accumulated, cursor) -> {
+            final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+            final List<BaseObjectColumnValueSelector> selectors = frameReader.signature()
+                                                                             .getColumnNames()
+                                                                             .stream()
+                                                                             .map(columnSelectorFactory::makeColumnValueSelector)
+                                                                             .collect(Collectors.toList());
+            while (!cursor.isDone()) {
+              Object[] row = new Object[signature.size()];
+              for (int i = 0; i < signature.size(); ++i) {
+                row[i] = selectors.get(i).getObject();
+              }
+              frameRows.add(row);
+              cursor.advance();
+            }
+            return null;
+          }
+      );
+      return frameRows;
+    }
+
     return rows;

Review Comment:
   Nit: Let's put this in the else branch?
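A minimal sketch of the suggested shape. It assumes a simplified stand-in: `FrameRowsSketch` and its `decodeFrame()` helper are hypothetical, and a pre-decoded `List<Object[]>` replaces the real `Frame`/`FrameReader`/cursor logic from the diff, so the only thing illustrated is the if/else structure.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for InlineDataSource: either `rows` or `frameRows` is set.
// `frameRows` substitutes for the real Frame so the example stays self-contained.
class FrameRowsSketch
{
  private final Iterable<Object[]> rows;
  private final List<Object[]> frameRows; // stand-in for the serialized frame

  FrameRowsSketch(Iterable<Object[]> rows, List<Object[]> frameRows)
  {
    this.rows = rows;
    this.frameRows = frameRows;
  }

  boolean isBackedByFrame()
  {
    return frameRows != null;
  }

  Iterable<Object[]> getRows()
  {
    if (isBackedByFrame()) {
      // Frame-backed: materialize rows (the real code walks a cursor over the frame here).
      return decodeFrame();
    } else {
      // Row-backed: return the rows exactly as supplied.
      return rows;
    }
  }

  // Hypothetical helper: copies each row, mimicking reading values out of a frame.
  private List<Object[]> decodeFrame()
  {
    final List<Object[]> out = new ArrayList<>();
    for (Object[] row : frameRows) {
      out.add(Arrays.copyOf(row, row.length));
    }
    return out;
  }
}
```

Since the `if` branch always returns, the `else` is stylistic; it makes the two mutually exclusive backings explicit to the reader.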
   



##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -798,7 +798,36 @@ public void testTimeseriesOnGroupByOnTableErrorTooManyRows()
                                 .withId(DUMMY_QUERY_ID);
 
     expectedException.expect(ResourceLimitExceededException.class);
-    expectedException.expectMessage("Subquery generated results beyond maximum[2]");
+    expectedException.expectMessage("Subquery generated results beyond maximum[2] rows");
+
+    testQuery(query, ImmutableList.of(), ImmutableList.of());
+  }
+
+
+  @Test
+  public void testTimeseriesOnGroupByOnTableErrorTooLarge()

Review Comment:
   I think tests covering the following column types would help us build confidence in the feature:
   1. String columns
   2. Long columns
   3. Double/float columns
   4. Complex columns
   5. Array columns
   6. Nested columns
   
   Some tests to check out:
   * `CalciteQueryTests#testMaxSubqueryRows`
   * `GroupByQueryRunnerTest#testGroupByMaxRowsLimitContextOverride`
   
   Ideally, all tests that have a subquery should be executed using the new code path, but since it's feature-flagged, that might not be a hard requirement.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +632,71 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       );
     }
 
+    if (memoryLimitAccumulator.get() >= memoryLimitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimitToUse
+      );
+    }
+
     final RowSignature signature = toolChest.resultArraySignature(query);
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    final List<Object[]> resultList = toolChest.resultsAsArrays(query, results).toList();
 
-    toolChest.resultsAsArrays(query, results).accumulate(
-        resultList,
-        (acc, in) -> {
-          if (limitAccumulator.getAndIncrement() >= limitToUse) {
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d]",
-                limitToUse
-            );
+    if (limitAccumulator.addAndGet(resultList.size()) >= limitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Subquery generated results beyond maximum[%d] rows",
+          limitToUse
+      );
+    }
+
+    Frame frame = null;
+
+    // Try to serialize the results into a frame only if the memory limit is set on the server or the query
+    if (memoryLimitSet) {

Review Comment:
   We should serialize the results to frames only when memoryLimit is set; otherwise, the old code path should be invoked.
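A minimal sketch of the gating being asked for, under stated assumptions: `SubqueryResultGate`, `Inlined`, and `serialize()` are hypothetical names, a `byte[]` stands in for a serialized `Frame`, `IllegalStateException` stands in for `ResourceLimitExceededException`, and the byte encoding is a toy, not Druid's frame format.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class SubqueryResultGate
{
  // Exactly one of `rows` or `frameBytes` is non-null.
  static final class Inlined
  {
    final List<Object[]> rows;   // old code path
    final byte[] frameBytes;     // stand-in for a serialized Frame

    Inlined(List<Object[]> rows, byte[] frameBytes)
    {
      this.rows = rows;
      this.frameBytes = frameBytes;
    }
  }

  static Inlined inline(
      List<Object[]> results,
      boolean memoryLimitSet,
      long memoryLimitBytes,
      AtomicLong memoryAccumulator
  )
  {
    if (!memoryLimitSet) {
      // Old code path: keep the row list as-is and pay no serialization cost.
      return new Inlined(results, null);
    }
    // New code path: serialize, then charge the serialized size against the byte limit.
    final byte[] bytes = serialize(results);
    if (memoryAccumulator.addAndGet(bytes.length) > memoryLimitBytes) {
      // Stand-in for Druid's ResourceLimitExceededException.
      throw new IllegalStateException("Subquery results exceeded " + memoryLimitBytes + " bytes");
    }
    return new Inlined(null, bytes);
  }

  // Hypothetical stand-in for frame serialization: tab-delimited UTF-8 text, one line per row.
  static byte[] serialize(List<Object[]> rows)
  {
    final StringBuilder sb = new StringBuilder();
    for (Object[] row : rows) {
      for (Object value : row) {
        sb.append(value).append('\t');
      }
      sb.append('\n');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }
}
```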
   



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -300,7 +363,7 @@ public boolean equals(Object o)
       return false;
     }
     InlineDataSource that = (InlineDataSource) o;
-    return rowsEqual(rows, that.rows) &&

Review Comment:
   Maybe compare the frames directly, if possible, to save the cost of initializing various FrameReaders and then deserializing the frames into rows.
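One way this could look, as a sketch under assumptions: `FakeFrame` is a hypothetical stand-in for Druid's `Frame` that exposes its raw bytes and a counted `decode()`, and `InlineSketch` stands in for `InlineDataSource`. Identical bytes are treated as a sufficient (not necessary) signal of equality, so when the bytes differ, or only one side is frame-backed, the sketch falls back to decoding and comparing rows.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Druid's Frame: raw bytes plus a way to decode rows.
final class FakeFrame
{
  final byte[] bytes;
  private final List<Object[]> rows;
  int decodeCalls = 0; // counts how often the expensive decode path runs

  FakeFrame(List<Object[]> rows)
  {
    this.rows = rows;
    this.bytes = Arrays.deepToString(rows.toArray()).getBytes(StandardCharsets.UTF_8);
  }

  List<Object[]> decode()
  {
    decodeCalls++;
    return rows;
  }
}

// Hypothetical stand-in for InlineDataSource; exactly one of rows/frame is set.
final class InlineSketch
{
  final List<Object[]> rows;
  final FakeFrame frame;

  InlineSketch(List<Object[]> rows, FakeFrame frame)
  {
    this.rows = rows;
    this.frame = frame;
  }

  private List<Object[]> materialize()
  {
    return frame != null ? frame.decode() : rows;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (!(o instanceof InlineSketch)) {
      return false;
    }
    final InlineSketch that = (InlineSketch) o;
    // Fast path: identical frame bytes imply identical rows, so skip decoding entirely.
    if (frame != null && that.frame != null && Arrays.equals(frame.bytes, that.frame.bytes)) {
      return true;
    }
    // Slow path: decode as needed and compare row by row.
    return Arrays.deepEquals(materialize().toArray(), that.materialize().toArray());
  }

  @Override
  public int hashCode()
  {
    return Arrays.deepHashCode(materialize().toArray());
  }
}
```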



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -53,14 +63,17 @@
 public class InlineDataSource implements DataSource
 {
   private final Iterable<Object[]> rows;
+  private final Frame frame;
   private final RowSignature signature;
 
   private InlineDataSource(
       final Iterable<Object[]> rows,
+      final Frame frame,
       final RowSignature signature
   )
   {
-    this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull");
+    this.rows = rows;

Review Comment:
   The inline datasource can be backed either by a frame or by rows, so let's error out if both are null.
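A minimal sketch of the check, assuming a hypothetical `InlineBackingSketch` in place of the real private constructor and a plain `Object` in place of Druid's `Frame`. It uses only the JDK; since the real class already uses Guava's `Preconditions` (see the removed `checkNotNull` line), `Preconditions.checkArgument(rows != null || frame != null, ...)` would be the natural equivalent there.

```java
// Hypothetical stand-in for InlineDataSource's private constructor.
final class InlineBackingSketch
{
  private final Iterable<Object[]> rows;
  private final Object frame; // stand-in for Druid's Frame

  InlineBackingSketch(Iterable<Object[]> rows, Object frame)
  {
    // The data source must be backed by something: rows, a frame, or both.
    if (rows == null && frame == null) {
      throw new IllegalArgumentException("InlineDataSource must be backed by either rows or a frame");
    }
    this.rows = rows;
    this.frame = frame;
  }

  boolean isBackedByFrame()
  {
    return frame != null;
  }

  Iterable<Object[]> rowsOrNull()
  {
    return rows;
  }
}
```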



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -565,10 +616,14 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       final Sequence<T> results,
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,

Review Comment:
   I guess we should never be using both limits at once. Can we remove the additional 3 params, pass a single extra param called type instead, and reuse the same limit variables?
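A sketch of what "one limit plus a type" could look like. The review only names the param "type", so `SubqueryLimitType`, `SubqueryLimiter`, and `charge()` are hypothetical; `IllegalStateException` stands in for `ResourceLimitExceededException`, and this sketch throws only when the running total strictly exceeds the limit.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical: which resource the single limit is measured in.
enum SubqueryLimitType
{
  ROW_LIMIT,
  MEMORY_LIMIT
}

final class SubqueryLimiter
{
  private final SubqueryLimitType type;
  private final long limit;
  private final AtomicLong accumulator = new AtomicLong(); // one accumulator, shared by all subqueries

  SubqueryLimiter(SubqueryLimitType type, long limit)
  {
    this.type = type;
    this.limit = limit;
  }

  SubqueryLimitType type()
  {
    return type;
  }

  // `amount` is in the unit named by `type`: rows for ROW_LIMIT, bytes for MEMORY_LIMIT.
  void charge(long amount)
  {
    if (accumulator.addAndGet(amount) > limit) {
      final String unit = type == SubqueryLimitType.ROW_LIMIT ? "rows" : "bytes";
      throw new IllegalStateException(String.format("Subquery results exceeded maximum[%d] %s", limit, unit));
    }
  }

  long used()
  {
    return accumulator.get();
  }
}
```

With a single accumulator, the caller checks `type()` once to decide whether to serialize to frames (and charge bytes) or keep the row list (and charge rows).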



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@druid.apache.org