You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ma...@apache.org on 2022/01/05 19:19:51 UTC

[pinot] branch master updated: faster metric scans (#7920)

This is an automated email from the ASF dual-hosted git repository.

mayanks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0df8492  faster metric scans (#7920)
0df8492 is described below

commit 0df84927f1b5baff5b8fe3f3c3936fdc8602e9e0
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Wed Jan 5 19:19:21 2022 +0000

    faster metric scans (#7920)
    
    * add fillValues to ForwardIndexReader to accelerate contiguous scans on fixed width metric columns
    
    * review comments
    
    * add benchmark
    
    * move string handling to default implementations
    
    * move methods to support more readers
---
 .../org/apache/pinot/core/common/DataFetcher.java  | 120 +-------------
 .../BenchmarkFixedByteSVForwardIndexReader.java    |  69 ++++++++
 .../forward/BaseChunkSVForwardIndexReader.java     | 177 +++++++++++++++++++++
 .../spi/index/reader/ForwardIndexReader.java       | 156 ++++++++++++++++++
 4 files changed, 406 insertions(+), 116 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 458bf84..1bc593f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -425,35 +425,7 @@ public class DataFetcher {
         _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
         _dictionary.readIntValues(dictIdBuffer, length, valueBuffer);
       } else {
-        switch (_reader.getValueType()) {
-          case INT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getInt(docIds[i], readerContext);
-            }
-            break;
-          case LONG:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (int) _reader.getLong(docIds[i], readerContext);
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (int) _reader.getFloat(docIds[i], readerContext);
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (int) _reader.getDouble(docIds[i], readerContext);
-            }
-            break;
-          case STRING:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = Integer.parseInt(_reader.getString(docIds[i], readerContext));
-            }
-            break;
-          default:
-            throw new IllegalStateException();
-        }
+        _reader.readValuesSV(docIds, length, valueBuffer, readerContext);
       }
     }
 
@@ -469,35 +441,7 @@ public class DataFetcher {
         _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
         _dictionary.readLongValues(dictIdBuffer, length, valueBuffer);
       } else {
-        switch (_reader.getValueType()) {
-          case INT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getInt(docIds[i], readerContext);
-            }
-            break;
-          case LONG:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getLong(docIds[i], readerContext);
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (long) _reader.getFloat(docIds[i], readerContext);
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (long) _reader.getDouble(docIds[i], readerContext);
-            }
-            break;
-          case STRING:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = Long.parseLong(_reader.getString(docIds[i], readerContext));
-            }
-            break;
-          default:
-            throw new IllegalStateException();
-        }
+        _reader.readValuesSV(docIds, length, valueBuffer, readerContext);
       }
     }
 
@@ -513,35 +457,7 @@ public class DataFetcher {
         _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
         _dictionary.readFloatValues(dictIdBuffer, length, valueBuffer);
       } else {
-        switch (_reader.getValueType()) {
-          case INT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getInt(docIds[i], readerContext);
-            }
-            break;
-          case LONG:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getLong(docIds[i], readerContext);
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getFloat(docIds[i], readerContext);
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = (float) _reader.getDouble(docIds[i], readerContext);
-            }
-            break;
-          case STRING:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = Float.parseFloat(_reader.getString(docIds[i], readerContext));
-            }
-            break;
-          default:
-            throw new IllegalStateException();
-        }
+        _reader.readValuesSV(docIds, length, valueBuffer, readerContext);
       }
     }
 
