You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "paul-rogers (via GitHub)" <gi...@apache.org> on 2023/03/20 19:34:17 UTC

[GitHub] [druid] paul-rogers commented on a diff in pull request #13952: Limit the subquery results by memory usage

paul-rogers commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1142587809


##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -300,7 +363,7 @@ public boolean equals(Object o)
       return false;
     }
     InlineDataSource that = (InlineDataSource) o;
-    return rowsEqual(rows, that.rows) &&
+    return rowsEqual(getRowsAsList(), that.getRowsAsList()) &&

Review Comment:
   Does it even make semantic sense to compare two input sources for equality? Are we adding this become some static check told us we need it, but not because we actually use it?



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +632,71 @@
       );
     }
 
+    if (memoryLimitAccumulator.get() >= memoryLimitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimitToUse
+      );
+    }
+
     final RowSignature signature = toolChest.resultArraySignature(query);
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    final List<Object[]> resultList = toolChest.resultsAsArrays(query, results).toList();
 
-    toolChest.resultsAsArrays(query, results).accumulate(
-        resultList,
-        (acc, in) -> {
-          if (limitAccumulator.getAndIncrement() >= limitToUse) {
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d]",
-                limitToUse
-            );
+    if (limitAccumulator.addAndGet(resultList.size()) >= limitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Subquery generated results beyond maximum[%d] rows",
+          limitToUse
+      );
+    }
+
+    Frame frame = null;
+
+    // Try to serialize the results into a frame only if the memory limit is set on the server or the query
+    if (memoryLimitSet) {
+      try {
+        FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+            FrameType.ROW_BASED,
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            signature,
+            new ArrayList<>()
+        );
+
+        final Cursor cursor = new InlineResultsCursor(resultList, signature);
+
+        try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
+          while (!cursor.isDone()) {
+            if (!frameWriter.addSelection()) {
+              throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+            }
+
+            cursor.advance();
           }
-          acc.add(in);
-          return acc;
+
+          frame = Frame.wrap(frameWriter.toByteArray());
         }
-    );
 
-    return InlineDataSource.fromIterable(resultList, signature);
+
+        if (memoryLimitAccumulator.addAndGet(frame.numBytes()) >= memoryLimitToUse) {
+          throw ResourceLimitExceededException.withMessage(
+              "Subquery generated results beyond maximum[%d] bytes",
+              memoryLimit
+          );
+        }
+      }
+      catch (ResourceLimitExceededException rlee) {
+        throw rlee;
+      }
+      catch (Exception e) {
+        log.info("Unable to write the subquery results to a frame. Results won't be accounted for in the memory"
+                 + "calculation");

Review Comment:
   Our error rules state that you must interpolate something, such as the reason why the subquery can't be written to a frame. And, yes, that code scanning item above is real.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -565,10 +616,14 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       final Sequence<T> results,
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,

Review Comment:
   If we have two limits, we might want to give them names so we know which limit that `limitAccumulator` is accumulating: row count or memory bytes?



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +632,71 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       );
     }
 
+    if (memoryLimitAccumulator.get() >= memoryLimitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimitToUse
+      );
+    }
+
     final RowSignature signature = toolChest.resultArraySignature(query);
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    final List<Object[]> resultList = toolChest.resultsAsArrays(query, results).toList();

Review Comment:
   If we are under memory pressure, is it wise to materialize the entire result set? Can the results be processed incrementally in a streaming manner?



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +632,71 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       );
     }
 
+    if (memoryLimitAccumulator.get() >= memoryLimitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimitToUse
+      );
+    }
+
     final RowSignature signature = toolChest.resultArraySignature(query);
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    final List<Object[]> resultList = toolChest.resultsAsArrays(query, results).toList();
 
-    toolChest.resultsAsArrays(query, results).accumulate(
-        resultList,
-        (acc, in) -> {
-          if (limitAccumulator.getAndIncrement() >= limitToUse) {
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d]",
-                limitToUse
-            );
+    if (limitAccumulator.addAndGet(resultList.size()) >= limitToUse) {
+      throw ResourceLimitExceededException.withMessage(
+          "Subquery generated results beyond maximum[%d] rows",
+          limitToUse
+      );
+    }
+
+    Frame frame = null;
+
+    // Try to serialize the results into a frame only if the memory limit is set on the server or the query
+    if (memoryLimitSet) {
+      try {
+        FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+            FrameType.ROW_BASED,
+            new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+            signature,
+            new ArrayList<>()
+        );
+
+        final Cursor cursor = new InlineResultsCursor(resultList, signature);

Review Comment:
   All of this seems too complex to put inside the segment walker. For one thing, it is hard to test if it is an implementation detail. Perhaps pull out this logic into a separate class that can be unit tested extensively. For example, we'd want tests for hitting each of the limits, for handling variable-width columns, etc.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -190,15 +211,57 @@ public List<ColumnType> getColumnTypes()
   @JsonProperty("rows")
   public List<Object[]> getRowsAsList()
   {
+    Iterable<Object[]> rows = getRows();
     return rows instanceof List ? ((List<Object[]>) rows) : Lists.newArrayList(rows);
   }
 
+  public Frame getBackingFrame()
+  {
+    return frame;
+  }
+
+  public boolean isBackedByFrame()
+  {
+    return frame != null;
+  }
+
   /**
    * Returns rows as an Iterable.
    */
   @JsonIgnore
   public Iterable<Object[]> getRows()
   {
+    if (isBackedByFrame()) {

Review Comment:
   This is the kind of unnecessary complexity that comes when we have one class do two things, so would be cool to split this into three: base class, array implementation, frame implementation.



##########
processing/src/main/java/org/apache/druid/query/InlineDataSource.java:
##########
@@ -53,14 +63,17 @@
 public class InlineDataSource implements DataSource
 {
   private final Iterable<Object[]> rows;
+  private final Frame frame;
   private final RowSignature signature;
 
   private InlineDataSource(
       final Iterable<Object[]> rows,
+      final Frame frame,
       final RowSignature signature
   )
   {
-    this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull");
+    this.rows = rows;

Review Comment:
   If the code can use either rows or frames, but not both, then a cleaner design is to have two distinct classes. That avoids a bunch if if/then statements and the need for a reader to think through both cases at the same time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org