You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org> on 2023/05/24 07:00:54 UTC

[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13952: Limit the subquery results by memory usage

github-code-scanning[bot] commented on code in PR #13952:
URL: https://github.com/apache/druid/pull/13952#discussion_r1203557149


##########
processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java:
##########
@@ -67,4 +76,59 @@
       );
     }
   }
+
+  /**
+   * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
+   * and writes the columns to the frames
+   *
+   * @param cursor                 Cursor to write to the frame
+   * @param frameWriterFactory     Frame writer factory to write to the frame.
+   *                               Determines the signature of the rows that are written to the frames
+   * @param memoryAllocatorFactory Allocator factory which creates the frames for the result sequence

Review Comment:
   ## Spurious Javadoc @param tags
   
   @param tag "memoryAllocatorFactory" does not match any actual parameter of method "cursorToFrames()".
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4982)



##########
processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.read.columnar.FrameColumnReader;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.frame.segment.columnar.FrameQueryableIndex;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.FrameBasedInlineDataSource;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class FrameBasedIndexedTable implements IndexedTable
+{
+  private static final Logger LOG = new Logger(BroadcastSegmentIndexedTable.class);
+  private static final byte CACHE_PREFIX = 0x01;
+
+  private final Set<String> keyColumns;
+  private final RowSignature rowSignature;
+  private final String version;
+  private final List<IndexedTable.Index> keyColumnsIndexes;
+  private final int numRows;
+  private final List<QueryableIndex> frameQueryableIndexes = new ArrayList<>();
+  private final List<Integer> cumulativeRowCount = new ArrayList<>();
+
+
+  public FrameBasedIndexedTable(
+      final FrameBasedInlineDataSource frameBasedInlineDataSource,
+      final Set<String> keyColumns,
+      final String version
+  )
+  {
+    this.keyColumns = keyColumns;
+    this.version = version;
+    this.rowSignature = frameBasedInlineDataSource.getRowSignature();
+
+    int rowCount = 0;
+    for (FrameSignaturePair frameSignaturePair : frameBasedInlineDataSource.getFrames()) {
+      Frame frame = frameSignaturePair.getFrame();
+      RowSignature frameRowSignature = frameSignaturePair.getRowSignature();
+      frameQueryableIndexes.add(new FrameQueryableIndex(
+          frame,
+          frameRowSignature,
+          createColumnReaders(frameRowSignature)
+      ));
+      rowCount += frame.numRows();
+      cumulativeRowCount.add(rowCount);
+    }
+
+    this.numRows = rowCount;
+
+    final ArrayList<RowBasedIndexBuilder> indexBuilders = new ArrayList<>(rowSignature.size());
+    final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
+
+    for (int i = 0; i < rowSignature.size(); i++) {
+      final RowBasedIndexBuilder m;
+      final String columnName = rowSignature.getColumnName(i);
+      if (keyColumns.contains(columnName)) {
+        final ColumnType keyType =
+            rowSignature.getColumnType(i).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
+
+        m = new RowBasedIndexBuilder(keyType);
+        keyColumnNames.add(columnName);
+      } else {
+        m = null;
+      }
+      indexBuilders.add(m);
+    }
+
+    final Sequence<Pair<Cursor, Function<String, ColumnCapabilities>>> cursors = Sequences.concat(
+        frameBasedInlineDataSource
+            .getFrames()
+            .stream()
+            .map(frameSignaturePair -> {
+              Frame frame = frameSignaturePair.getFrame();
+              RowSignature rowSignature = frameSignaturePair.getRowSignature();
+              FrameStorageAdapter frameStorageAdapter =
+                  new FrameStorageAdapter(frame, FrameReader.create(rowSignature, true), Intervals.ETERNITY);
+              return frameStorageAdapter.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null)
+                  .map(cursor ->
+                           Pair.<Cursor, Function<String, ColumnCapabilities>>of(cursor, frameStorageAdapter::getColumnCapabilities
+                  ));
+            })
+            .collect(Collectors.toList())
+    );
+
+    final Sequence<Integer> sequence = Sequences.map(
+        cursors,
+        cursorWithColumnCapabilities -> {
+          Cursor cursor = cursorWithColumnCapabilities.lhs;
+          Function<String, ColumnCapabilities> columnCapabilitiesFunction = cursorWithColumnCapabilities.rhs;
+          if (cursor == null) {
+            return 0;
+          }
+          int rowNumber = 0;
+          ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+
+          // this should really be optimized to use dimension selectors where possible to populate indexes from bitmap
+          // indexes, but, an optimization for another day
+          final List<BaseObjectColumnValueSelector> selectors = keyColumnNames
+              .stream()
+              .map(columnName -> {
+                // multi-value dimensions are not currently supported
+                if (columnCapabilitiesFunction.apply(columnName).hasMultipleValues().isMaybeTrue()) {
+                  return NilColumnValueSelector.instance();
+                }
+                return columnSelectorFactory.makeColumnValueSelector(columnName);
+              })
+              .collect(Collectors.toList());
+
+          while (!cursor.isDone()) {
+            for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < selectors.size(); keyColumnSelectorIndex++) {
+              final String keyColumnName = keyColumnNames.get(keyColumnSelectorIndex);
+              final int columnPosition = rowSignature.indexOf(keyColumnName);
+              final RowBasedIndexBuilder keyColumnIndexBuilder = indexBuilders.get(columnPosition);
+              keyColumnIndexBuilder.add(selectors.get(keyColumnSelectorIndex).getObject());
+            }
+
+            if (rowNumber % 100_000 == 0) {
+              if (rowNumber == 0) {
+                LOG.debug("Indexed first row for frame based datasource");
+              } else {
+                LOG.debug("Indexed row %s for frame based datasource", rowNumber);
+              }
+            }
+            rowNumber++;
+            cursor.advance();
+          }
+          return rowNumber;
+        }
+    );
+
+    Integer totalRows = sequence.accumulate(0, (accumulated, in) -> accumulated += in);
+
+    this.keyColumnsIndexes = indexBuilders.stream()
+                                          .map(builder -> builder != null ? builder.build() : null)
+                                          .collect(Collectors.toList());
+
+    LOG.info("Created FrameBasedIndexedTable with %s rows.", totalRows);
+  }
+
+  @Override
+  public String version()
+  {
+    return version;
+  }
+
+  @Override
+  public Set<String> keyColumns()
+  {
+    return keyColumns;
+  }
+
+  @Override
+  public RowSignature rowSignature()
+  {
+    return rowSignature;
+  }
+
+  @Override
+  public int numRows()
+  {
+    return numRows;
+  }
+
+  @Override
+  public Index columnIndex(int column)
+  {
+    return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndexes);
+
+  }
+
+  @Override
+  public Reader columnReader(int column)
+  {
+
+    if (!rowSignature.contains(column)) {
+      throw new IAE("Column[%d] is not a valid column for the frame based datasource");

Review Comment:
   ## Missing format argument
   
   This format call refers to 1 argument(s) but only supplies 0 argument(s).
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4983)



##########
processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.HeapMemoryAllocator;
+import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
+import org.apache.druid.frame.segment.FrameCursorUtils;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.query.FrameBasedInlineDataSource;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class FrameBasedIndexedTableTest extends InitializedNullHandlingTest
+{
+  private static final String STRING_COL_1 = "market";
+  private static final String LONG_COL_1 = "longNumericNull";
+  private static final String DOUBLE_COL_1 = "doubleNumericNull";
+  private static final String FLOAT_COL_1 = "floatNumericNull";
+  private static final String STRING_COL_2 = "market";
+  private static final String MULTI_VALUE_COLUMN = "placementish";
+  private static final String DIM_NOT_EXISTS = "DIM_NOT_EXISTS";
+
+  private static final List<Object[]> DATASOURCE_ROWS =
+      ImmutableList.<Object[]>builder()
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 2L, 1.3d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.5d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 4L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.7d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 5L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .build();
+
+  private static final RowSignature ROW_SIGNATURE =
+      RowSignature.builder()
+                  .add(STRING_COL_1, ColumnType.STRING)
+                  .add(LONG_COL_1, ColumnType.LONG)
+                  .add(DOUBLE_COL_1, ColumnType.DOUBLE)
+                  .add(FLOAT_COL_1, ColumnType.FLOAT)
+                  .add(STRING_COL_1, ColumnType.STRING)
+                  .add(MULTI_VALUE_COLUMN, ColumnType.STRING_ARRAY)
+                  .build();
+
+  private static final Set<String> KEY_COLUMNS = ImmutableSet.<String>builder()
+                                                             .add(STRING_COL_1)
+                                                             .add(LONG_COL_1)
+                                                             .add(DOUBLE_COL_1)
+                                                             .add(FLOAT_COL_1)
+                                                             .add(MULTI_VALUE_COLUMN)
+                                                             .add(DIM_NOT_EXISTS)
+                                                             .build();
+
+
+  private FrameBasedInlineDataSource dataSource;
+  private FrameBasedIndexedTable frameBasedIndexedTable;
+
+  @Before
+  public void setup()
+  {
+    Cursor cursor = IterableRowsCursorHelper.getCursorFromIterable(DATASOURCE_ROWS, ROW_SIGNATURE);
+    FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory(
+        FrameType.COLUMNAR,
+        new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited()),
+        ROW_SIGNATURE,
+        new ArrayList<>()
+    );
+    Frame frame = Iterables.getOnlyElement(FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).toList());
+
+    dataSource = new FrameBasedInlineDataSource(
+        ImmutableList.of(new FrameSignaturePair(frame, ROW_SIGNATURE)),
+        ROW_SIGNATURE
+    );
+
+    frameBasedIndexedTable = new FrameBasedIndexedTable(dataSource, KEY_COLUMNS, "test");
+
+  }
+
+  @Test
+  public void testInitShouldGenerateCorrectTable()
+  {
+    Assert.assertEquals(9, frameBasedIndexedTable.numRows());
+  }
+
+  @Test
+  public void testStringKeyColumn()
+  {
+    // lets try a few values out
+    final String[] vals = new String[]{"spot", "total_market", "upfront"};
+    checkIndexAndReader(STRING_COL_1, vals);
+  }
+
+  @Test
+  public void testLongKeyColumn()
+  {
+    final Long[] vals = new Long[]{NullHandling.replaceWithDefault() ? 0L : null, 10L, 20L};
+    checkIndexAndReader(LONG_COL_1, vals);
+  }
+
+  @Test
+  public void testFloatKeyColumn()
+  {
+    final Float[] vals = new Float[]{NullHandling.replaceWithDefault() ? 0.0f : null, 10.0f, 20.0f};
+    checkIndexAndReader(FLOAT_COL_1, vals);
+  }
+
+  @Test
+  public void testDoubleKeyColumn()
+  {
+    final Double[] vals = new Double[]{NullHandling.replaceWithDefault() ? 0.0 : null, 10.0, 20.0};
+    checkIndexAndReader(DOUBLE_COL_1, vals);
+  }
+
+  private void checkIndexAndReader(String columnName, Object[] vals)
+  {
+    checkIndexAndReader(columnName, vals, new Object[0]);
+  }
+
+  private void checkIndexAndReader(String columnName, Object[] vals, Object[] nonmatchingVals)

Review Comment:
   ## Useless parameter
   
   The parameter 'vals' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4984)



##########
processing/src/main/java/org/apache/druid/segment/join/table/FrameBasedIndexedTable.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.read.columnar.FrameColumnReader;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.frame.segment.columnar.FrameQueryableIndex;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.FrameBasedInlineDataSource;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class FrameBasedIndexedTable implements IndexedTable
+{
+  private static final Logger LOG = new Logger(BroadcastSegmentIndexedTable.class);
+  private static final byte CACHE_PREFIX = 0x01;
+
+  private final Set<String> keyColumns;
+  private final RowSignature rowSignature;
+  private final String version;
+  private final List<IndexedTable.Index> keyColumnsIndexes;
+  private final int numRows;
+  private final List<QueryableIndex> frameQueryableIndexes = new ArrayList<>();
+  private final List<Integer> cumulativeRowCount = new ArrayList<>();
+
+
+  public FrameBasedIndexedTable(
+      final FrameBasedInlineDataSource frameBasedInlineDataSource,
+      final Set<String> keyColumns,
+      final String version
+  )
+  {
+    this.keyColumns = keyColumns;
+    this.version = version;
+    this.rowSignature = frameBasedInlineDataSource.getRowSignature();
+
+    int rowCount = 0;
+    for (FrameSignaturePair frameSignaturePair : frameBasedInlineDataSource.getFrames()) {
+      Frame frame = frameSignaturePair.getFrame();
+      RowSignature frameRowSignature = frameSignaturePair.getRowSignature();
+      frameQueryableIndexes.add(new FrameQueryableIndex(
+          frame,
+          frameRowSignature,
+          createColumnReaders(frameRowSignature)
+      ));
+      rowCount += frame.numRows();
+      cumulativeRowCount.add(rowCount);
+    }
+
+    this.numRows = rowCount;
+
+    final ArrayList<RowBasedIndexBuilder> indexBuilders = new ArrayList<>(rowSignature.size());
+    final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
+
+    for (int i = 0; i < rowSignature.size(); i++) {
+      final RowBasedIndexBuilder m;
+      final String columnName = rowSignature.getColumnName(i);
+      if (keyColumns.contains(columnName)) {
+        final ColumnType keyType =
+            rowSignature.getColumnType(i).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
+
+        m = new RowBasedIndexBuilder(keyType);
+        keyColumnNames.add(columnName);
+      } else {
+        m = null;
+      }
+      indexBuilders.add(m);
+    }
+
+    final Sequence<Pair<Cursor, Function<String, ColumnCapabilities>>> cursors = Sequences.concat(
+        frameBasedInlineDataSource
+            .getFrames()
+            .stream()
+            .map(frameSignaturePair -> {
+              Frame frame = frameSignaturePair.getFrame();
+              RowSignature rowSignature = frameSignaturePair.getRowSignature();
+              FrameStorageAdapter frameStorageAdapter =
+                  new FrameStorageAdapter(frame, FrameReader.create(rowSignature, true), Intervals.ETERNITY);
+              return frameStorageAdapter.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null)
+                  .map(cursor ->
+                           Pair.<Cursor, Function<String, ColumnCapabilities>>of(cursor, frameStorageAdapter::getColumnCapabilities
+                  ));
+            })
+            .collect(Collectors.toList())
+    );
+
+    final Sequence<Integer> sequence = Sequences.map(
+        cursors,
+        cursorWithColumnCapabilities -> {
+          Cursor cursor = cursorWithColumnCapabilities.lhs;
+          Function<String, ColumnCapabilities> columnCapabilitiesFunction = cursorWithColumnCapabilities.rhs;
+          if (cursor == null) {
+            return 0;
+          }
+          int rowNumber = 0;
+          ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+
+          // this should really be optimized to use dimension selectors where possible to populate indexes from bitmap
+          // indexes, but, an optimization for another day
+          final List<BaseObjectColumnValueSelector> selectors = keyColumnNames
+              .stream()
+              .map(columnName -> {
+                // multi-value dimensions are not currently supported
+                if (columnCapabilitiesFunction.apply(columnName).hasMultipleValues().isMaybeTrue()) {
+                  return NilColumnValueSelector.instance();
+                }
+                return columnSelectorFactory.makeColumnValueSelector(columnName);
+              })
+              .collect(Collectors.toList());
+
+          while (!cursor.isDone()) {
+            for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < selectors.size(); keyColumnSelectorIndex++) {
+              final String keyColumnName = keyColumnNames.get(keyColumnSelectorIndex);
+              final int columnPosition = rowSignature.indexOf(keyColumnName);
+              final RowBasedIndexBuilder keyColumnIndexBuilder = indexBuilders.get(columnPosition);
+              keyColumnIndexBuilder.add(selectors.get(keyColumnSelectorIndex).getObject());
+            }
+
+            if (rowNumber % 100_000 == 0) {
+              if (rowNumber == 0) {
+                LOG.debug("Indexed first row for frame based datasource");
+              } else {
+                LOG.debug("Indexed row %s for frame based datasource", rowNumber);
+              }
+            }
+            rowNumber++;
+            cursor.advance();
+          }
+          return rowNumber;
+        }
+    );
+
+    Integer totalRows = sequence.accumulate(0, (accumulated, in) -> accumulated += in);
+
+    this.keyColumnsIndexes = indexBuilders.stream()
+                                          .map(builder -> builder != null ? builder.build() : null)
+                                          .collect(Collectors.toList());
+
+    LOG.info("Created FrameBasedIndexedTable with %s rows.", totalRows);
+  }
+
+  @Override
+  public String version()
+  {
+    return version;
+  }
+
+  @Override
+  public Set<String> keyColumns()
+  {
+    return keyColumns;
+  }
+
+  @Override
+  public RowSignature rowSignature()
+  {
+    return rowSignature;
+  }
+
+  @Override
+  public int numRows()
+  {
+    return numRows;
+  }
+
+  @Override
+  public Index columnIndex(int column)
+  {
+    return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndexes);
+
+  }
+
+  @Override
+  public Reader columnReader(int column)
+  {
+
+    if (!rowSignature.contains(column)) {
+      throw new IAE("Column[%d] is not a valid column for the frame based datasource");
+    }
+
+    String columnName = rowSignature.getColumnName(column);
+    final SimpleAscendingOffset offset = new SimpleAscendingOffset(numRows());
+    final List<BaseObjectColumnValueSelector<?>> columnValueSelectors = new ArrayList<>();
+    final Set<Closeable> closeables = new HashSet<>();
+
+    for (QueryableIndex frameQueryableIndex : frameQueryableIndexes) {
+      ColumnHolder columnHolder = frameQueryableIndex.getColumnHolder(columnName);
+      if (columnHolder == null) {
+        columnValueSelectors.add(NilColumnValueSelector.instance());
+      } else {
+        BaseColumn baseColumn = columnHolder.getColumn();
+        columnValueSelectors.add(baseColumn.makeColumnValueSelector(offset));
+        closeables.add(baseColumn);
+      }
+    }
+
+    return new Reader()
+    {
+      @Nullable
+      @Override
+      public Object read(int row)
+      {
+        int frameIndex = binSearch(cumulativeRowCount, row);
+        if (frameIndex == frameQueryableIndexes.size()) {
+          throw new IndexOutOfBoundsException(
+              StringUtils.format("Requested row index [%d], Max row count [%d]", row, numRows())
+          );
+        }
+        // The offset needs to be changed as well
+        int adjustedOffset = frameIndex == 0 ? row : row - cumulativeRowCount.get(frameIndex - 1);

Review Comment:
   ## User-controlled data in arithmetic expression
   
   This arithmetic expression depends on a [user-provided value](1), potentially causing an underflow.
   This arithmetic expression depends on a [user-provided value](2), potentially causing an underflow.
   This arithmetic expression depends on a [user-provided value](3), potentially causing an underflow.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4986)



##########
processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.allocation.HeapMemoryAllocator;
+import org.apache.druid.frame.allocation.SingleMemoryAllocatorFactory;
+import org.apache.druid.frame.segment.FrameCursorUtils;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.query.FrameBasedInlineDataSource;
+import org.apache.druid.query.FrameSignaturePair;
+import org.apache.druid.query.IterableRowsCursorHelper;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+public class FrameBasedIndexedTableTest extends InitializedNullHandlingTest
+{
+  private static final String STRING_COL_1 = "market"; // column names used by ROW_SIGNATURE / KEY_COLUMNS below
+  private static final String LONG_COL_1 = "longNumericNull";
+  private static final String DOUBLE_COL_1 = "doubleNumericNull";
+  private static final String FLOAT_COL_1 = "floatNumericNull";
+  private static final String STRING_COL_2 = "market"; // NOTE(review): same value as STRING_COL_1 and unused in the visible part of this class -- confirm intent
+  private static final String MULTI_VALUE_COLUMN = "placementish";
+  private static final String DIM_NOT_EXISTS = "DIM_NOT_EXISTS"; // listed in KEY_COLUMNS but not part of ROW_SIGNATURE
+
+  private static final List<Object[]> DATASOURCE_ROWS = // nine fixture rows of six values each, in ROW_SIGNATURE column order
+      ImmutableList.<Object[]>builder()
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 2L, 1.3d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.5d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 4L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 1L, 1.7d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .add(new Object[]{"spot", 5L, 1.1d, 3.1f, "preferred", new Object[]{"val1", "val2"}})
+                   .build(); // only the long (2nd) and double (3rd) values vary between rows
+
+  private static final RowSignature ROW_SIGNATURE = // six columns, positionally matching each DATASOURCE_ROWS entry
+      RowSignature.builder()
+                  .add(STRING_COL_1, ColumnType.STRING)
+                  .add(LONG_COL_1, ColumnType.LONG)
+                  .add(DOUBLE_COL_1, ColumnType.DOUBLE)
+                  .add(FLOAT_COL_1, ColumnType.FLOAT)
+                  .add(STRING_COL_1, ColumnType.STRING) // NOTE(review): STRING_COL_1 added a second time (fifth column); STRING_COL_2 may have been intended -- confirm
+                  .add(MULTI_VALUE_COLUMN, ColumnType.STRING_ARRAY)
+                  .build();
+
+  private static final Set<String> KEY_COLUMNS = ImmutableSet.<String>builder() // key columns handed to the FrameBasedIndexedTable constructor in setup()
+                                                             .add(STRING_COL_1)
+                                                             .add(LONG_COL_1)
+                                                             .add(DOUBLE_COL_1)
+                                                             .add(FLOAT_COL_1)
+                                                             .add(MULTI_VALUE_COLUMN)
+                                                             .add(DIM_NOT_EXISTS) // a name that ROW_SIGNATURE does not contain
+                                                             .build();
+
+
+  private FrameBasedInlineDataSource dataSource; // assigned in setup(): wraps the single frame built from DATASOURCE_ROWS
+  private FrameBasedIndexedTable frameBasedIndexedTable; // table under test; assigned in setup() from dataSource and KEY_COLUMNS
+
+  /**
+   * Builds the fixture: writes {@link #DATASOURCE_ROWS} into a single columnar frame, wraps that frame in a
+   * {@link FrameBasedInlineDataSource}, and creates the {@link FrameBasedIndexedTable} under test from it.
+   */
+  @Before
+  public void setup()
+  {
+    final Cursor rowCursor = IterableRowsCursorHelper.getCursorFromIterable(DATASOURCE_ROWS, ROW_SIGNATURE);
+
+    final SingleMemoryAllocatorFactory allocatorFactory =
+        new SingleMemoryAllocatorFactory(HeapMemoryAllocator.unlimited());
+    final FrameWriterFactory writerFactory =
+        FrameWriters.makeFrameWriterFactory(FrameType.COLUMNAR, allocatorFactory, ROW_SIGNATURE, new ArrayList<>());
+
+    // The fixture is expected to fit in exactly one frame; getOnlyElement rejects anything else.
+    final List<Frame> frames = FrameCursorUtils.cursorToFrames(rowCursor, writerFactory).toList();
+    final Frame onlyFrame = Iterables.getOnlyElement(frames);
+
+    final FrameSignaturePair framePair = new FrameSignaturePair(onlyFrame, ROW_SIGNATURE);
+    dataSource = new FrameBasedInlineDataSource(ImmutableList.of(framePair), ROW_SIGNATURE);
+    frameBasedIndexedTable = new FrameBasedIndexedTable(dataSource, KEY_COLUMNS, "test");
+  }
+
+  /**
+   * The table built in {@code setup()} must report nine rows.
+   */
+  @Test
+  public void testInitShouldGenerateCorrectTable()
+  {
+    final int expectedRowCount = 9;
+    final int actualRowCount = frameBasedIndexedTable.numRows();
+    Assert.assertEquals(expectedRowCount, actualRowCount);
+  }
+
+  /**
+   * Passes a handful of string keys for the {@code STRING_COL_1} column to {@code checkIndexAndReader}.
+   */
+  @Test
+  public void testStringKeyColumn()
+  {
+    // A small sample of keys to probe the column with.
+    final String[] probeKeys = {"spot", "total_market", "upfront"};
+    checkIndexAndReader(STRING_COL_1, probeKeys);
+  }
+
+  /**
+   * Passes three long keys for the {@code LONG_COL_1} column to {@code checkIndexAndReader}.
+   */
+  @Test
+  public void testLongKeyColumn()
+  {
+    // First probe is 0L when NullHandling.replaceWithDefault() is true, otherwise null.
+    final Long firstKey;
+    if (NullHandling.replaceWithDefault()) {
+      firstKey = 0L;
+    } else {
+      firstKey = null;
+    }
+    final Long[] probeKeys = {firstKey, 10L, 20L};
+    checkIndexAndReader(LONG_COL_1, probeKeys);
+  }
+
+  /**
+   * Passes three float keys for the {@code FLOAT_COL_1} column to {@code checkIndexAndReader}.
+   */
+  @Test
+  public void testFloatKeyColumn()
+  {
+    // First probe is 0.0f when NullHandling.replaceWithDefault() is true, otherwise null.
+    final Float firstKey;
+    if (NullHandling.replaceWithDefault()) {
+      firstKey = 0.0f;
+    } else {
+      firstKey = null;
+    }
+    final Float[] probeKeys = {firstKey, 10.0f, 20.0f};
+    checkIndexAndReader(FLOAT_COL_1, probeKeys);
+  }
+
+  /**
+   * Passes three double keys for the {@code DOUBLE_COL_1} column to {@code checkIndexAndReader}.
+   */
+  @Test
+  public void testDoubleKeyColumn()
+  {
+    // First probe is 0.0 when NullHandling.replaceWithDefault() is true, otherwise null.
+    final Double firstKey;
+    if (NullHandling.replaceWithDefault()) {
+      firstKey = 0.0;
+    } else {
+      firstKey = null;
+    }
+    final Double[] probeKeys = {firstKey, 10.0, 20.0};
+    checkIndexAndReader(DOUBLE_COL_1, probeKeys);
+  }
+
+  private void checkIndexAndReader(String columnName, Object[] vals)
+  {
+    checkIndexAndReader(columnName, vals, new Object[0]);
+  }
+
+  private void checkIndexAndReader(String columnName, Object[] vals, Object[] nonmatchingVals)

Review Comment:
   ## Useless parameter
   
   The parameter 'nonmatchingVals' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4985)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org