@@ -557,35 +473,7 @@ public class DataFetcher {
         _reader.readDictIds(docIds, length, dictIdBuffer, readerContext);
         _dictionary.readDoubleValues(dictIdBuffer, length, valueBuffer);
       } else {
-        switch (_reader.getValueType()) {
-          case INT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getInt(docIds[i], readerContext);
-            }
-            break;
-          case LONG:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getLong(docIds[i], readerContext);
-            }
-            break;
-          case FLOAT:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getFloat(docIds[i], readerContext);
-            }
-            break;
-          case DOUBLE:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = _reader.getDouble(docIds[i], readerContext);
-            }
-            break;
-          case STRING:
-            for (int i = 0; i < length; i++) {
-              valueBuffer[i] = Double.parseDouble(_reader.getString(docIds[i], readerContext));
-            }
-            break;
-          default:
-            throw new IllegalStateException();
-        }
+        _reader.readValuesSV(docIds, length, valueBuffer, readerContext);
       }
     }
 
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
index f5c7e37..2c24443 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkFixedByteSVForwardIndexReader.java
@@ -58,23 +58,29 @@ public class BenchmarkFixedByteSVForwardIndexReader {
   private long[] _longBuffer;
   private FixedByteChunkSVForwardIndexReader _compressedReader;
   private FixedBytePower2ChunkSVForwardIndexReader _compressedPow2Reader;
+  private FixedByteChunkSVForwardIndexReader _uncompressedReader;
 
   @Setup(Level.Trial)
   public void setup()
       throws IOException {
     FileUtils.forceMkdir(INDEX_DIR);
+    File uncompressedIndexFile = new File(INDEX_DIR, UUID.randomUUID().toString());
     File compressedIndexFile = new File(INDEX_DIR, UUID.randomUUID().toString());
     File pow2CompressedIndexFile = new File(INDEX_DIR, UUID.randomUUID().toString());
     _doubleBuffer = new double[_blockSize];
     _longBuffer = new long[_blockSize];
     try (FixedByteChunkSVForwardIndexWriter writer = new FixedByteChunkSVForwardIndexWriter(compressedIndexFile,
         ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 3);
+        FixedByteChunkSVForwardIndexWriter passthroughWriter = new FixedByteChunkSVForwardIndexWriter(
+            uncompressedIndexFile,
+            ChunkCompressionType.PASS_THROUGH, _numBlocks * _blockSize, 1000, Long.BYTES, 3);
         FixedByteChunkSVForwardIndexWriter pow2Writer = new FixedByteChunkSVForwardIndexWriter(pow2CompressedIndexFile,
             ChunkCompressionType.LZ4, _numBlocks * _blockSize, 1000, Long.BYTES, 4)) {
       for (int i = 0; i < _numBlocks * _blockSize; i++) {
         long next = ThreadLocalRandom.current().nextLong();
         writer.putLong(next);
         pow2Writer.putLong(next);
+        passthroughWriter.putLong(next);
       }
     }
     _compressedReader = new FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(compressedIndexFile),
@@ -82,6 +88,9 @@ public class BenchmarkFixedByteSVForwardIndexReader {
     _compressedPow2Reader =
         new FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(pow2CompressedIndexFile),
             FieldSpec.DataType.LONG);
+    _uncompressedReader =
+        new FixedByteChunkSVForwardIndexReader(PinotDataBuffer.loadBigEndianFile(uncompressedIndexFile),
+            FieldSpec.DataType.LONG);
     _docIds = new int[_blockSize];
   }
 
@@ -144,4 +153,64 @@ public class BenchmarkFixedByteSVForwardIndexReader {
       }
     }
   }
