You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2023/03/10 01:28:07 UTC

[druid] branch master updated: Add a readOnly() method for PartitionedOutputChannel (#13755)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b0b3a9b2c Add a readOnly() method for PartitionedOutputChannel (#13755)
5b0b3a9b2c is described below

commit 5b0b3a9b2c08fd988a1967d760eaf9b2b819b1b6
Author: Laksh Singla <la...@gmail.com>
AuthorDate: Fri Mar 10 06:58:00 2023 +0530

    Add a readOnly() method for PartitionedOutputChannel (#13755)
    
    With SuperSorter using PartitionedOutputChannels for sorting, it might OOM on inputs of reasonable size because each channel holds both the writable frame channel and the frame allocator, neither of which is required once the output channel has been written to.
    This change adds a readOnly() method to the output channel that returns a copy containing only the readable channel. As a result, the unnecessary references to the writable channel and the memory allocator are dropped once the output channel has been written to, preventing the OOM.
---
 .../channel/ComposingWritableFrameChannel.java     |  54 ++++++--
 .../processor/ComposingOutputChannelFactory.java   |  16 ++-
 .../druid/frame/processor/OutputChannel.java       |  38 ++++--
 .../frame/processor/PartitionedOutputChannel.java  |  48 +++++--
 .../apache/druid/frame/processor/SuperSorter.java  |   6 +-
 .../channel/ComposingWritableFrameChannelTest.java | 144 +++++++++++++++++++++
 .../druid/frame/processor/OutputChannelTest.java   |   6 +-
 .../druid/frame/processor/OutputChannelsTest.java  |   6 +-
 8 files changed, 279 insertions(+), 39 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
index 7f2a61e437..c576c23696 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ComposingWritableFrameChannel.java
@@ -22,6 +22,9 @@ package org.apache.druid.frame.channel;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.frame.processor.PartitionedOutputChannel;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.query.ResourceLimitExceededException;
 
@@ -38,16 +41,29 @@ import java.util.function.Supplier;
  */
 public class ComposingWritableFrameChannel implements WritableFrameChannel
 {
-  private final List<Supplier<WritableFrameChannel>> channels;
+  @Nullable
+  private final List<Supplier<OutputChannel>> outputChannelSuppliers;
+
+  @Nullable
+  private final List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers;
+
+  private final List<Supplier<WritableFrameChannel>> writableChannelSuppliers;
   private final Map<Integer, HashSet<Integer>> partitionToChannelMap;
   private int currentIndex;
 
   public ComposingWritableFrameChannel(
-      List<Supplier<WritableFrameChannel>> channels,
+      @Nullable List<Supplier<OutputChannel>> outputChannelSuppliers,
+      @Nullable List<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSuppliers,
+      List<Supplier<WritableFrameChannel>> writableChannelSuppliers,
       Map<Integer, HashSet<Integer>> partitionToChannelMap
   )
   {
-    this.channels = Preconditions.checkNotNull(channels, "channels is null");
+    if (outputChannelSuppliers != null && partitionedOutputChannelSuppliers != null) {
+      throw new IAE("Atmost one of outputChannelSuppliers and partitionedOutputChannelSuppliers can be provided");
+    }
+    this.outputChannelSuppliers = outputChannelSuppliers;
+    this.partitionedOutputChannelSuppliers = partitionedOutputChannelSuppliers;
+    this.writableChannelSuppliers = Preconditions.checkNotNull(writableChannelSuppliers, "writableChannelSuppliers is null");
     this.partitionToChannelMap =
         Preconditions.checkNotNull(partitionToChannelMap, "partitionToChannelMap is null");
     this.currentIndex = 0;
@@ -56,12 +72,12 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
   @Override
   public void write(FrameWithPartition frameWithPartition) throws IOException
   {
-    if (currentIndex >= channels.size()) {
-      throw new ISE("No more channels available to write. Total available channels : " + channels.size());
+    if (currentIndex >= writableChannelSuppliers.size()) {
+      throw new ISE("No more channels available to write. Total available channels : " + writableChannelSuppliers.size());
     }
 
     try {
-      channels.get(currentIndex).get().write(frameWithPartition);
+      writableChannelSuppliers.get(currentIndex).get().write(frameWithPartition);
       partitionToChannelMap.computeIfAbsent(frameWithPartition.partition(), k -> Sets.newHashSetWithExpectedSize(1))
                            .add(currentIndex);
     }
@@ -70,9 +86,19 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
       // exception is automatically passed up to the user incase all the channels are exhausted. If in future, more
       // cases come up to dictate control flow, then we can switch to returning a custom object from the channel's write
       // operation.
-      channels.get(currentIndex).get().close();
+      writableChannelSuppliers.get(currentIndex).get().close();
+
+      // We are converting the corresponding channel to read only after exhausting it because that channel won't be used
+      // for writes anymore
+      if (outputChannelSuppliers != null) {
+        outputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
+      }
+      if (partitionedOutputChannelSuppliers != null) {
+        partitionedOutputChannelSuppliers.get(currentIndex).get().convertToReadOnly();
+      }
+
       currentIndex++;
-      if (currentIndex >= channels.size()) {
+      if (currentIndex >= writableChannelSuppliers.size()) {
         throw rlee;
       }
       write(frameWithPartition);
@@ -82,7 +108,7 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
   @Override
   public void fail(@Nullable Throwable cause) throws IOException
   {
-    for (Supplier<WritableFrameChannel> channel : channels) {
+    for (Supplier<WritableFrameChannel> channel : writableChannelSuppliers) {
       channel.get().fail(cause);
     }
   }
@@ -90,21 +116,21 @@ public class ComposingWritableFrameChannel implements WritableFrameChannel
   @Override
   public void close() throws IOException
   {
-    if (currentIndex < channels.size()) {
-      channels.get(currentIndex).get().close();
-      currentIndex = channels.size();
+    if (currentIndex < writableChannelSuppliers.size()) {
+      writableChannelSuppliers.get(currentIndex).get().close();
+      currentIndex = writableChannelSuppliers.size();
     }
   }
 
   @Override
   public boolean isClosed()
   {
-    return currentIndex == channels.size();
+    return currentIndex == writableChannelSuppliers.size();
   }
 
   @Override
   public ListenableFuture<?> writabilityFuture()
   {
-    return channels.get(currentIndex).get().writabilityFuture();
+    return writableChannelSuppliers.get(currentIndex).get().writabilityFuture();
   }
 }
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
index 8ca9aa4f6f..cf94262ac3 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/ComposingOutputChannelFactory.java
@@ -60,6 +60,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
   {
     ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelSuppliersBuilder = ImmutableList.builder();
     ImmutableList.Builder<Supplier<ReadableFrameChannel>> readableFrameChannelSuppliersBuilder = ImmutableList.builder();
+    ImmutableList.Builder<Supplier<OutputChannel>> outputChannelSupplierBuilder = ImmutableList.builder();
     for (OutputChannelFactory channelFactory : channelFactories) {
       // open channel lazily
       Supplier<OutputChannel> channel =
@@ -71,14 +72,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
               throw new UncheckedIOException(e);
             }
           })::get;
+      outputChannelSupplierBuilder.add(channel);
       writableFrameChannelSuppliersBuilder.add(() -> channel.get().getWritableChannel());
-      readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
+      // We read the output channel once they have been written to, and therefore it is space efficient and safe to
+      // save their read only copies
+      readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
     }
 
     // the map maintains a mapping of channels which have the data for a given partition.
     // it is useful to identify the readable channels to open in the composition while reading the partition data.
     Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
     ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
+        outputChannelSupplierBuilder.build(),
+        null,
         writableFrameChannelSuppliersBuilder.build(),
         partitionToChannelMap
     );
@@ -103,6 +109,7 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
     ImmutableList.Builder<Supplier<WritableFrameChannel>> writableFrameChannelsBuilder = ImmutableList.builder();
     ImmutableList.Builder<Supplier<PartitionedReadableFrameChannel>> readableFrameChannelSuppliersBuilder =
         ImmutableList.builder();
+    ImmutableList.Builder<Supplier<PartitionedOutputChannel>> partitionedOutputChannelSupplierBuilder = ImmutableList.builder();
     for (OutputChannelFactory channelFactory : channelFactories) {
       Supplier<PartitionedOutputChannel> channel =
           Suppliers.memoize(() -> {
@@ -113,14 +120,19 @@ public class ComposingOutputChannelFactory implements OutputChannelFactory
               throw new UncheckedIOException(e);
             }
           })::get;
+      partitionedOutputChannelSupplierBuilder.add(channel);
       writableFrameChannelsBuilder.add(() -> channel.get().getWritableChannel());
-      readableFrameChannelSuppliersBuilder.add(() -> channel.get().getReadableChannelSupplier().get());
+      // We read the output channel once they have been written to, and therefore it is space efficient and safe to
+      // save their read only copies
+      readableFrameChannelSuppliersBuilder.add(() -> channel.get().readOnly().getReadableChannelSupplier().get());
     }
     // the map maintains a mapping of channels which have the data for a given partition.
     // it is useful to identify the readable channels to open in the composition while reading the partition data.
 
     Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
     ComposingWritableFrameChannel writableFrameChannel = new ComposingWritableFrameChannel(
+        null,
+        partitionedOutputChannelSupplierBuilder.build(),
         writableFrameChannelsBuilder.build(),
         partitionToChannelMap
     );
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
index ac0e0a5fac..e1377eddca 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/OutputChannel.java
@@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.frame.allocation.MemoryAllocator;
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -42,11 +43,16 @@ import java.util.function.Supplier;
  */
 public class OutputChannel
 {
+  @GuardedBy("this")
   @Nullable
-  private final WritableFrameChannel writableChannel;
+  private WritableFrameChannel writableChannel;
+
+  @GuardedBy("this")
   @Nullable
-  private final MemoryAllocator frameMemoryAllocator;
+  private MemoryAllocator frameMemoryAllocator;
+
   private final Supplier<ReadableFrameChannel> readableChannelSupplier;
+
   private final boolean readableChannelUsableWhileWriting;
   private final int partitionNumber;
 
@@ -157,12 +163,14 @@ public class OutputChannel
   }
 
   /**
-   * Returns the writable channel of this pair. The producer writes to this channel.
+   * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
+   * read only.
    */
-  public WritableFrameChannel getWritableChannel()
+  public synchronized WritableFrameChannel getWritableChannel()
   {
     if (writableChannel == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
+                    + " hence no writes are allowed.");
     } else {
       return writableChannel;
     }
@@ -170,11 +178,13 @@ public class OutputChannel
 
   /**
    * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
+   * Throws ISE if the output channel is read only.
    */
-  public MemoryAllocator getFrameMemoryAllocator()
+  public synchronized MemoryAllocator getFrameMemoryAllocator()
   {
     if (frameMemoryAllocator == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
+                    + " hence memory allocator is not required.");
     } else {
       return frameMemoryAllocator;
     }
@@ -197,7 +207,7 @@ public class OutputChannel
   /**
    * Whether {@link #getReadableChannel()} is ready to use.
    */
-  public boolean isReadableChannelReady()
+  public synchronized boolean isReadableChannelReady()
   {
     return readableChannelUsableWhileWriting || writableChannel == null || writableChannel.isClosed();
   }
@@ -212,7 +222,7 @@ public class OutputChannel
     return partitionNumber;
   }
 
-  public OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
+  public synchronized OutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
   {
     if (writableChannel == null) {
       return this;
@@ -235,4 +245,14 @@ public class OutputChannel
   {
     return OutputChannel.readOnly(readableChannelSupplier, partitionNumber);
   }
+
+  /**
+   * Nulls out {@link #writableChannel} and {@link #frameMemoryAllocator}, so this object stops referencing them.
+   * After this call, {@link #getWritableChannel()} and {@link #getFrameMemoryAllocator()} throw ISE.
+   */
+  public synchronized void convertToReadOnly()
+  {
+    this.writableChannel = null;
+    this.frameMemoryAllocator = null;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
index 3e455545b0..34ad2c2323 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/PartitionedOutputChannel.java
@@ -21,6 +21,7 @@ package org.apache.druid.frame.processor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Suppliers;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.frame.allocation.MemoryAllocator;
 import org.apache.druid.frame.channel.PartitionedReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -37,10 +38,15 @@ import java.util.function.Supplier;
  */
 public class PartitionedOutputChannel
 {
+
+  @GuardedBy("this")
   @Nullable
-  private final WritableFrameChannel writableChannel;
+  private WritableFrameChannel writableChannel;
+
+  @GuardedBy("this")
   @Nullable
-  private final MemoryAllocator frameMemoryAllocator;
+  private MemoryAllocator frameMemoryAllocator;
+
   private final Supplier<PartitionedReadableFrameChannel> readableChannelSupplier;
 
   private PartitionedOutputChannel(
@@ -76,12 +82,14 @@ public class PartitionedOutputChannel
   }
 
   /**
-   * Returns the writable channel of this pair. The producer writes to this channel.
+   * Returns the writable channel of this pair. The producer writes to this channel. Throws ISE if the output channel is
+   * read only.
    */
-  public WritableFrameChannel getWritableChannel()
+  public synchronized WritableFrameChannel getWritableChannel()
   {
     if (writableChannel == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Writable channel is not available. The output channel might be marked as read-only,"
+                    + " hence no writes are allowed.");
     } else {
       return writableChannel;
     }
@@ -89,11 +97,13 @@ public class PartitionedOutputChannel
 
   /**
    * Returns the memory allocator for the writable channel. The producer uses this to generate frames for the channel.
+   * Throws ISE if the output channel is read only.
    */
-  public MemoryAllocator getFrameMemoryAllocator()
+  public synchronized MemoryAllocator getFrameMemoryAllocator()
   {
     if (frameMemoryAllocator == null) {
-      throw new ISE("Writable channel is not available");
+      throw new ISE("Frame allocator is not available. The output channel might be marked as read-only,"
+                    + " hence memory allocator is not required.");
     } else {
       return frameMemoryAllocator;
     }
@@ -102,12 +112,12 @@ public class PartitionedOutputChannel
   /**
    * Returns the partitioned readable channel supplier of this pair. The consumer reads from this channel.
    */
-  public Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
+  public synchronized Supplier<PartitionedReadableFrameChannel> getReadableChannelSupplier()
   {
     return readableChannelSupplier;
   }
 
-  public PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
+  public synchronized PartitionedOutputChannel mapWritableChannel(final Function<WritableFrameChannel, WritableFrameChannel> mapFn)
   {
     if (writableChannel == null) {
       return this;
@@ -119,4 +129,24 @@ public class PartitionedOutputChannel
       );
     }
   }
+
+
+  /**
+   * Returns a read-only version of this instance. Read-only versions have neither {@link #getWritableChannel()} nor
+   * {@link #getFrameMemoryAllocator()}, and therefore require substantially less memory.
+   */
+  public PartitionedOutputChannel readOnly()
+  {
+    return new PartitionedOutputChannel(null, null, readableChannelSupplier);
+  }
+
+  /**
+   * Nulls out {@link #writableChannel} and {@link #frameMemoryAllocator}, so this object stops referencing them.
+   * After this call, {@link #getWritableChannel()} and {@link #getFrameMemoryAllocator()} throw ISE.
+   */
+  public synchronized void convertToReadOnly()
+  {
+    this.writableChannel = null;
+    this.frameMemoryAllocator = null;
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
index 1be686be6f..440da49d7c 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
@@ -611,7 +611,11 @@ public class SuperSorter
         );
         writableChannel = partitionedOutputChannel.getWritableChannel();
         frameAllocatorFactory = new SingleMemoryAllocatorFactory(partitionedOutputChannel.getFrameMemoryAllocator());
-        levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel);
+
+        // We add the readOnly() channel even though we require the writableChannel and the frame allocator because
+        // the original partitionedOutputChannel would contain the reference to those, which would get cleaned up
+        // appropriately and not be held up in the class level map
+        levelAndRankToReadableChannelMap.put(levelAndRankKey, partitionedOutputChannel.readOnly());
       }
 
       final FrameChannelMerger worker =
diff --git a/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
new file mode 100644
index 0000000000..c2968b3579
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/frame/channel/ComposingWritableFrameChannelTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.processor.OutputChannel;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.query.ResourceLimitExceededException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.function.Supplier;
+
+
+public class ComposingWritableFrameChannelTest
+{
+  @Test
+  public void testComposingWritableChannelSwitchesProperly() throws IOException
+  {
+
+    // The first channel accepts two frames before throwing ResourceLimitExceededException; the second accepts 100
+    WritableFrameChannel writableFrameChannel1 = new LimitedWritableFrameChannel(2);
+    WritableFrameChannel writableFrameChannel2 = new LimitedWritableFrameChannel(100);
+
+    Supplier<ReadableFrameChannel> readableFrameChannelSupplier1 = () -> null;
+    Supplier<ReadableFrameChannel> readableFrameChannelSupplier2 = () -> null;
+
+    OutputChannel outputChannel1 = OutputChannel.pair(
+        writableFrameChannel1,
+        ArenaMemoryAllocator.createOnHeap(1),
+        readableFrameChannelSupplier1,
+        1
+    );
+    OutputChannel outputChannel2 = OutputChannel.pair(
+        writableFrameChannel2,
+        ArenaMemoryAllocator.createOnHeap(1),
+        readableFrameChannelSupplier2,
+        2
+    );
+
+    Map<Integer, HashSet<Integer>> partitionToChannelMap = new HashMap<>();
+
+    ComposingWritableFrameChannel composingWritableFrameChannel = new ComposingWritableFrameChannel(
+        ImmutableList.of(
+            () -> outputChannel1,
+            () -> outputChannel2
+        ),
+        null,
+        ImmutableList.of(
+            () -> writableFrameChannel1,
+            () -> writableFrameChannel2
+        ),
+        partitionToChannelMap
+    );
+
+    composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 1));
+    composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 2));
+    composingWritableFrameChannel.write(new FrameWithPartition(Mockito.mock(Frame.class), 3));
+
+    // Assert which channel index each partition's frame was written to
+    Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(1));
+    Assert.assertEquals(ImmutableSet.of(0), partitionToChannelMap.get(2));
+    Assert.assertEquals(ImmutableSet.of(1), partitionToChannelMap.get(3));
+
+    // The exhausted first channel must have been converted to read only, so its writable channel is gone
+    Assert.assertThrows(ISE.class, outputChannel1::getWritableChannel);
+  }
+
+  static class LimitedWritableFrameChannel implements WritableFrameChannel
+  {
+    private final int maxFrames;
+    private int curFrame = 0;
+
+    public LimitedWritableFrameChannel(int maxFrames)
+    {
+      this.maxFrames = maxFrames;
+    }
+
+    @Override
+    public void write(FrameWithPartition frameWithPartition)
+    {
+      if (curFrame >= maxFrames) {
+        throw new ResourceLimitExceededException("Cannot write more frames to the channel");
+      }
+      ++curFrame;
+    }
+
+    @Override
+    public void write(Frame frame)
+    {
+    }
+
+    @Override
+    public void fail(@Nullable Throwable cause)
+    {
+
+    }
+
+    @Override
+    public void close()
+    {
+
+    }
+
+    @Override
+    public boolean isClosed()
+    {
+      return false;
+    }
+
+    @Override
+    public ListenableFuture<?> writabilityFuture()
+    {
+      return null;
+    }
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
index ac0b03b286..6e5b904ab3 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelTest.java
@@ -44,14 +44,16 @@ public class OutputChannelTest
     final IllegalStateException e1 = Assert.assertThrows(IllegalStateException.class, channel::getWritableChannel);
     MatcherAssert.assertThat(
         e1,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+        "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed."))
     );
 
     // No writable channel: cannot call getFrameMemoryAllocator.
     final IllegalStateException e2 = Assert.assertThrows(IllegalStateException.class, channel::getFrameMemoryAllocator);
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
     );
 
     // Mapping the writable channel of a nil channel has no effect, because there is no writable channel.
diff --git a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
index dbef92b047..a22c7e92bf 100644
--- a/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
+++ b/processing/src/test/java/org/apache/druid/frame/processor/OutputChannelsTest.java
@@ -86,7 +86,8 @@ public class OutputChannelsTest
 
     MatcherAssert.assertThat(
         e,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Writable channel is not available. The output channel might be marked as read-only, hence no writes are allowed."))
     );
 
     final IllegalStateException e2 = Assert.assertThrows(
@@ -96,7 +97,8 @@ public class OutputChannelsTest
 
     MatcherAssert.assertThat(
         e2,
-        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Writable channel is not available"))
+        ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo(
+            "Frame allocator is not available. The output channel might be marked as read-only, hence memory allocator is not required."))
     );
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org