You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org> on 2023/02/23 21:18:42 UTC

[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13506: Sort-merge join and hash shuffles for MSQ.

github-code-scanning[bot] commented on code in PR #13506:
URL: https://github.com/apache/druid/pull/13506#discussion_r1116256993


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java:
##########
@@ -1370,6 +934,818 @@
     }
   }
 
+  /**
+   * Main worker logic for executing a {@link WorkOrder}.
+   */
+  private class RunWorkOrder
+  {
+    private final WorkerStageKernel kernel;
+    private final InputChannelFactory inputChannelFactory;
+    private final CounterTracker counterTracker;
+    private final FrameProcessorExecutor exec;
+    private final String cancellationId;
+    private final int parallelism;
+    private final FrameContext frameContext;
+    private final MSQWarningReportPublisher warningPublisher;
+
+    private InputSliceReader inputSliceReader;
+    private OutputChannelFactory workOutputChannelFactory;
+    private OutputChannelFactory shuffleOutputChannelFactory;
+    private ResultAndChannels<?> workResultAndOutputChannels;
+    private SettableFuture<ClusterByPartitions> stagePartitionBoundariesFuture;
+    private ListenableFuture<OutputChannels> shuffleOutputChannelsFuture;
+
+    public RunWorkOrder(
+        final WorkerStageKernel kernel,
+        final InputChannelFactory inputChannelFactory,
+        final CounterTracker counterTracker,
+        final FrameProcessorExecutor exec,
+        final String cancellationId,
+        final int parallelism,
+        final FrameContext frameContext,
+        final MSQWarningReportPublisher warningPublisher
+    )
+    {
+      this.kernel = kernel;
+      this.inputChannelFactory = inputChannelFactory;
+      this.counterTracker = counterTracker;
+      this.exec = exec;
+      this.cancellationId = cancellationId;
+      this.parallelism = parallelism;
+      this.frameContext = frameContext;
+      this.warningPublisher = warningPublisher;
+    }
+
+    private void start() throws IOException
+    {
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final StageDefinition stageDef = workOrder.getStageDefinition();
+
+      makeInputSliceReader();
+      makeWorkOutputChannelFactory();
+      makeShuffleOutputChannelFactory();
+      makeAndRunWorkProcessors();
+
+      if (stageDef.doesShuffle()) {
+        makeAndRunShuffleProcessors();
+      } else {
+        // No shuffling: work output _is_ shuffle output. Retain read-only versions to reduce memory footprint.
+        shuffleOutputChannelsFuture =
+            Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly());
+      }
+
+      setUpCompletionCallbacks();
+    }
+
+    /**
+     * Settable {@link ClusterByPartitions} future for global sort. Necessary because we don't know ahead of time
+     * what the boundaries will be. The controller decides based on statistics from all workers. Once the controller
+     * decides, its decision is written to this future, which allows sorting on workers to proceed.
+     */
+    @Nullable
+    public SettableFuture<ClusterByPartitions> getStagePartitionBoundariesFuture()
+    {
+      return stagePartitionBoundariesFuture;
+    }
+
+    private void makeInputSliceReader()
+    {
+      if (inputSliceReader != null) {
+        throw new ISE("inputSliceReader already created");
+      }
+
+      final WorkOrder workOrder = kernel.getWorkOrder();
+      final String queryId = workOrder.getQueryDefinition().getQueryId();
+
+      final InputChannels inputChannels =
+          new InputChannelsImpl(
+              workOrder.getQueryDefinition(),
+              InputSlices.allReadablePartitions(workOrder.getInputs()),
+              inputChannelFactory,
+              () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
+              exec,
+              cancellationId
+          );
+
+      inputSliceReader = new MapInputSliceReader(
+          ImmutableMap.<Class<? extends InputSlice>, InputSliceReader>builder()
+                      .put(NilInputSlice.class, NilInputSliceReader.INSTANCE)
+                      .put(StageInputSlice.class, new StageInputSliceReader(queryId, inputChannels))
+                      .put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir()))
+                      .put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext.dataSegmentProvider()))
+                      .build()
+      );
+    }
+
+    private void makeWorkOutputChannelFactory()
+    {
+      if (workOutputChannelFactory != null) {
+        throw new ISE("processorOutputChannelFactory already created");
+      }
+
+      final OutputChannelFactory baseOutputChannelFactory;
+
+      if (kernel.getStageDefinition().doesShuffle()) {
+        // Writing to a consumer in the same JVM (which will be set up later on in this method). Use the large frame
+        // size if we're writing to a SuperSorter, since we'll generate fewer temp files if we use larger frames.
+        // Otherwise, use the standard frame size.
+        final int frameSize;
+
+        if (kernel.getStageDefinition().getShuffleSpec().kind().isSort()) {
+          frameSize = frameContext.memoryParameters().getLargeFrameSize();
+        } else {
+          frameSize = frameContext.memoryParameters().getStandardFrameSize();
+        }
+
+        baseOutputChannelFactory = new BlockingQueueOutputChannelFactory(frameSize);
+      } else {
+        // Writing stage output.
+        baseOutputChannelFactory =
+            makeStageOutputChannelFactory(frameContext, kernel.getStageDefinition().getStageNumber());
+      }
+
+      workOutputChannelFactory = new CountingOutputChannelFactory(
+          baseOutputChannelFactory,
+          counterTracker.channel(CounterNames.outputChannel())
+      );
+    }
+
+    private void makeShuffleOutputChannelFactory()
+    {
+      shuffleOutputChannelFactory =
+          new CountingOutputChannelFactory(
+              makeStageOutputChannelFactory(frameContext, kernel.getStageDefinition().getStageNumber()),
+              counterTracker.channel(CounterNames.shuffleChannel())
+          );
+    }
+
+    private <FactoryType extends FrameProcessorFactory<I, WorkerClass, T, R>, I, WorkerClass extends FrameProcessor<T>, T, R> void makeAndRunWorkProcessors()
+        throws IOException
+    {
+      if (workResultAndOutputChannels != null) {
+        throw new ISE("workResultAndOutputChannels already set");
+      }
+
+      @SuppressWarnings("unchecked")
+      final FactoryType processorFactory = (FactoryType) kernel.getStageDefinition().getProcessorFactory();
+
+      @SuppressWarnings("unchecked")
+      final ProcessorsAndChannels<WorkerClass, T> processors =
+          processorFactory.makeProcessors(
+              kernel.getStageDefinition(),
+              kernel.getWorkOrder().getWorkerNumber(),
+              kernel.getWorkOrder().getInputs(),
+              inputSliceReader,
+              (I) kernel.getWorkOrder().getExtraInfo(),
+              workOutputChannelFactory,
+              frameContext,
+              parallelism,
+              counterTracker,
+              e -> warningPublisher.publishException(kernel.getStageDefinition().getStageNumber(), e)
+          );
+
+      final Sequence<WorkerClass> processorSequence = processors.processors();
+
+      final int maxOutstandingProcessors;
+
+      if (processors.getOutputChannels().getAllChannels().isEmpty()) {
+        // No output channels: run up to "parallelism" processors at once.
+        maxOutstandingProcessors = Math.max(1, parallelism);
+      } else {
+        // If there are output channels, that acts as a ceiling on the number of processors that can run at once.
+        maxOutstandingProcessors =
+            Math.max(1, Math.min(parallelism, processors.getOutputChannels().getAllChannels().size()));
+      }
+
+      final ListenableFuture<R> workResultFuture = exec.runAllFully(
+          processorSequence,
+          processorFactory.newAccumulatedResult(),
+          processorFactory::accumulateResult,
+          maxOutstandingProcessors,
+          processorBouncer,
+          cancellationId
+      );
+
+      workResultAndOutputChannels = new ResultAndChannels<>(workResultFuture, processors.getOutputChannels());
+    }
+
+    private void makeAndRunShuffleProcessors()
+    {
+      if (shuffleOutputChannelsFuture != null) {
+        throw new ISE("shuffleOutputChannelsFuture already set");
+      }
+
+      final ShuffleSpec shuffleSpec = kernel.getWorkOrder().getStageDefinition().getShuffleSpec();
+
+      final ShufflePipelineBuilder shufflePipeline = new ShufflePipelineBuilder(
+          kernel,
+          counterTracker,
+          exec,
+          cancellationId,
+          frameContext
+      );
+
+      shufflePipeline.initialize(workResultAndOutputChannels);
+
+      switch (shuffleSpec.kind()) {
+        case MIX:
+          shufflePipeline.mix(shuffleOutputChannelFactory);
+          break;
+
+        case HASH:
+          shufflePipeline.hashPartition(shuffleOutputChannelFactory);
+          break;
+
+        case HASH_LOCAL_SORT:
+          final OutputChannelFactory hashOutputChannelFactory;
+
+          if (shuffleSpec.partitionCount() == 1) {
+            // Single partition; no need to write temporary files.
+            hashOutputChannelFactory =
+                new BlockingQueueOutputChannelFactory(frameContext.memoryParameters().getStandardFrameSize());
+          } else {
+            // Multi-partition; write temporary files and then sort each one file-by-file.
+            hashOutputChannelFactory =
+                new FileOutputChannelFactory(
+                    context.tempDir(kernel.getStageDefinition().getStageNumber(), "hash-parts"),
+                    frameContext.memoryParameters().getStandardFrameSize(),
+                    null
+                );
+          }
+
+          shufflePipeline.hashPartition(hashOutputChannelFactory);
+          shufflePipeline.localSort(shuffleOutputChannelFactory);
+          break;
+
+        case GLOBAL_SORT:
+          shufflePipeline.gatherResultKeyStatisticsIfNeeded();
+          shufflePipeline.globalSort(shuffleOutputChannelFactory, makeGlobalSortPartitionBoundariesFuture());
+          break;
+
+        default:
+          throw new UOE("Cannot handle shuffle kind [%s]", shuffleSpec.kind());
+      }
+
+      shuffleOutputChannelsFuture = shufflePipeline.build();
+    }
+
+    private ListenableFuture<ClusterByPartitions> makeGlobalSortPartitionBoundariesFuture()
+    {
+      if (kernel.getStageDefinition().mustGatherResultKeyStatistics()) {
+        if (stagePartitionBoundariesFuture != null) {
+          throw new ISE("Cannot call 'makeGlobalSortPartitionBoundariesFuture' twice");
+        }
+
+        return (stagePartitionBoundariesFuture = SettableFuture.create());
+      } else {
+        return Futures.immediateFuture(kernel.getResultPartitionBoundaries());
+      }
+    }
+
+    private void setUpCompletionCallbacks()
+    {
+      final StageDefinition stageDef = kernel.getStageDefinition();
+
+      Futures.addCallback(
+          Futures.allAsList(
+              Arrays.asList(
+                  workResultAndOutputChannels.getResultFuture(),
+                  shuffleOutputChannelsFuture
+              )
+          ),
+          new FutureCallback<List<Object>>()
+          {
+            @Override
+            public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
+            {
+              final Object resultObject = workerResultAndOutputChannelsResolved.get(0);
+              final OutputChannels outputChannels = (OutputChannels) workerResultAndOutputChannelsResolved.get(1);
+
+              for (OutputChannel channel : outputChannels.getAllChannels()) {
+                try {
+                  stageOutputs.computeIfAbsent(stageDef.getId(), ignored1 -> new ConcurrentHashMap<>())
+                              .computeIfAbsent(channel.getPartitionNumber(), ignored2 -> channel.getReadableChannel());
+                }
+                catch (Exception e) {
+                  kernelManipulationQueue.add(holder -> {
+                    throw new RE(e, "Worker completion callback error for stage [%s]", stageDef.getId());
+                  });
+
+                  // Don't make the "setResultsComplete" call below.
+                  return;
+                }
+              }
+
+              // Once the outputs channels have been resolved and are ready for reading, write success file, if
+              // using durable storage.
+              writeDurableStorageSuccessFileIfNeeded(stageDef.getStageNumber());
+
+              kernelManipulationQueue.add(holder -> holder.getStageKernelMap()
+                                                          .get(stageDef.getId())
+                                                          .setResultsComplete(resultObject));
+            }
+
+            @Override
+            public void onFailure(final Throwable t)
+            {
+              kernelManipulationQueue.add(
+                  kernelHolder ->
+                      kernelHolder.getStageKernelMap().get(stageDef.getId()).fail(t)
+              );
+            }
+          }
+      );
+    }
+
+    /**
+     * Write {@link DurableStorageUtils#SUCCESS_MARKER_FILENAME} for a particular stage, if durable storage is enabled.
+     */
+    private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber)
+    {
+      if (!durableStageStorageEnabled) {
+        return;
+      }
+
+      DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
+          DurableStorageOutputChannelFactory.createStandardImplementation(
+              task.getControllerTaskId(),
+              task().getWorkerNumber(),
+              stageNumber,
+              task().getId(),
+              frameContext.memoryParameters().getStandardFrameSize(),
+              MSQTasks.makeStorageConnector(context.injector()),
+              context.tempDir()
+          );
+      try {
+        durableStorageOutputChannelFactory.createSuccessFile(task.getId());
+      }
+      catch (IOException e) {
+        throw new ISE(
+            e,
+            "Unable to create the success file [%s] at the location [%s]",
+            DurableStorageUtils.SUCCESS_MARKER_FILENAME,
+            DurableStorageUtils.getSuccessFilePath(
+                task.getControllerTaskId(),
+                stageNumber,
+                task().getWorkerNumber()
+            )
+        );
+      }
+    }
+  }
+
+  /**
+   * Helper for {@link RunWorkOrder#makeAndRunShuffleProcessors()}. Builds a {@link FrameProcessor} pipeline to
+   * handle the shuffle.
+   */
+  private class ShufflePipelineBuilder
+  {
+    private final WorkerStageKernel kernel;
+    private final CounterTracker counterTracker;
+    private final FrameProcessorExecutor exec;
+    private final String cancellationId;
+    private final FrameContext frameContext;
+
+    // Current state of the pipeline. It's a future to allow pipeline construction to be deferred if necessary.
+    private ListenableFuture<ResultAndChannels<?>> pipelineFuture;
+
+    public ShufflePipelineBuilder(
+        final WorkerStageKernel kernel,
+        final CounterTracker counterTracker,
+        final FrameProcessorExecutor exec,
+        final String cancellationId,
+        final FrameContext frameContext
+    )
+    {
+      this.kernel = kernel;
+      this.counterTracker = counterTracker;
+      this.exec = exec;
+      this.cancellationId = cancellationId;
+      this.frameContext = frameContext;
+    }
+
+    /**
+     * Start the pipeline with the outputs of the main processor.
+     */
+    public void initialize(final ResultAndChannels<?> resultAndChannels)
+    {
+      if (pipelineFuture != null) {
+        throw new ISE("already initialized");
+      }
+
+      pipelineFuture = Futures.immediateFuture(resultAndChannels);
+    }
+
+    /**
+     * Add {@link FrameChannelMixer}, which mixes all current outputs into a single channel from the provided factory.
+     */
+    public void mix(final OutputChannelFactory outputChannelFactory)
+    {
+      // No sorting or statistics gathering, just combining all outputs into one big partition. Use a mixer to get
+      // everything into one file. Note: even if there is only one output channel, we'll run it through the mixer
+      // anyway, to ensure the data gets written to a file. (httpGetChannelData requires files.)
+
+      push(
+          resultAndChannels -> {
+            final OutputChannel outputChannel = outputChannelFactory.openChannel(0);
+
+            final FrameChannelMixer mixer =
+                new FrameChannelMixer(
+                    resultAndChannels.getOutputChannels().getAllReadableChannels(),
+                    outputChannel.getWritableChannel()
+                );
+
+            return new ResultAndChannels<>(
+                exec.runFully(mixer, cancellationId),
+                OutputChannels.wrap(Collections.singletonList(outputChannel.readOnly()))
+            );
+          }
+      );
+    }
+
+    /**
+     * Add {@link KeyStatisticsCollectionProcessor} if {@link StageDefinition#mustGatherResultKeyStatistics()}.
+     */
+    public void gatherResultKeyStatisticsIfNeeded()
+    {
+      push(
+          resultAndChannels -> {
+            final StageDefinition stageDefinition = kernel.getStageDefinition();
+            final OutputChannels channels = resultAndChannels.getOutputChannels();
+
+            if (channels.getAllChannels().isEmpty()) {
+              // No data coming out of this processor. Report empty statistics, if the kernel is expecting statistics.
+              if (stageDefinition.mustGatherResultKeyStatistics()) {
+                kernelManipulationQueue.add(
+                    holder ->
+                        holder.getStageKernelMap().get(stageDefinition.getId())
+                              .setResultKeyStatisticsSnapshot(ClusterByStatisticsSnapshot.empty())
+                );
+              }
+
+              // Generate one empty channel so the SuperSorter has something to do.
+              final BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
+              channel.writable().close();
+
+              final OutputChannel outputChannel = OutputChannel.readOnly(
+                  channel.readable(),
+                  FrameWithPartition.NO_PARTITION
+              );
+
+              return new ResultAndChannels<>(
+                  Futures.immediateFuture(null),
+                  OutputChannels.wrap(Collections.singletonList(outputChannel))
+              );
+            } else if (stageDefinition.mustGatherResultKeyStatistics()) {
+              return gatherResultKeyStatistics(channels);
+            } else {
+              return resultAndChannels;
+            }
+          }
+      );
+    }
+
+    /**
+     * Add a {@link SuperSorter} using {@link StageDefinition#getSortKey()} and partition boundaries
+     * from {@code partitionBoundariesFuture}.
+     */
+    public void globalSort(
+        final OutputChannelFactory outputChannelFactory,
+        final ListenableFuture<ClusterByPartitions> partitionBoundariesFuture
+    )
+    {
+      pushAsync(
+          resultAndChannels -> {
+            final StageDefinition stageDefinition = kernel.getStageDefinition();
+
+            final File sorterTmpDir = context.tempDir(stageDefinition.getStageNumber(), "super-sort");
+            FileUtils.mkdirp(sorterTmpDir);
+            if (!sorterTmpDir.isDirectory()) {
+              throw new IOException("Cannot create directory: " + sorterTmpDir);
+            }
+
+            final WorkerMemoryParameters memoryParameters = frameContext.memoryParameters();
+            final SuperSorter sorter = new SuperSorter(
+                resultAndChannels.getOutputChannels().getAllReadableChannels(),
+                stageDefinition.getFrameReader(),
+                stageDefinition.getSortKey(),
+                partitionBoundariesFuture,
+                exec,
+                outputChannelFactory,
+                makeSuperSorterIntermediateOutputChannelFactory(
+                    frameContext,
+                    stageDefinition.getStageNumber(),
+                    sorterTmpDir
+                ),
+                memoryParameters.getSuperSorterMaxActiveProcessors(),
+                memoryParameters.getSuperSorterMaxChannelsPerProcessor(),
+                -1,
+                cancellationId,
+                counterTracker.sortProgress()
+            );
+
+            return FutureUtils.transform(
+                sorter.run(),
+                sortedChannels -> new ResultAndChannels<>(Futures.immediateFuture(null), sortedChannels)
+            );
+          }
+      );
+    }
+
+    /**
+     * Add a {@link FrameChannelHashPartitioner} using {@link StageDefinition#getSortKey()}.
+     */
+    public void hashPartition(final OutputChannelFactory outputChannelFactory)
+    {
+      pushAsync(
+          resultAndChannels -> {
+            final ShuffleSpec shuffleSpec = kernel.getStageDefinition().getShuffleSpec();
+            final int partitions = shuffleSpec.partitionCount();
+
+            final List<OutputChannel> outputChannels = new ArrayList<>();
+
+            for (int i = 0; i < partitions; i++) {
+              outputChannels.add(outputChannelFactory.openChannel(i));
+            }
+
+            final FrameChannelHashPartitioner partitioner = new FrameChannelHashPartitioner(
+                resultAndChannels.getOutputChannels().getAllReadableChannels(),
+                outputChannels.stream().map(OutputChannel::getWritableChannel).collect(Collectors.toList()),
+                kernel.getStageDefinition().getFrameReader(),
+                kernel.getStageDefinition().getClusterBy().getColumns().size(),
+                FrameWriters.makeFrameWriterFactory(
+                    FrameType.ROW_BASED,
+                    new ArenaMemoryAllocatorFactory(frameContext.memoryParameters().getStandardFrameSize()),
+                    kernel.getStageDefinition().getSignature(),
+                    kernel.getStageDefinition().getSortKey()
+                )
+            );
+
+            final ListenableFuture<Long> partitionerFuture = exec.runFully(partitioner, cancellationId);
+
+            final ResultAndChannels<Long> retVal =
+                new ResultAndChannels<>(partitionerFuture, OutputChannels.wrap(outputChannels));
+
+            if (retVal.getOutputChannels().areReadableChannelsReady()) {
+              return Futures.immediateFuture(retVal);
+            } else {
+              return FutureUtils.transform(partitionerFuture, ignored -> retVal);
+            }
+          }
+      );
+    }
+
+    /**
+     * Add a sequence of {@link SuperSorter}, operating on each current output channel in order, one at a time.
+     */
+    public void localSort(final OutputChannelFactory outputChannelFactory)
+    {
+      pushAsync(
+          resultAndChannels -> {
+            final WorkerMemoryParameters memoryParameters = frameContext.memoryParameters();

Review Comment:
   ## Unread local variable
   
   Variable 'WorkerMemoryParameters memoryParameters' is never read.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4318)



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/common/SortMergeJoinFrameProcessor.java:
##########
@@ -0,0 +1,1075 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.key.FrameComparisonWidget;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.frame.key.RowKey;
+import org.apache.druid.frame.key.RowKeyReader;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.segment.FrameCursor;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.filter.ValueMatcher;
+import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.DimensionSelectorUtils;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.data.ZeroIndexedInts;
+import org.apache.druid.segment.join.JoinPrefixUtils;
+import org.apache.druid.segment.join.JoinType;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Processor for a sort-merge join of two inputs.
+ *
+ * Prerequisites:
+ *
+ * 1) Two inputs, both of which are stages; i.e. {@link ReadableInput#hasChannel()}.
+ *
+ * 2) Conditions are all simple equalities. Validated by {@link SortMergeJoinFrameProcessorFactory#validateCondition}
+ * and then transformed to lists of key columns by {@link SortMergeJoinFrameProcessorFactory#toKeyColumns}.
+ *
+ * 3) Both inputs are comprised of {@link org.apache.druid.frame.FrameType#ROW_BASED} frames, are sorted by the same
+ * key, and that key can be used to check the provided condition. Validated by
+ * {@link SortMergeJoinFrameProcessorFactory#validateInputFrameSignatures}.
+ *
+ * Algorithm:
+ *
+ * 1) Read current key from each side of the join.
+ *
+ * 2) If there is no match, emit or skip the row for the earlier key, as appropriate, based on the join type.
+ *
+ * 3) If there is a match, identify a complete set on one side or the other. (It doesn't matter which side has the
+ * complete set, but we need it on one of them.) We mark the first row for the key using {@link Tracker#markCurrent()}
+ * and find complete sets using {@link Tracker#hasCompleteSetForMark()}. Once we find one, we store it in
+ * {@link #trackerWithCompleteSetForCurrentKey}. If both sides have a complete set, we break ties by choosing the
+ * left side.
+ *
+ * 4) Once a complete set for the current key is identified: for each row on the *other* side, loop through the entire
+ * set of rows on {@link #trackerWithCompleteSetForCurrentKey}, and emit that many joined rows.
+ *
+ * 5) Once we process the final row on the *other* side, reset both marks with {@link Tracker#markCurrent()} and
+ * continue the algorithm.
+ */
+public class SortMergeJoinFrameProcessor implements FrameProcessor<Long>
+{
+  private static final int LEFT = 0;
+  private static final int RIGHT = 1;
+
+  /**
+   * Input channels for each side of the join. Two-element array: {@link #LEFT} and {@link #RIGHT}.
+   */
+  private final List<ReadableFrameChannel> inputChannels;
+
+  /**
+   * Trackers for each side of the join. Two-element array: {@link #LEFT} and {@link #RIGHT}.
+   */
+  private final List<Tracker> trackers;
+
+  private final WritableFrameChannel outputChannel;
+  private final FrameWriterFactory frameWriterFactory;
+  private final String rightPrefix;
+  private final JoinType joinType;
+  private final JoinColumnSelectorFactory joinColumnSelectorFactory = new JoinColumnSelectorFactory();
+  private FrameWriter frameWriter = null;
+
+  // Used by runIncrementally to defer certain logic to the next run.
+  private Runnable nextIterationRunnable = null;
+
+  // Used by runIncrementally to remember which tracker has the complete set for the current key.
+  private int trackerWithCompleteSetForCurrentKey = -1;
+
+  SortMergeJoinFrameProcessor(
+      ReadableInput left,
+      ReadableInput right,
+      WritableFrameChannel outputChannel,
+      FrameWriterFactory frameWriterFactory,
+      String rightPrefix,
+      List<List<KeyColumn>> keyColumns,
+      JoinType joinType
+  )
+  {
+    this.inputChannels = ImmutableList.of(left.getChannel(), right.getChannel());
+    this.outputChannel = outputChannel;
+    this.frameWriterFactory = frameWriterFactory;
+    this.rightPrefix = rightPrefix;
+    this.joinType = joinType;
+    this.trackers = ImmutableList.of(
+        new Tracker(left, keyColumns.get(LEFT)),
+        new Tracker(right, keyColumns.get(RIGHT))
+    );
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return inputChannels;
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.singletonList(outputChannel);
+  }
+
+  @Override
+  public ReturnOrAwait<Long> runIncrementally(IntSet readableInputs) throws IOException
+  {
+    // Fetch enough frames such that each tracker has one readable row.
+    for (int i = 0; i < inputChannels.size(); i++) {
+      final Tracker tracker = trackers.get(i);
+      if (tracker.isAtEndOfPushedData() && !pushNextFrame(i)) {
+        return nextAwait();
+      }
+    }
+
+    // Initialize new output frame, if needed.
+    startNewFrameIfNeeded();
+
+    while (!allTrackersAreAtEnd()
+           && !trackers.get(LEFT).needsMoreData()
+           && !trackers.get(RIGHT).needsMoreData()) {
+      // Algorithm can proceed: not all trackers are at the end of their streams, and no tracker needs more data to
+      // read the current cursor or move it forward.
+      if (nextIterationRunnable != null) {
+        final Runnable tmp = nextIterationRunnable;
+        nextIterationRunnable = null;
+        tmp.run();
+      }
+
+      final int markCmp = compareMarks();
+
+      // Two rows match if the keys compare equal _and_ neither key has a null component. (x JOIN y ON x.a = y.a does
+      // not match rows where "x.a" is null.)
+      final boolean match = markCmp == 0 && trackers.get(LEFT).hasCompletelyNonNullMark();
+
+      // If marked keys are equal on both sides ("match"), at least one side must have a complete set of rows
+      // for the marked key.
+      if (match && trackerWithCompleteSetForCurrentKey < 0) {
+        for (int i = 0; i < inputChannels.size(); i++) {
+          final Tracker tracker = trackers.get(i);
+
+          // Fetch up to one frame from each tracker, to check if that tracker has a complete set.
+          // Can't fetch more than one frame, because channels are only guaranteed to have one frame per run.
+          if (tracker.hasCompleteSetForMark() || (pushNextFrame(i) && tracker.hasCompleteSetForMark())) {
+            trackerWithCompleteSetForCurrentKey = i;
+            break;
+          }
+        }
+
+        if (trackerWithCompleteSetForCurrentKey < 0) {
+          // Algorithm cannot proceed; fetch more frames on the next run.
+          return nextAwait();
+        }
+      }
+
+      if (match || (markCmp <= 0 && joinType.isLefty()) || (markCmp >= 0 && joinType.isRighty())) {
+        // Emit row, if there's room in the current frameWriter.
+        joinColumnSelectorFactory.cmp = markCmp;
+        joinColumnSelectorFactory.match = match;
+
+        if (!frameWriter.addSelection()) {
+          if (frameWriter.getNumRows() > 0) {
+            // Out of space in the current frame. Run again without moving cursors.
+            flushCurrentFrame();
+            return ReturnOrAwait.runAgain();
+          } else {
+            throw new FrameRowTooLargeException(frameWriterFactory.allocatorCapacity());
+          }
+        }
+      }
+
+      // Advance one or both trackers.
+      if (match) {
+        // Matching keys. First advance the tracker with the complete set.
+        final Tracker tracker = trackers.get(trackerWithCompleteSetForCurrentKey);
+        final Tracker otherTracker = trackers.get(trackerWithCompleteSetForCurrentKey == LEFT ? RIGHT : LEFT);
+
+        tracker.advance();
+        if (!tracker.isCurrentSameKeyAsMark()) {
+          // Reached end of complete set. Advance the other tracker.
+          otherTracker.advance();
+
+          // On next iteration (when we're sure to have data) either rewind the complete-set tracker, or update marks
+          // of both, as appropriate.
+          onNextIteration(() -> {
+            if (otherTracker.isCurrentSameKeyAsMark()) {
+              otherTracker.markCurrent(); // Set mark to enable cleanup of old frames.
+              tracker.rewindToMark();
+            } else {
+              // Reached end of the other side too. Advance marks on both trackers.
+              tracker.markCurrent();
+              otherTracker.markCurrent();
+              trackerWithCompleteSetForCurrentKey = -1;
+            }
+          });
+        }
+      } else {
+        final int trackerToAdvance;
+
+        if (markCmp < 0) {
+          trackerToAdvance = LEFT;
+        } else if (markCmp > 0) {
+          trackerToAdvance = RIGHT;
+        } else {
+          // Key is null on both sides. Note that there is a preference for running through the left side first
+          // on a FULL join. It doesn't really matter which side we run through first, but we do need to be consistent
+          // for the benefit of the logic in "shouldEmitColumnValue".
+          trackerToAdvance = joinType.isLefty() ? LEFT : RIGHT;
+        }
+
+        final Tracker tracker = trackers.get(trackerToAdvance);
+
+        tracker.advance();
+
+        // On next iteration (when we're sure to have data), update mark if the key changed.
+        onNextIteration(() -> {
+          if (!tracker.isCurrentSameKeyAsMark()) {
+            tracker.markCurrent();
+            trackerWithCompleteSetForCurrentKey = -1;
+          }
+        });
+      }
+    }
+
+    if (allTrackersAreAtEnd()) {
+      flushCurrentFrame();
+      return ReturnOrAwait.returnObject(0L);
+    } else {
+      // Keep reading.
+      return nextAwait();
+    }
+  }
+
+  @Override
+  public void cleanup() throws IOException
+  {
+    FrameProcessors.closeAll(inputChannels(), outputChannels(), frameWriter, () -> trackers.forEach(Tracker::clear));
+  }
+
+  /**
+   * Returns a {@link ReturnOrAwait#awaitAll} for the channel numbers that need more data and have not yet hit their
+   * buffered-bytes limit, {@link Limits#MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN}.
+   *
+   * If all channels have hit their limit, throws {@link MSQException} with {@link TooManyRowsWithSameKeyFault}.
+   */
+  private ReturnOrAwait<Long> nextAwait()
+  {
+    final IntSet awaitSet = new IntOpenHashSet();
+    int trackerAtLimit = -1;
+
+    for (int i = 0; i < inputChannels.size(); i++) {
+      final Tracker tracker = trackers.get(i);
+      if (tracker.needsMoreData()) {
+        if (tracker.totalBytesBuffered() < Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+          awaitSet.add(i);
+        } else if (trackerAtLimit < 0) {
+          trackerAtLimit = i;
+        }
+      }
+    }
+
+    if (awaitSet.isEmpty() && trackerAtLimit > 0) {
+      // All trackers that need more data are at their max buffered bytes limit. Generate a nice exception.
+      final Tracker tracker = trackers.get(trackerAtLimit);
+      if (tracker.totalBytesBuffered() > Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN) {
+        // Generate a nice exception.
+        throw new MSQException(
+            new TooManyRowsWithSameKeyFault(
+                tracker.readMarkKey(),
+                tracker.totalBytesBuffered(),
+                Limits.MAX_BUFFERED_BYTES_FOR_SORT_MERGE_JOIN
+            )
+        );
+      }
+    }
+
+    return ReturnOrAwait.awaitAll(awaitSet);
+  }
+
+  /**
+   * Whether all trackers return true from {@link Tracker#isAtEnd()}.
+   */
+  private boolean allTrackersAreAtEnd()
+  {
+    for (Tracker tracker : trackers) {
+      if (!tracker.isAtEnd()) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Compares the marked rows of the two {@link #trackers}.
+   *
+   * @throws IllegalStateException if either tracker does not have a marked row and is not completely done
+   */
+  private int compareMarks()
+  {
+    final Tracker leftTracker = trackers.get(LEFT);
+    final Tracker rightTracker = trackers.get(RIGHT);
+
+    Preconditions.checkState(leftTracker.hasMark() || leftTracker.isAtEnd(), "left.hasMark || left.isAtEnd");
+    Preconditions.checkState(rightTracker.hasMark() || rightTracker.isAtEnd(), "right.hasMark || right.isAtEnd");
+
+    if (!leftTracker.hasMark()) {
+      return rightTracker.markFrame < 0 ? 0 : 1;
+    } else if (!rightTracker.hasMark()) {
+      return -1;
+    } else {
+      final FrameHolder leftHolder = leftTracker.holders.get(leftTracker.markFrame);
+      final FrameHolder rightHolder = rightTracker.holders.get(rightTracker.markFrame);
+      return leftHolder.comparisonWidget.compare(
+          leftTracker.markRow,
+          rightHolder.comparisonWidget,
+          rightTracker.markRow
+      );
+    }
+  }
+
+  /**
+   * Pushes a frame from the indicated channel into the appropriate tracker. Returns true if a frame was pushed
+   * or if the channel is finished.
+   */
+  private boolean pushNextFrame(final int channelNumber)
+  {
+    final ReadableFrameChannel channel = inputChannels.get(channelNumber);
+    final Tracker tracker = trackers.get(channelNumber);
+
+    if (!channel.isFinished() && !channel.canRead()) {
+      return false;
+    } else if (channel.isFinished()) {
+      tracker.push(null);
+      return true;
+    } else {
+      final Frame frame = channel.read();
+
+      if (frame.numRows() == 0) {
+        // Skip, read next.
+        return false;
+      } else {
+        tracker.push(frame);
+        return true;
+      }
+    }
+  }
+
+  private void onNextIteration(final Runnable runnable)
+  {
+    if (nextIterationRunnable != null) {
+      throw new ISE("postAdvanceRunnable already set");
+    } else {
+      nextIterationRunnable = runnable;
+    }
+  }
+
+  private void startNewFrameIfNeeded()
+  {
+    if (frameWriter == null) {
+      frameWriter = frameWriterFactory.newFrameWriter(joinColumnSelectorFactory);
+    }
+  }
+
+  private void flushCurrentFrame() throws IOException
+  {
+    if (frameWriter != null) {
+      if (frameWriter.getNumRows() > 0) {
+        final Frame frame = Frame.wrap(frameWriter.toByteArray());
+        frameWriter.close();
+        frameWriter = null;
+        outputChannel.write(new FrameWithPartition(frame, FrameWithPartition.NO_PARTITION));
+      }
+    }
+  }
+
+  /**
+   * Tracks the current set of rows that have the same key from a sequence of frames.
+   *
+   * markFrame and markRow are set when we encounter a new key, which enables rewinding and re-reading data with the
+   * same key.
+   */
+  private static class Tracker
+  {
+    /**
+     * Frame holders for the current frame, as well as immediately prior frames that share the same marked key.
+     * Prior frames are cleared on each call to {@link #markCurrent()}.
+     */
+    private final List<FrameHolder> holders = new ArrayList<>();
+    private final ReadableInput input;
+    private final List<KeyColumn> keyColumns;
+
+    // markFrame and markRow are the first frame and row with the current key.
+    private int markFrame = -1;
+    private int markRow = -1;
+
+    // currentFrame is the frame containing the current cursor row.
+    private int currentFrame = -1;
+
+    // done indicates that no more data is available in the channel.
+    private boolean done;
+
+    public Tracker(ReadableInput input, List<KeyColumn> keyColumns)
+    {
+      this.input = input;
+      this.keyColumns = keyColumns;
+    }
+
+    /**
+     * Adds a holder for a frame. If this is the first frame, sets the current cursor position and mark to the first
+     * row of the frame. Otherwise, the cursor position and mark are not changed.
+     *
+     * Pushing a null frame indicates no more frames are coming.
+     *
+     * @param frame frame, or null indicating no more frames are coming
+     */
+    public void push(final Frame frame)
+    {
+      if (frame == null) {
+        done = true;
+        return;
+      }
+
+      if (done) {
+        throw new ISE("Cannot push frames when already done");
+      }
+
+      final boolean atEndOfPushedData = isAtEndOfPushedData();
+      final FrameReader frameReader = input.getChannelFrameReader();
+      final FrameCursor cursor = FrameProcessors.makeCursor(frame, frameReader);
+      final FrameComparisonWidget comparisonWidget =
+          frameReader.makeComparisonWidget(frame, keyColumns);
+
+      final RowSignature.Builder keySignatureBuilder = RowSignature.builder();
+      for (final KeyColumn keyColumn : keyColumns) {
+        keySignatureBuilder.add(
+            keyColumn.columnName(),
+            frameReader.signature().getColumnType(keyColumn.columnName()).orElse(null)
+        );
+      }
+
+      holders.add(
+          new FrameHolder(
+              frame,
+              RowKeyReader.create(keySignatureBuilder.build()),
+              cursor,
+              comparisonWidget
+          )
+      );
+
+      if (atEndOfPushedData) {
+        // Move currentFrame so it points at the next row, which we now have, instead of an "isDone" cursor.
+        currentFrame = currentFrame < 0 ? 0 : currentFrame + 1;
+      }
+
+      if (markFrame < 0) {
+        // Cleared mark means we want the current row to be marked.
+        markFrame = currentFrame;
+        markRow = 0;
+      }
+    }
+
+    /**
+     * Number of bytes currently buffered in {@link #holders}.
+     */
+    public long totalBytesBuffered()
+    {
+      long bytes = 0;
+      for (final FrameHolder holder : holders) {
+        bytes += holder.frame.numBytes();
+      }
+      return bytes;
+    }
+
+    /**
+     * Cursor containing the current row.
+     */
+    @Nullable
+    public FrameCursor currentCursor()
+    {
+      if (currentFrame < 0) {
+        return null;
+      } else {
+        return holders.get(currentFrame).cursor;
+      }
+    }
+
+    /**
+     * Advances the current row (the current row of {@link #currentFrame}). After calling this method,
+     * {@link #isAtEndOfPushedData()} may start returning true.
+     */
+    public void advance()
+    {
+      assert !isAtEndOfPushedData();
+
+      final FrameHolder currentHolder = holders.get(currentFrame);
+
+      currentHolder.cursor.advance();
+
+      if (currentHolder.cursor.isDone() && currentFrame + 1 < holders.size()) {
+        currentFrame++;
+        holders.get(currentFrame).cursor.reset();
+      }
+    }
+
+    /**
+     * Whether this tracker has a marked row.
+     */
+    public boolean hasMark()
+    {
+      return markFrame >= 0;
+    }
+
+    /**
+     * Whether this tracker has a marked row that is completely nonnull.
+     */
+    public boolean hasCompletelyNonNullMark()
+    {
+      return hasMark() && !holders.get(markFrame).comparisonWidget.isPartiallyNullKey(markRow);
+    }
+
+    /**
+     * Reads the current marked key.
+     */
+    @Nullable
+    public List<Object> readMarkKey()
+    {
+      if (!hasMark()) {
+        return null;
+      }
+
+      final FrameHolder markHolder = holders.get(markFrame);
+      final RowKey markKey = markHolder.comparisonWidget.readKey(markRow);
+      return markHolder.keyReader.read(markKey);
+    }
+
+    /**
+     * Rewind to the mark row: the first one with the current key.
+     *
+     * @throws IllegalStateException if there is no marked row
+     */
+    public void rewindToMark()
+    {
+      if (markFrame < 0) {
+        throw new ISE("No mark");
+      }
+
+      currentFrame = markFrame;
+      holders.get(currentFrame).cursor.setCurrentRow(markRow);
+    }
+
+    /**
+     * Set the mark row to the current row. Used when data from the old mark to the current row is no longer needed.
+     */
+    public void markCurrent()
+    {
+      if (isAtEndOfPushedData()) {
+        clear();
+      } else {
+        // Remove unnecessary holders, now that the mark has moved on.
+        while (currentFrame > 0) {
+          if (currentFrame == holders.size() - 1) {
+            final FrameHolder lastHolder = holders.get(currentFrame);
+            holders.clear();
+            holders.add(lastHolder);
+            currentFrame = 0;
+          } else {
+            holders.remove(0);
+            currentFrame--;
+          }
+        }
+
+        markFrame = 0;
+        markRow = holders.get(currentFrame).cursor.getCurrentRow();
+      }
+    }
+
+    /**
+     * Whether the current cursor is past the end of the last frame for which we have data.
+     */
+    public boolean isAtEndOfPushedData()
+    {
+      return currentFrame < 0 || (currentFrame == holders.size() - 1 && holders.get(currentFrame).cursor.isDone());
+    }
+
+    /**
+     * Whether the current cursor is past the end of all data that will ever be pushed.
+     */
+    public boolean isAtEnd()
+    {
+      return done && isAtEndOfPushedData();
+    }
+
+    /**
+     * Whether this tracker needs more data in order to read the current cursor location or move it forward.
+     */
+    public boolean needsMoreData()
+    {
+      return !done && isAtEndOfPushedData();
+    }
+
+    /**
+     * Whether this tracker contains all rows for the marked key.
+     *
+     * @throws IllegalStateException if there is no marked key
+     */
+    public boolean hasCompleteSetForMark()
+    {
+      if (markFrame < 0) {
+        throw new ISE("No mark");
+      }
+
+      if (done) {
+        return true;
+      }
+
+      final FrameHolder lastHolder = holders.get(holders.size() - 1);
+      return !isSameKeyAsMark(lastHolder, lastHolder.frame.numRows() - 1);
+    }
+
+    /**
+     * Whether the current position (the current row of the {@link #currentFrame}) compares equally to the mark row.
+     * If {@link #isAtEnd()}, returns true iff there is no mark row.
+     */
+    public boolean isCurrentSameKeyAsMark()
+    {
+      if (isAtEnd()) {
+        return markFrame < 0;
+      } else {
+        assert !isAtEndOfPushedData();
+        final FrameHolder headHolder = holders.get(currentFrame);
+        return isSameKeyAsMark(headHolder, headHolder.cursor.getCurrentRow());
+      }
+    }
+
+    /**
+     * Clears the current mark and all buffered frames. Does not change {@link #done}.
+     */
+    public void clear()
+    {
+      holders.clear();
+      markFrame = -1;
+      markRow = -1;
+      currentFrame = -1;
+    }
+
+    /**
+     * Whether the provided frame and row compares equally to the mark row. The provided row must be at, or after,
+     * the mark row.
+     */
+    private boolean isSameKeyAsMark(final FrameHolder holder, final int row)
+    {
+      if (markFrame < 0) {
+        throw new ISE("No marked frame");
+      }
+      if (row < 0 || row >= holder.frame.numRows()) {
+        throw new ISE("Row [%d] out of bounds", row);
+      }
+
+      final FrameHolder markHolder = holders.get(markFrame);
+      final int cmp = markHolder.comparisonWidget.compare(markRow, holder.comparisonWidget, row);
+
+      if (cmp > 0) {
+        // The provided row is at, or after, the marked row.
+        // Therefore, cmp > 0 may indicate that input was provided out of order.
+        throw new ISE("Row compares higher than mark; out-of-order input?");
+      }
+
+      return cmp == 0;
+    }
+  }
+
+  /**
+   * Selector for joined rows. This is used as an input to {@link #frameWriter}.
+   */
+  private class JoinColumnSelectorFactory implements ColumnSelectorFactory
+  {
+    /**
+     * Current key comparison between left- and right-hand side.
+     */
+    private int cmp;
+
+    /**
+     * Whether there is a match between the left- and right-hand side. Not equivalent to {@code cmp == 0} in
+     * the case where the key on both sides is null.
+     */
+    private boolean match;
+
+    @Override
+    public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
+    {
+      if (dimensionSpec.getExtractionFn() != null || dimensionSpec.mustDecorate()) {

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [DimensionSpec.getExtractionFn](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4320)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org