You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/07 14:02:29 UTC
[druid] branch master updated: MSQ extension: Fix over-capacity write in ScanQueryFrameProcessor. (#13036)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f00f1f754d MSQ extension: Fix over-capacity write in ScanQueryFrameProcessor. (#13036)
f00f1f754d is described below
commit f00f1f754d431bafe34b20d614c9346c6f6f5e53
Author: Gian Merlino <gi...@gmail.com>
AuthorDate: Wed Sep 7 07:02:21 2022 -0700
MSQ extension: Fix over-capacity write in ScanQueryFrameProcessor. (#13036)
* MSQ extension: Fix over-capacity write in ScanQueryFrameProcessor.
Frame processors are meant to write only one output frame per cycle.
The ScanQueryFrameProcessor would write two when reading from a channel
if the input frame cursor cycled and then the output frame filled up
while reading from the next frame.
This patch fixes the bug, and adds a test. It also makes some adjustments
to the processor code in order to make it easier to test.
* Add license header.
---
.../druid/msq/querykit/BaseLeafFrameProcessor.java | 14 +-
.../querykit/BaseLeafFrameProcessorFactory.java | 3 +-
.../groupby/GroupByPreShuffleFrameProcessor.java | 25 +--
.../GroupByPreShuffleFrameProcessorFactory.java | 22 +-
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 37 ++--
.../scan/ScanQueryFrameProcessorFactory.java | 22 +-
.../querykit/scan/ScanQueryFrameProcessorTest.java | 240 +++++++++++++++++++++
7 files changed, 299 insertions(+), 64 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index 01bcacfd2e..f971a4cc73 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -24,13 +24,13 @@ import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.ResourceHolder;
-import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameProcessors;
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
@@ -55,7 +55,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
private final ReadableInput baseInput;
private final List<ReadableFrameChannel> inputChannels;
private final ResourceHolder<WritableFrameChannel> outputChannel;
- private final ResourceHolder<MemoryAllocator> allocator;
+ private final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder;
private final BroadcastJoinHelper broadcastJoinHelper;
private Function<SegmentReference, SegmentReference> segmentMapFn;
@@ -66,14 +66,14 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
final ResourceHolder<WritableFrameChannel> outputChannel,
- final ResourceHolder<MemoryAllocator> allocator,
+ final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
{
this.query = query;
this.baseInput = baseInput;
this.outputChannel = outputChannel;
- this.allocator = allocator;
+ this.frameWriterFactoryHolder = frameWriterFactoryHolder;
final Pair<List<ReadableFrameChannel>, BroadcastJoinHelper> inputChannelsAndBroadcastJoinHelper =
makeInputChannelsAndBroadcastJoinHelper(
@@ -120,12 +120,12 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
{
// Don't close the output channel, because multiple workers write to the same channel.
// The channel should be closed by the caller.
- FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannel, allocator);
+ FrameProcessors.closeAll(inputChannels(), Collections.emptyList(), outputChannel, frameWriterFactoryHolder);
}
- protected MemoryAllocator getAllocator()
+ protected FrameWriterFactory getFrameWriterFactory()
{
- return allocator.get();
+ return frameWriterFactoryHolder.get();
}
protected abstract ReturnOrAwait<Long> runWithSegment(SegmentWithDescriptor segment) throws IOException;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index c844a11a15..ef1275355c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -147,8 +147,7 @@ public abstract class BaseLeafFrameProcessorFactory extends BaseFrameProcessorFa
}
}
),
- makeLazyResourceHolder(allocatorQueueRef, ignored -> {
- }),
+ makeLazyResourceHolder(allocatorQueueRef, ignored -> {}),
stageDefinition.getSignature(),
stageDefinition.getClusterBy(),
frameContext
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index af407f0144..5bcc4267b9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -23,12 +23,9 @@ import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
-import org.apache.druid.frame.FrameType;
-import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.processor.ReturnOrAwait;
@@ -36,7 +33,6 @@ import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
@@ -66,8 +62,6 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
{
private final GroupByQuery query;
private final GroupByStrategySelector strategySelector;
- private final RowSignature aggregationSignature;
- private final ClusterBy clusterBy;
private final ColumnSelectorFactory frameWriterColumnSelectorFactory;
private final Closer closer = Closer.create();
@@ -82,10 +76,8 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
final Int2ObjectMap<ReadableInput> sideChannels,
final GroupByStrategySelector strategySelector,
final JoinableFactoryWrapper joinableFactory,
- final RowSignature aggregationSignature,
- final ClusterBy clusterBy,
final ResourceHolder<WritableFrameChannel> outputChannel,
- final ResourceHolder<MemoryAllocator> allocator,
+ final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
final long memoryReservedForBroadcastJoin
)
{
@@ -95,13 +87,11 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
sideChannels,
joinableFactory,
outputChannel,
- allocator,
+ frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
this.query = query;
this.strategySelector = strategySelector;
- this.aggregationSignature = aggregationSignature;
- this.clusterBy = clusterBy;
this.frameWriterColumnSelectorFactory = RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
() -> resultYielder.get(),
@@ -209,16 +199,9 @@ public class GroupByPreShuffleFrameProcessor extends BaseLeafFrameProcessor
private void createFrameWriterIfNeeded()
{
if (frameWriter == null) {
- final MemoryAllocator allocator = getAllocator();
- final FrameWriterFactory frameWriterFactory =
- FrameWriters.makeFrameWriterFactory(
- FrameType.ROW_BASED,
- allocator,
- aggregationSignature,
- clusterBy.getColumns()
- );
+ final FrameWriterFactory frameWriterFactory = getFrameWriterFactory();
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
- currentAllocatorCapacity = allocator.capacity();
+ currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
index b73f00efa2..63ad3cf890 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessorFactory.java
@@ -25,13 +25,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
+import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
@@ -57,8 +61,8 @@ public class GroupByPreShuffleFrameProcessorFactory extends BaseLeafFrameProcess
protected FrameProcessor<Long> makeProcessor(
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
- final ResourceHolder<WritableFrameChannel> outputChannelSupplier,
- final ResourceHolder<MemoryAllocator> allocatorSupplier,
+ final ResourceHolder<WritableFrameChannel> outputChannelHolder,
+ final ResourceHolder<MemoryAllocator> allocatorHolder,
final RowSignature signature,
final ClusterBy clusterBy,
final FrameContext frameContext
@@ -70,10 +74,16 @@ public class GroupByPreShuffleFrameProcessorFactory extends BaseLeafFrameProcess
sideChannels,
frameContext.groupByStrategySelector(),
new JoinableFactoryWrapper(frameContext.joinableFactory()),
- signature,
- clusterBy,
- outputChannelSupplier,
- allocatorSupplier,
+ outputChannelHolder,
+ new LazyResourceHolder<>(() -> Pair.of(
+ FrameWriters.makeFrameWriterFactory(
+ FrameType.ROW_BASED,
+ allocatorHolder.get(),
+ signature,
+ clusterBy.getColumns()
+ ),
+ allocatorHolder
+ )),
frameContext.memoryParameters().getBroadcastJoinMemory()
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 1b8d21baae..307d274c73 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -25,12 +25,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.frame.Frame;
-import org.apache.druid.frame.FrameType;
-import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
-import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.processor.FrameProcessor;
import org.apache.druid.frame.processor.FrameRowTooLargeException;
import org.apache.druid.frame.processor.ReturnOrAwait;
@@ -39,7 +36,6 @@ import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.util.SettableLongVirtualColumn;
import org.apache.druid.frame.write.FrameWriter;
import org.apache.druid.frame.write.FrameWriterFactory;
-import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -60,7 +56,6 @@ import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
-import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.timeline.SegmentId;
@@ -79,8 +74,6 @@ import java.util.concurrent.atomic.AtomicLong;
public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
{
private final ScanQuery query;
- private final RowSignature signature;
- private final ClusterBy clusterBy;
private final AtomicLong runningCountForLimit;
private final SettableLongVirtualColumn partitionBoostVirtualColumn;
private final VirtualColumns frameWriterVirtualColumns;
@@ -93,13 +86,11 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
public ScanQueryFrameProcessor(
final ScanQuery query,
- final RowSignature signature,
- final ClusterBy clusterBy,
final ReadableInput baseInput,
final Int2ObjectMap<ReadableInput> sideChannels,
final JoinableFactoryWrapper joinableFactory,
final ResourceHolder<WritableFrameChannel> outputChannel,
- final ResourceHolder<MemoryAllocator> allocator,
+ final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
@Nullable final AtomicLong runningCountForLimit,
final long memoryReservedForBroadcastJoin
)
@@ -110,12 +101,10 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
sideChannels,
joinableFactory,
outputChannel,
- allocator,
+ frameWriterFactoryHolder,
memoryReservedForBroadcastJoin
);
this.query = query;
- this.signature = signature;
- this.clusterBy = clusterBy;
this.runningCountForLimit = runningCountForLimit;
this.partitionBoostVirtualColumn = new SettableLongVirtualColumn(QueryKitUtils.PARTITION_BOOST_COLUMN);
@@ -174,7 +163,8 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
cursorYielder.close();
return ReturnOrAwait.returnObject(rowsOutput);
} else {
- setNextCursor(cursorYielder.get());
+ final long rowsFlushed = setNextCursor(cursorYielder.get());
+ assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
closer.register(cursorYielder);
}
}
@@ -201,9 +191,9 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
if (cursor == null || cursor.isDone()) {
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
- final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, SegmentId.dummy("x"));
+ final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, SegmentId.dummy("scan"));
- setNextCursor(
+ final long rowsFlushed = setNextCursor(
Iterables.getOnlyElement(
makeCursors(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
@@ -211,6 +201,10 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
).toList()
)
);
+
+ if (rowsFlushed > 0) {
+ return ReturnOrAwait.runAgain();
+ }
} else if (inputChannel.isFinished()) {
flushFrameWriter();
return ReturnOrAwait.returnObject(rowsOutput);
@@ -256,13 +250,11 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
private void createFrameWriterIfNeeded()
{
if (frameWriter == null) {
- final MemoryAllocator allocator = getAllocator();
- final FrameWriterFactory frameWriterFactory =
- FrameWriters.makeFrameWriterFactory(FrameType.ROW_BASED, allocator, signature, clusterBy.getColumns());
+ final FrameWriterFactory frameWriterFactory = getFrameWriterFactory();
final ColumnSelectorFactory frameWriterColumnSelectorFactory =
frameWriterVirtualColumns.wrap(cursor.getColumnSelectorFactory());
frameWriter = frameWriterFactory.newFrameWriter(frameWriterColumnSelectorFactory);
- currentAllocatorCapacity = allocator.capacity();
+ currentAllocatorCapacity = frameWriterFactory.allocatorCapacity();
}
}
@@ -285,10 +277,11 @@ public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
}
}
- private void setNextCursor(final Cursor cursor) throws IOException
+ private long setNextCursor(final Cursor cursor) throws IOException
{
- flushFrameWriter();
+ final long rowsFlushed = flushFrameWriter();
this.cursor = cursor;
+ return rowsFlushed;
}
private static Sequence<Cursor> makeCursors(final ScanQuery query, final StorageAdapter adapter)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 08b41ddfc3..2a948fd456 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -25,13 +25,17 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocator;
import org.apache.druid.frame.channel.WritableFrameChannel;
import org.apache.druid.frame.key.ClusterBy;
import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
+import org.apache.druid.msq.querykit.LazyResourceHolder;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactoryWrapper;
@@ -73,8 +77,8 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor
protected FrameProcessor<Long> makeProcessor(
ReadableInput baseInput,
Int2ObjectMap<ReadableInput> sideChannels,
- ResourceHolder<WritableFrameChannel> outputChannelSupplier,
- ResourceHolder<MemoryAllocator> allocatorSupplier,
+ ResourceHolder<WritableFrameChannel> outputChannelHolder,
+ ResourceHolder<MemoryAllocator> allocatorHolder,
RowSignature signature,
ClusterBy clusterBy,
FrameContext frameContext
@@ -82,13 +86,19 @@ public class ScanQueryFrameProcessorFactory extends BaseLeafFrameProcessorFactor
{
return new ScanQueryFrameProcessor(
query,
- signature,
- clusterBy,
baseInput,
sideChannels,
new JoinableFactoryWrapper(frameContext.joinableFactory()),
- outputChannelSupplier,
- allocatorSupplier,
+ outputChannelHolder,
+ new LazyResourceHolder<>(() -> Pair.of(
+ FrameWriters.makeFrameWriterFactory(
+ FrameType.ROW_BASED,
+ allocatorHolder.get(),
+ signature,
+ clusterBy.getColumns()
+ ),
+ allocatorHolder
+ )),
runningCountForLimit,
frameContext.memoryParameters().getBroadcastJoinMemory()
);
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
new file mode 100644
index 0000000000..2ea2958c73
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.scan;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import org.apache.druid.frame.allocation.HeapMemoryAllocator;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessorExecutor;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.msq.kernel.StagePartition;
+import org.apache.druid.msq.querykit.LazyResourceHolder;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import org.apache.druid.segment.join.JoinableFactoryWrapper;
+import org.apache.druid.segment.join.NoopJoinableFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class ScanQueryFrameProcessorTest extends InitializedNullHandlingTest
+{
+ private FrameProcessorExecutor exec;
+
+ @Before
+ public void setUp()
+ {
+ exec = new FrameProcessorExecutor(MoreExecutors.listeningDecorator(Execs.singleThreaded("test-exec")));
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ exec.getExecutorService().shutdownNow();
+ exec.getExecutorService().awaitTermination(10, TimeUnit.MINUTES);
+ }
+
+ @Test
+ public void test_runWithInputChannel() throws Exception
+ {
+ final IncrementalIndexStorageAdapter adapter =
+ new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
+
+ final FrameSequenceBuilder frameSequenceBuilder =
+ FrameSequenceBuilder.fromAdapter(adapter)
+ .maxRowsPerFrame(5)
+ .frameType(FrameType.ROW_BASED)
+ .allocator(ArenaMemoryAllocator.createOnHeap(100_000));
+
+ final RowSignature signature = frameSequenceBuilder.signature();
+ final List<Frame> frames = frameSequenceBuilder.frames().toList();
+ final BlockingQueueFrameChannel inputChannel = new BlockingQueueFrameChannel(frames.size());
+ final BlockingQueueFrameChannel outputChannel = BlockingQueueFrameChannel.minimal();
+
+ try (final WritableFrameChannel writableInputChannel = inputChannel.writable()) {
+ for (final Frame frame : frames) {
+ writableInputChannel.write(frame);
+ }
+ }
+
+ final ScanQuery query =
+ Druids.newScanQueryBuilder()
+ .dataSource("test")
+ .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY))
+ .columns(adapter.getRowSignature().getColumnNames())
+ .legacy(false)
+ .build();
+
+ final StagePartition stagePartition = new StagePartition(new StageId("query", 0), 0);
+
+ // Limit output frames to 1 row to ensure we test edge cases
+ final FrameWriterFactory frameWriterFactory = limitedFrameWriterFactory(
+ FrameWriters.makeFrameWriterFactory(
+ FrameType.ROW_BASED,
+ HeapMemoryAllocator.unlimited(),
+ signature,
+ Collections.emptyList()
+ ),
+ 1
+ );
+
+ final ScanQueryFrameProcessor processor = new ScanQueryFrameProcessor(
+ query,
+ ReadableInput.channel(inputChannel.readable(), FrameReader.create(signature), stagePartition),
+ Int2ObjectMaps.emptyMap(),
+ new JoinableFactoryWrapper(NoopJoinableFactory.INSTANCE),
+ new ResourceHolder<WritableFrameChannel>()
+ {
+ @Override
+ public WritableFrameChannel get()
+ {
+ return outputChannel.writable();
+ }
+
+ @Override
+ public void close()
+ {
+ try {
+ outputChannel.writable().close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ },
+ new LazyResourceHolder<>(() -> Pair.of(frameWriterFactory, () -> {})),
+ null,
+ 0L
+ );
+
+ ListenableFuture<Long> retVal = exec.runFully(processor, null);
+
+ final Sequence<List<Object>> rowsFromProcessor = FrameTestUtil.readRowsFromFrameChannel(
+ outputChannel.readable(),
+ FrameReader.create(signature)
+ );
+
+ FrameTestUtil.assertRowsEqual(
+ FrameTestUtil.readRowsFromAdapter(adapter, signature, false),
+ rowsFromProcessor
+ );
+
+ Assert.assertEquals(adapter.getNumRows(), (long) retVal.get());
+ }
+
+ /**
+ * Wraps a {@link FrameWriterFactory}, creating a new factory that returns {@link FrameWriter} which write
+ * a limited number of rows.
+ */
+ private static FrameWriterFactory limitedFrameWriterFactory(final FrameWriterFactory baseFactory, final int rowLimit)
+ {
+ return new FrameWriterFactory()
+ {
+ @Override
+ public FrameWriter newFrameWriter(ColumnSelectorFactory columnSelectorFactory)
+ {
+ return new LimitedFrameWriter(baseFactory.newFrameWriter(columnSelectorFactory), rowLimit);
+ }
+
+ @Override
+ public long allocatorCapacity()
+ {
+ return baseFactory.allocatorCapacity();
+ }
+ };
+ }
+
+ private static class LimitedFrameWriter implements FrameWriter
+ {
+ private final FrameWriter baseWriter;
+ private final int rowLimit;
+
+ public LimitedFrameWriter(FrameWriter baseWriter, int rowLimit)
+ {
+ this.baseWriter = baseWriter;
+ this.rowLimit = rowLimit;
+ }
+
+ @Override
+ public boolean addSelection()
+ {
+ if (baseWriter.getNumRows() >= rowLimit) {
+ return false;
+ } else {
+ return baseWriter.addSelection();
+ }
+ }
+
+ @Override
+ public int getNumRows()
+ {
+ return baseWriter.getNumRows();
+ }
+
+ @Override
+ public long getTotalSize()
+ {
+ return baseWriter.getTotalSize();
+ }
+
+ @Override
+ public long writeTo(WritableMemory memory, long position)
+ {
+ return baseWriter.writeTo(memory, position);
+ }
+
+ @Override
+ public void close()
+ {
+ baseWriter.close();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org