You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/29 12:30:15 UTC
[druid] branch master updated: OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8dce3ca4d5 OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
8dce3ca4d5 is described below
commit 8dce3ca4d594ddc498c1372d4d6cef3c6c3ed056
Author: Karan Kumar <ka...@gmail.com>
AuthorDate: Wed Mar 29 18:00:00 2023 +0530
OOM fix for running MSQ jobs with `intermediateSuperSorterStorageMaxLocalBytes` set (#13974)
While using intermediateSuperSorterStorageMaxLocalBytes the super sorter was retaining references of the memory allocator.
The fix clears the current outputChannel when close() is called on the ComposingWritableFrameChannel.java
---
.../channel/ComposingWritableFrameChannel.java | 18 ++++++++++++------
.../channel/ComposingWritableFrameChannelTest.java | 21 +++++++++++++++++++++
2 files changed, 33 insertions(+), 6 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index c576c23696..ed0f56c408 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -90,12 +90,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
// We are converting the corresponding channel to read only after exhausting it because that channel won't be used
// for writes anymore
- if (outputChannelSuppliers != null) {
- outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
- }
- if (partitionedOutputChannelSuppliers != null) {
- partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
- }
+ convertChannelSuppliersToReadOnly(currentIndex);
currentIndex++;
if (currentIndex >= writableChannelSuppliers.size()) {
@@ -105,6 +100,16 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
}
}
+ private void convertChannelSuppliersToReadOnly(int index)
+ {
+ if (outputChannelSuppliers != null) {
+ outputChannelSuppliers.get(index).get().convertToReadOnly();
+ }
+ if (partitionedOutputChannelSuppliers != null) {
+ partitionedOutputChannelSuppliers.get(index).get().convertToReadOnly();
+ }
+ }
+
@Override
public void fail(@Nullable Throwable cause) throws IOException
{
@@ -118,6 +123,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
{
if (currentIndex < writableChannelSuppliers.size()) {
writableChannelSuppliers.get(currentIndex).get().close();
+ convertChannelSuppliersToReadOnly(currentIndex);
currentIndex = writableChannelSuppliers.size();
}
}
diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
index c2968b3579..eee8ce5e62 100644
--- a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -27,8 +27,11 @@ import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.processor.OutputChannel;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.query.ResourceLimitExceededException;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.mockito.Mockito;
import javax.annotation.Nullable;
@@ -89,8 +92,26 @@ public class ComposingWritableFrameChannelTest
Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
+
// Test if the older channel has been converted to read only
Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+ composingWritableFrameChannel.close();
+
+ Exception ise1 = Assert.assertThrows(IllegalStateException.class, () -> outputChannel1.getFrameMemoryAllocator());
+ MatcherAssert.assertThat(
+ ise1,
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+ "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
+ );
+
+
+ Exception ise2 = Assert.assertThrows(IllegalStateException.class, () -> outputChannel2.getFrameMemoryAllocator());
+ MatcherAssert.assertThat(
+ ise2,
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+ "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
+ );
+
}
static class LimitedWritableFrameChannel implements WritableFrameChannel
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org