You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/29 12:30:15 UTC

[druid] branch master updated: OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dce3ca4d5 OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
8dce3ca4d5 is described below

commit 8dce3ca4d594ddc498c1372d4d6cef3c6c3ed056
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Wed Mar 29 18:00:00 2023 +0530

    OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
    
    
    While using intermediateSuperSorterStorageMaxLocalBytes, the super sorter was retaining references to the memory allocator.
    
    The fix clears the current outputChannel (by converting it to read-only) when close() is called on the ComposingWritableFrameChannel.
---
 .../channel/ComposingWritableFrameChannel.java      | 18 ++++++++++++------
 .../channel/ComposingWritableFrameChannelTest.java  | 21 +++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index c576c23696..ed0f56c408 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -90,12 +90,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
 
       // We are converting the corresponding channel to read only after exhausting it because that channel won't be used
       // for writes anymore
-      if (outputChannelSuppliers != null) {
-        outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
-      }
-      if (partitionedOutputChannelSuppliers != null) {
-        partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
-      }
+      convertChannelSuppliersToReadOnly(currentIndex);
 
       currentIndex++;
       if (currentIndex >= writableChannelSuppliers.size()) {
@@ -105,6 +100,16 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
     }
   }
 
+  private void convertChannelSuppliersToReadOnly(int index)
+  {
+    if (outputChannelSuppliers != null) {
+      outputChannelSuppliers.get(index).get().convertToReadOnly();
+    }
+    if (partitionedOutputChannelSuppliers != null) {
+      partitionedOutputChannelSuppliers.get(index).get().convertToReadOnly();
+    }
+  }
+
   @Override
   public void fail(@Nullable Throwable cause) throws IOException
   {
@@ -118,6 +123,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
   {
     if (currentIndex < writableChannelSuppliers.size()) {
       writableChannelSuppliers.get(currentIndex).get().close();
+      convertChannelSuppliersToReadOnly(currentIndex);
       currentIndex = writableChannelSuppliers.size();
     }
   }
diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
index c2968b3579..eee8ce5e62 100644
--- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -27,8 +27,11 @@ import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
 import org.apache.druid.frame.processor.OutputChannel;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.mockito.Mockito;
 
 import javax.annotation.Nullable;
@@ -89,8 +92,26 @@ public class ComposingWritableFrameChannelTest
     Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
     Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
 
+
     // Test if the older channel has been converted to read only
     Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+    composingWritableFrameChannel.close();
+
+    Exception ise1 = Assert.assertThrows(IllegalStateException.class, () -> outputChannel1.getFrameMemoryAllocator());
+    MatcherAssert.assertThat(
+        ise1,
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
+    );
+
+
+    Exception ise2 = Assert.assertThrows(IllegalStateException.class, () -> outputChannel2.getFrameMemoryAllocator());
+    MatcherAssert.assertThat(
+        ise2,
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
+    );
+
   }
 
   static class LimitedWritableFrameChannel implements WritableFrameChannel


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org