+
+  @Benchmark
+  public void readDoublesBatch(Blackhole bh)
+      throws IOException {
+    try (ChunkReaderContext context = _uncompressedReader.createContext()) {
+      for (int block = 0; block < _numBlocks; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i;
+        }
+        _uncompressedReader.readValuesSV(_docIds, _docIds.length, _doubleBuffer, context);
+        bh.consume(_doubleBuffer);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readDoubles(Blackhole bh)
+      throws IOException {
+    try (ChunkReaderContext context = _uncompressedReader.createContext()) {
+      for (int block = 0; block < _numBlocks; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i;
+        }
+        for (int i = 0; i < _docIds.length; i++) {
+          _doubleBuffer[i] = _uncompressedReader.getLong(_docIds[i], context);
+        }
+        bh.consume(_doubleBuffer);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readLongsBatch(Blackhole bh)
+      throws IOException {
+    try (ChunkReaderContext context = _uncompressedReader.createContext()) {
+      for (int block = 0; block < _numBlocks; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i;
+        }
+        _uncompressedReader.readValuesSV(_docIds, _docIds.length, _longBuffer, context);
+        bh.consume(_longBuffer);
+      }
+    }
+  }
+
+  @Benchmark
+  public void readLongs(Blackhole bh)
+      throws IOException {
+    try (ChunkReaderContext context = _uncompressedReader.createContext()) {
+      for (int block = 0; block < _numBlocks; block++) {
+        for (int i = 0; i < _docIds.length; i++) {
+          _docIds[i] = block * _blockSize + i;
+        }
+        for (int i = 0; i < _docIds.length; i++) {
+          _longBuffer[i] = _uncompressedReader.getLong(_docIds[i], context);
+        }
+        bh.consume(_longBuffer);
+      }
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
index dce4a90..289cfe3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
@@ -21,6 +21,10 @@ package org.apache.pinot.segment.local.segment.index.readers.forward;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
 import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
 import org.apache.pinot.segment.local.io.writer.impl.BaseChunkSVForwardIndexWriter;
 import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
@@ -168,8 +172,181 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
   }
 
   @Override
+  public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
+    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (getValueType()) {
+        case INT: {
+          int minOffset = docIds[0] * Integer.BYTES;
+          IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
+          buffer.get(values, 0, length);
+        }
+        break;
+        case LONG: {
+          int minOffset = docIds[0] * Long.BYTES;
+          LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (int) buffer.get(i);
+          }
+        }
+        break;
+        case FLOAT: {
+          int minOffset = docIds[0] * Float.BYTES;
+          FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (int) buffer.get(i);
+          }
+        }
+        break;
+        case DOUBLE: {
+          int minOffset = docIds[0] * Double.BYTES;
+          DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (int) buffer.get(i);
+          }
+        }
+        break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    } else {
+      ForwardIndexReader.super.readValuesSV(docIds, length, values, context);
+    }
+  }
+
+  @Override
+  public void readValuesSV(int[] docIds, int length, long[] values, ChunkReaderContext context) {
+    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (getValueType()) {
+        case INT: {
+          int minOffset = docIds[0] * Integer.BYTES;
+          IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case LONG: {
+          int minOffset = docIds[0] * Long.BYTES;
+          LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer();
+          buffer.get(values, 0, length);
+        }
+        break;
+        case FLOAT: {
+          int minOffset = docIds[0] * Float.BYTES;
+          FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (long) buffer.get(i);
+          }
+        }
+        break;
+        case DOUBLE: {
+          int minOffset = docIds[0] * Double.BYTES;
+          DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (long) buffer.get(i);
+          }
+        }
+        break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    } else {
+      ForwardIndexReader.super.readValuesSV(docIds, length, values, context);
+    }
+  }
+
+  @Override
+  public void readValuesSV(int[] docIds, int length, float[] values, ChunkReaderContext context) {
+    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (getValueType()) {
+        case INT: {
+          int minOffset = docIds[0] * Integer.BYTES;
+          IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case LONG: {
+          int minOffset = docIds[0] * Long.BYTES;
+          LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case FLOAT: {
+          int minOffset = docIds[0] * Float.BYTES;
+          FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer();
+          buffer.get(values, 0, length);
+        }
+        break;
+        case DOUBLE: {
+          int minOffset = docIds[0] * Double.BYTES;
+          DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = (float) buffer.get(i);
+          }
+        }
+        break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    } else {
+      ForwardIndexReader.super.readValuesSV(docIds, length, values, context);
+    }
+  }
+
+  @Override
+  public void readValuesSV(int[] docIds, int length, double[] values, ChunkReaderContext context) {
+    if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+      switch (getValueType()) {
+        case INT: {
+          int minOffset = docIds[0] * Integer.BYTES;
+          IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case LONG: {
+          int minOffset = docIds[0] * Long.BYTES;
+          getLong(0, context);
+          LongBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Long.BYTES).asLongBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case FLOAT: {
+          int minOffset = docIds[0] * Float.BYTES;
+          FloatBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Float.BYTES).asFloatBuffer();
+          for (int i = 0; i < buffer.limit(); i++) {
+            values[i] = buffer.get(i);
+          }
+        }
+        break;
+        case DOUBLE: {
+          int minOffset = docIds[0] * Double.BYTES;
+          DoubleBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Double.BYTES).asDoubleBuffer();
+          buffer.get(values, 0, length);
+        }
+        break;
+        default:
+          throw new IllegalArgumentException();
+      }
+    } else {
+      ForwardIndexReader.super.readValuesSV(docIds, length, values, context);
+    }
+  }
+
+  @Override
   public void close() {
     // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
     // caller is responsible of closing the PinotDataBuffer.
   }
