You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "gianm (via GitHub)" <gi...@apache.org> on 2023/05/03 08:44:09 UTC

[GitHub] [druid] gianm commented on a diff in pull request #13952: Limit the subquery results by memory usage

gianm commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1182869188


##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +627,80 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       );
     }
 
-    final RowSignature signature = toolChest.resultArraySignature(query);
+    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
+      throw ResourceLimitExceededException.withMessage(
+          "Cannot issue subquery, maximum subquery result bytes[%d] reached",
+          memoryLimit
+      );
+    }
 
-    final ArrayList<Object[]> resultList = new ArrayList<>();
+    DataSource dataSource;
+    // Try to serialize the results into a frame only if the memory limit is set on the server or the query
+    if (memoryLimitSet) {
+      try {
+        Optional<Sequence<FrameSignaturePair>> framesOptional = toolChest.resultsAsFrames(
+            query,
+            results,
+            memoryLimit - memoryLimitAccumulator.get()
+        );
 
-    toolChest.resultsAsArrays(query, results).accumulate(
-        resultList,
-        (acc, in) -> {
-          if (limitAccumulator.getAndIncrement() >= limitToUse) {
-            throw ResourceLimitExceededException.withMessage(
-                "Subquery generated results beyond maximum[%d]",
-                limitToUse
-            );
-          }
-          acc.add(in);
-          return acc;
+        if (!framesOptional.isPresent()) {
+          throw new ISE("The memory of the subqueries cannot be estimated correctly.");
         }
-    );
 
-    return InlineDataSource.fromIterable(resultList, signature);
+        Sequence<FrameSignaturePair> frames = framesOptional.get();
+        List<FrameSignaturePair> frameSignaturePairs = new ArrayList<>();
+        frames.forEach(
+            frame -> {
+              if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) {
+                throw ResourceLimitExceededException.withMessage(
+                    "Subquery generated results beyond maximum[%d] bytes",
+                    memoryLimit
+                );
+
+              }
+              if (limitAccumulator.addAndGet(frame.getFrame().numRows()) >= limitToUse) {
+                throw ResourceLimitExceededException.withMessage(
+                    "Subquery generated results beyond maximum[%d] rows",
+                    limitToUse
+                );
+              }
+              frameSignaturePairs.add(frame);
+            }
+        );
+        dataSource = new FramesBackedInlineDataSource(frameSignaturePairs, toolChest.resultArraySignature(query));
+      }
+      catch (ResourceLimitExceededException rlee) {
+        throw rlee;
+      }
+      catch (Exception e) {
+        log.info(

Review Comment:
   In this `catch` case, the whole query fails, right? Are there known situations where it would happen?
   
   IMO we should be doing one of two things:
   
   1. If we believe this can never happen, we don't need a `catch` block at all. Just let the exception out and fail the query. Let's then also make sure we have enough testing to be confident it doesn't happen.
   2. If there's known situations where this would happen, we should detect them and avoid going down the frame path at all. If that's not feasible, then we should put in some logic here to recover the query rather than failing it.
   
   What're your thoughts?
   
   Either way, there's no reason to log anything, let's not clutter the logs. The standard exception handling is enough if the query is going to fail. And if it's not going to fail, we should recover quietly.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -560,15 +607,18 @@ private DataSource insertSubqueryIds(
    *                         If zero, this method will throw an error immediately.
    * @throws ResourceLimitExceededException if the limit is exceeded
    */
-  private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSource(
+  private static <T, QueryType extends Query<T>> DataSource toInlineDataSource(
       final QueryType query,
       final Sequence<T> results,
       final QueryToolChest<T, QueryType> toolChest,
       final AtomicInteger limitAccumulator,
-      final int limit
+      final AtomicLong memoryLimitAccumulator,
+      final int limit,
+      long memoryLimit
   )
   {
     final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;

Review Comment:
   Rename this `rowLimitToUse`, for clarity.



##########
processing/src/main/java/org/apache/druid/frame/write/FrameWriters.java:
##########
@@ -53,24 +54,38 @@ private FrameWriters()
    * @param signature        signature of the frames
    * @param sortColumns      sort columns for the frames. If nonempty, {@link FrameSort#sort} is used to sort the
    *                         resulting frames.
+   * @param allowNullColumnTypes to allow null ColumnType in the signature. This should only be enabled when the user
+   *                             knows that the column objects exist as native Java POJOs (LinkedList, Maps etc), which
+   *                             can be serded using the Druid's nested columns
    */
   public static FrameWriterFactory makeFrameWriterFactory(
       final FrameType frameType,
       final MemoryAllocatorFactory allocatorFactory,
-      final RowSignature signature,
-      final List<KeyColumn> sortColumns
+      @Nonnull final RowSignature signature,

Review Comment:
   Use `@Nullable` where appropriate, but avoid `@Nonnull` on parameters in general, since it creates an implication that unannotated things _can_ be null. But, they mostly can't, so it's misleading.
   
   If you're looking for benefit for static analyzers, better to use `@EverythingIsNonnullByDefault` at package level through `package-info.java`. We have that on various packages already and could add it to more as desired. That's like annotating every un-annotated parameter with `@Nonnull`, without the clutter.



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -330,4 +331,32 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType>
   {
     throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
   }
+
+  /**
+   * Converts a sequence of this query's ResultType into a sequence of {@link FrameSignaturePair}. The array signature
+   * is the one give by {@link #resultArraySignature(Query)}. If the toolchest doesn't support this method, then it can
+   * return an empty optional. It is the duty of the callees to throw an appropriate exception in that case or use an
+   * alternative fallback approach
+   *
+   * Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the behaviour of the rows represented by the
+   * frame sequence is identical.
+   *
+   * Each Frame has a separate {@link RowSignature} because for some query types like the Scan query, every

Review Comment:
   This seems to contradict the statement earlier that "the array signature is the one given by `resultArraySignature`". Sounds like it could actually be different. In that case please update the earlier text accordingly.



##########
server/src/main/java/org/apache/druid/segment/FrameBasedInlineSegmentWrangler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameSegment;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.stream.Collectors;
+
+public class FrameBasedInlineSegmentWrangler implements SegmentWrangler
+{
+
+  private static final String SEGMENT_ID = "inline";
+
+  @Override
+  public Iterable<Segment> getSegmentsForIntervals(
+      DataSource dataSource,
+      Iterable<Interval> intervals
+  )
+  {
+    final FramesBackedInlineDataSource framesBackedInlineDataSource = (FramesBackedInlineDataSource) dataSource;
+
+    return framesBackedInlineDataSource.getFrames().stream().map(

Review Comment:
   Better to avoid materializing the list: return an `Iterable` based on `.iterator()` instead of `.collect(Collectors.toList())`.



##########
processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java:
##########
@@ -705,6 +718,39 @@ public Sequence<Object[]> resultsAsArrays(final GroupByQuery query, final Sequen
     return resultSequence.map(ResultRow::getArray);
   }
 
+  /**
+   * This returns a single frame containing the results of the group by query.
+   */
+  @Override
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
+      GroupByQuery query,
+      Sequence<ResultRow> resultSequence,
+      @Nullable Long memoryLimitBytes
+  )
+  {
+    RowSignature rowSignature = resultArraySignature(query);
+
+    FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+        FrameType.ROW_BASED,
+        new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+        rowSignature,
+        new ArrayList<>(),
+        true
+    );
+
+
+    Cursor cursor = IterableRowsCursorHelper.getCursorFromSequence(
+        resultSequence.map(ResultRow::getArray),
+        rowSignature
+    );
+
+    Frame frame = FrameCursorUtils.cursorToFrame(cursor, frameWriterFactory, memoryLimitBytes);

Review Comment:
   With the changes to `resultsAsFrames`, this would start returning a sequence of multiple smaller frames, and wouldn't take `memoryLimitBytes`.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -179,15 +190,19 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<In
     }
 
     // Now that we know the structure is workable, actually do the inlining (if necessary).
-    newQuery = newQuery.withDataSource(
-        inlineIfNecessary(
-            freeTradeDataSource,
-            toolChest,
-            new AtomicInteger(),
-            maxSubqueryRows,
-            false
-        )
+    AtomicLong memoryLimitAcc = new AtomicLong(0);
+    DataSource maybeInlinedDataSource = inlineIfNecessary(
+        freeTradeDataSource,
+        toolChest,
+        new AtomicInteger(),
+        memoryLimitAcc,
+        maxSubqueryRows,
+        maxSubqueryMemory,
+        false
     );
+    newQuery = newQuery.withDataSource(maybeInlinedDataSource);
+
+    log.info("Memory used by subqueries of query [%s] is [%d]", query, memoryLimitAcc.get());

Review Comment:
   We shouldn't have any `INFO` level logs that happen on a per-query basis. It generates too many log messages. Please reduce this to `DEBUG` level.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -313,7 +328,9 @@ private DataSource inlineIfNecessary(
       final DataSource dataSource,
       @Nullable final QueryToolChest toolChestIfOutermost,
       final AtomicInteger subqueryRowLimitAccumulator,
+      final AtomicLong subqueryRowMemoryLimitAccumulator,

Review Comment:
   `subqueryMemoryLimitAccumulator` is a better name.



##########
server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java:
##########
@@ -577,25 +627,80 @@ private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSour
       );
     }
 
-    final RowSignature signature = toolChest.resultArraySignature(query);
+    if (memoryLimitSet && memoryLimitAccumulator.get() >= memoryLimit) {
+      throw ResourceLimitExceededException.withMessage(

Review Comment:
   Update the other message to refer to the row limit, for clarity.



##########
server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java:
##########
@@ -348,7 +350,7 @@ public void testTimeseriesOnGroupByOnTable()
 
     testQuery(
         query,
-        ImmutableList.of(
+        new ArrayList<>(ImmutableList.of(

Review Comment:
   Is there a reason this needs to be a mutable array?



##########
processing/src/main/java/org/apache/druid/query/QueryToolChest.java:
##########
@@ -330,4 +331,32 @@ public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType>
   {
     throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
   }
+
+  /**
+   * Converts a sequence of this query's ResultType into a sequence of {@link FrameSignaturePair}. The array signature
+   * is the one give by {@link #resultArraySignature(Query)}. If the toolchest doesn't support this method, then it can
+   * return an empty optional. It is the duty of the callees to throw an appropriate exception in that case or use an
+   * alternative fallback approach
+   *
+   * Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the behaviour of the rows represented by the
+   * frame sequence is identical.
+   *
+   * Each Frame has a separate {@link RowSignature} because for some query types like the Scan query, every
+   * column in the final result might not be present in the individual ResultType (and subsequently Frame). Therefore,
+   * this is done to preserve the space by not populating the column in that particular Frame and omitting it from its
+   * signature
+   * 
+   * @param query Query being executed by the toolchest. Used to determine the rowSignature of the Frames
+   * @param resultSequence results of the form returned by {@link #mergeResults(QueryRunner)}
+   * @param memoryLimitBytes Limit the memory results. Throws {@link ResourceLimitExceededException} if the result exceed
+   *                    the memoryLimitBytes
+   */
+  public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(

Review Comment:
   Rather than accepting `memoryLimitBytes` and returning a jumbo frame, it'd be better to return a sequence of moderately-sized frames and have the caller decide when they've had too many frames. This makes the method simpler, easier to test/understand, and more flexible for potential future use cases. In particular I suggest the following changes:
   
   - Remove the `memoryLimitBytes` parameter.
   - Add a `MemoryAllocatorFactory` parameter so the caller can control the size of each frame.



##########
server/src/main/java/org/apache/druid/segment/FrameBasedInlineSegmentWrangler.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameSegment;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.timeline.SegmentId;
+import org.joda.time.Interval;
+
+import java.util.stream.Collectors;
+
+public class FrameBasedInlineSegmentWrangler implements SegmentWrangler

Review Comment:
   Some of the classes are `FrameBased` and some are `FramesBacked`. Let's keep it consistent and use `FrameBased` for all of them.



##########
processing/src/main/java/org/apache/druid/query/QueryContexts.java:
##########
@@ -54,6 +54,7 @@
   public static final String VECTORIZE_VIRTUAL_COLUMNS_KEY = "vectorizeVirtualColumns";
   public static final String VECTOR_SIZE_KEY = "vectorSize";
   public static final String MAX_SUBQUERY_ROWS_KEY = "maxSubqueryRows";
+  public static final String MAX_SUBQUERY_MEMORY_BYTES_KEY = "maxSubqueryMemoryBytes";

Review Comment:
   `maxSubqueryBytes` is better, IMO, since it's just as clear and a bit more succinct.



##########
processing/src/main/java/org/apache/druid/frame/write/FrameWriters.java:
##########
@@ -53,24 +54,38 @@ private FrameWriters()
    * @param signature        signature of the frames
    * @param sortColumns      sort columns for the frames. If nonempty, {@link FrameSort#sort} is used to sort the
    *                         resulting frames.
+   * @param allowNullColumnTypes to allow null ColumnType in the signature. This should only be enabled when the user
+   *                             knows that the column objects exist as native Java POJOs (LinkedList, Maps etc), which
+   *                             can be serded using the Druid's nested columns
    */
   public static FrameWriterFactory makeFrameWriterFactory(

Review Comment:
   IMO adding `allowNullColumnTypes` as a parameter here isn't the right approach. Better for the caller to replace all the unknown types (`null`) in `signature` with `COMPLEX<json>`, if that's what's desired. It keeps the frame writer code simpler, easier to test/understand, etc.



##########
processing/src/main/java/org/apache/druid/segment/join/FramesBackedInlineJoinableFactory.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join;
+
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FramesBackedInlineDataSource;
+import org.apache.druid.query.InlineDataSource;
+
+import java.util.Optional;
+
+/**
+ * Creates a joinable from the {@link FramesBackedInlineDataSource}. This materializes the datasource to an
+ * {@link InlineDataSource}, before creating the joinable on it, which carries the overhead of this conversion.
+ */
+public class FramesBackedInlineJoinableFactory implements JoinableFactory
+{
+  private final InlineJoinableFactory INLINE_JOINABLE_FACTORY = new InlineJoinableFactory();
+
+
+  @Override
+  public boolean isDirectlyJoinable(DataSource dataSource)
+  {
+    return dataSource instanceof FramesBackedInlineDataSource;
+  }
+
+  @Override
+  public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
+  {
+    FramesBackedInlineDataSource framesBackedInlineDataSource = (FramesBackedInlineDataSource) dataSource;
+    InlineDataSource inlineDataSource = framesBackedInlineDataSource.toInlineDataSource();

Review Comment:
   Can we create a Joinable directly on the frames? Materialization to inline is going to be expensive and likely 2–3x memory usage as well. I don't think that is going to be viable.



##########
processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java:
##########
@@ -52,6 +54,8 @@ public DruidDefaultSerializersModule()
 
     JodaStuff.register(this);
 
+    addSerializer(FramesBackedInlineDataSource.class, new FramesBackedInlineDataSourceSerializer());

Review Comment:
   I would think we don't need to serialize this, as it should exist in-memory only. So I'm also wondering where this was needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org