+
+  private boolean isContiguousRange(int[] docIds, int length) {
+    return docIds[length - 1] - docIds[0] == length - 1;
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 941522c..c6b9809 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -103,6 +103,162 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
    */
 
   /**
+   * Fills the values
+   * @param docIds Array containing the document ids to read
+   * @param length Number of values to read
+   * @param values Values to fill
+   * @param context Reader context
+   */
+  default void readValuesSV(int[] docIds, int length, int[] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getInt(docIds[i], context);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = (int) getLong(docIds[i], context);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < length; i++) {
+          values[i] = (int) getFloat(docIds[i], context);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < length; i++) {
+          values[i] = (int) getDouble(docIds[i], context);
+        }
+        break;
+      case STRING:
+        for (int i = 0; i < length; i++) {
+          values[i] = Integer.parseInt(getString(docIds[i], context));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  /**
+   * Fills the values
+   * @param docIds Array containing the document ids to read
+   * @param length Number of values to read
+   * @param values Values to fill
+   * @param context Reader context
+   */
+  default void readValuesSV(int[] docIds, int length, long[] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getInt(docIds[i], context);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = getLong(docIds[i], context);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < length; i++) {
+          values[i] = (long) getFloat(docIds[i], context);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < length; i++) {
+          values[i] = (long) getDouble(docIds[i], context);
+        }
+        break;
+      case STRING:
+        for (int i = 0; i < length; i++) {
+          values[i] = Long.parseLong(getString(docIds[i], context));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  /**
+   * Fills the values
+   * @param docIds Array containing the document ids to read
+   * @param length Number of values to read
+   * @param values Values to fill
+   * @param context Reader context
+   */
+  default void readValuesSV(int[] docIds, int length, float[] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getInt(docIds[i], context);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = getLong(docIds[i], context);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getFloat(docIds[i], context);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < length; i++) {
+          values[i] = (float) getDouble(docIds[i], context);
+        }
+        break;
+      case STRING:
+        for (int i = 0; i < length; i++) {
+          values[i] = Float.parseFloat(getString(docIds[i], context));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  /**
+   * Fills the values
+   * @param docIds Array containing the document ids to read
+   * @param length Number of values to read
+   * @param values Values to fill
+   * @param context Reader context
+   */
+  default void readValuesSV(int[] docIds, int length, double[] values, T context) {
+    switch (getValueType()) {
+      case INT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getInt(docIds[i], context);
+        }
+        break;
+      case LONG:
+        for (int i = 0; i < length; i++) {
+          values[i] = getLong(docIds[i], context);
+        }
+        break;
+      case FLOAT:
+        for (int i = 0; i < length; i++) {
+          values[i] = getFloat(docIds[i], context);
+        }
+        break;
+      case DOUBLE:
+        for (int i = 0; i < length; i++) {
+          values[i] = getDouble(docIds[i], context);
+        }
+        break;
+      case STRING:
+        for (int i = 0; i < length; i++) {
+          values[i] = Double.parseDouble(getString(docIds[i], context));
+        }
+        break;
+      default:
+        throw new IllegalArgumentException();
+    }
+  }
+
+  /**
    * Reads the INT value at the given document id.
    *
    * @param docId Document id

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org