You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ri...@apache.org on 2022/02/09 07:00:30 UTC

[pinot] branch revert-8140-allocation-free-datablock-cache created (now c15bf3e)

This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a change to branch revert-8140-allocation-free-datablock-cache
in repository https://gitbox.apache.org/repos/asf/pinot.git.


      at c15bf3e  Revert "Allocation free `DataBlockCache` lookups (#8140)"

This branch includes the following new commits:

     new c15bf3e  Revert "Allocation free `DataBlockCache` lookups (#8140)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[pinot] 01/01: Revert "Allocation free `DataBlockCache` lookups (#8140)"

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch revert-8140-allocation-free-datablock-cache
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit c15bf3eeae5e8c154c36e04b484b5efc088a1f5d
Author: Richard Startin <ri...@startree.ai>
AuthorDate: Wed Feb 9 07:00:03 2022 +0000

    Revert "Allocation free `DataBlockCache` lookups (#8140)"
    
    This reverts commit 65dcfe785e0e65779e12a1e730365b99bc825428.
---
 .../apache/pinot/core/common/DataBlockCache.java   | 120 ++++++++++++---------
 1 file changed, 72 insertions(+), 48 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index 254a43b..24e71b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -18,14 +18,15 @@
  */
 package org.apache.pinot.core.common;
 
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nonnull;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.evaluator.TransformEvaluator;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.utils.EqualityUtils;
 
 
 /**
@@ -39,12 +40,12 @@ public class DataBlockCache {
 
   // Mark whether data have been fetched, need to be cleared in initNewBlock()
   private final Set<String> _columnDictIdLoaded = new HashSet<>();
-  private final Map<FieldSpec.DataType, Set<String>> _columnValueLoaded = new EnumMap<>(FieldSpec.DataType.class);
+  private final Set<ColumnTypePair> _columnValueLoaded = new HashSet<>();
   private final Set<String> _columnNumValuesLoaded = new HashSet<>();
 
   // Buffer for data
   private final Map<String, Object> _dictIdsMap = new HashMap<>();
-  private final Map<FieldSpec.DataType, Map<String, Object>> _valuesMap = new HashMap<>();
+  private final Map<ColumnTypePair, Object> _valuesMap = new HashMap<>();
   private final Map<String, int[]> _numValuesMap = new HashMap<>();
 
   private int[] _docIds;
@@ -64,10 +65,9 @@ public class DataBlockCache {
   public void initNewBlock(int[] docIds, int length) {
     _docIds = docIds;
     _length = length;
+
     _columnDictIdLoaded.clear();
-    for (Set<String> columns : _columnValueLoaded.values()) {
-      columns.clear();
-    }
+    _columnValueLoaded.clear();
     _columnNumValuesLoaded.clear();
   }
 
@@ -109,11 +109,12 @@ public class DataBlockCache {
    * @return Array of int values
    */
   public int[] getIntValuesForSVColumn(String column) {
-    int[] intValues = getValues(FieldSpec.DataType.INT, column);
-    if (markLoaded(FieldSpec.DataType.INT, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
+    int[] intValues = (int[]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (intValues == null) {
         intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-        putValues(FieldSpec.DataType.INT, column, intValues);
+        _valuesMap.put(key, intValues);
       }
       _dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
     }
@@ -138,11 +139,12 @@ public class DataBlockCache {
    * @return Array of long values
    */
   public long[] getLongValuesForSVColumn(String column) {
-    long[] longValues = getValues(FieldSpec.DataType.LONG, column);
-    if (markLoaded(FieldSpec.DataType.LONG, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
+    long[] longValues = (long[]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (longValues == null) {
         longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-        putValues(FieldSpec.DataType.LONG, column, longValues);
+        _valuesMap.put(key, longValues);
       }
       _dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
     }
@@ -167,11 +169,12 @@ public class DataBlockCache {
    * @return Array of float values
    */
   public float[] getFloatValuesForSVColumn(String column) {
-    float[] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
-    if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
+    float[] floatValues = (float[]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (floatValues == null) {
         floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-        putValues(FieldSpec.DataType.FLOAT, column, floatValues);
+        _valuesMap.put(key, floatValues);
       }
       _dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
     }
@@ -196,11 +199,12 @@ public class DataBlockCache {
    * @return Array of double values
    */
   public double[] getDoubleValuesForSVColumn(String column) {
-    double[] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
-    if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
+    double[] doubleValues = (double[]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (doubleValues == null) {
         doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-        putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
+        _valuesMap.put(key, doubleValues);
       }
       _dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
     }
@@ -225,11 +229,12 @@ public class DataBlockCache {
    * @return Array of string values
    */
   public String[] getStringValuesForSVColumn(String column) {
-    String[] stringValues = getValues(FieldSpec.DataType.STRING, column);
-    if (markLoaded(FieldSpec.DataType.STRING, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
+    String[] stringValues = (String[]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (stringValues == null) {
         stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
-        putValues(FieldSpec.DataType.STRING, column, stringValues);
+        _valuesMap.put(key, stringValues);
       }
       _dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
     }
@@ -254,11 +259,13 @@ public class DataBlockCache {
    * @return byte[] for the column
    */
   public byte[][] getBytesValuesForSVColumn(String column) {
-    byte[][] bytesValues = getValues(FieldSpec.DataType.BYTES, column);
-    if (markLoaded(FieldSpec.DataType.BYTES, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.BYTES);
+    byte[][] bytesValues = (byte[][]) _valuesMap.get(key);
+
+    if (_columnValueLoaded.add(key)) {
       if (bytesValues == null) {
         bytesValues = new byte[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.BYTES, column, bytesValues);
+        _valuesMap.put(key, bytesValues);
       }
       _dataFetcher.fetchBytesValues(column, _docIds, _length, bytesValues);
     }
@@ -294,11 +301,12 @@ public class DataBlockCache {
    * @return Array of int values
    */
   public int[][] getIntValuesForMVColumn(String column) {
-    int[][] intValues = getValues(FieldSpec.DataType.INT, column);
-    if (markLoaded(FieldSpec.DataType.INT, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.INT);
+    int[][] intValues = (int[][]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (intValues == null) {
         intValues = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.INT, column, intValues);
+        _valuesMap.put(key, intValues);
       }
       _dataFetcher.fetchIntValues(column, _docIds, _length, intValues);
     }
@@ -323,11 +331,12 @@ public class DataBlockCache {
    * @return Array of long values
    */
   public long[][] getLongValuesForMVColumn(String column) {
-    long[][] longValues = getValues(FieldSpec.DataType.LONG, column);
-    if (markLoaded(FieldSpec.DataType.LONG, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.LONG);
+    long[][] longValues = (long[][]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (longValues == null) {
         longValues = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.LONG, column, longValues);
+        _valuesMap.put(key, longValues);
       }
       _dataFetcher.fetchLongValues(column, _docIds, _length, longValues);
     }
@@ -352,11 +361,12 @@ public class DataBlockCache {
    * @return Array of float values
    */
   public float[][] getFloatValuesForMVColumn(String column) {
-    float[][] floatValues = getValues(FieldSpec.DataType.FLOAT, column);
-    if (markLoaded(FieldSpec.DataType.FLOAT, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.FLOAT);
+    float[][] floatValues = (float[][]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (floatValues == null) {
         floatValues = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.FLOAT, column, floatValues);
+        _valuesMap.put(key, floatValues);
       }
       _dataFetcher.fetchFloatValues(column, _docIds, _length, floatValues);
     }
@@ -381,11 +391,12 @@ public class DataBlockCache {
    * @return Array of double values
    */
   public double[][] getDoubleValuesForMVColumn(String column) {
-    double[][] doubleValues = getValues(FieldSpec.DataType.DOUBLE, column);
-    if (markLoaded(FieldSpec.DataType.DOUBLE, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.DOUBLE);
+    double[][] doubleValues = (double[][]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (doubleValues == null) {
         doubleValues = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.DOUBLE, column, doubleValues);
+        _valuesMap.put(key, doubleValues);
       }
       _dataFetcher.fetchDoubleValues(column, _docIds, _length, doubleValues);
     }
@@ -410,11 +421,12 @@ public class DataBlockCache {
    * @return Array of string values
    */
   public String[][] getStringValuesForMVColumn(String column) {
-    String[][] stringValues = getValues(FieldSpec.DataType.STRING, column);
-    if (markLoaded(FieldSpec.DataType.STRING, column)) {
+    ColumnTypePair key = new ColumnTypePair(column, FieldSpec.DataType.STRING);
+    String[][] stringValues = (String[][]) _valuesMap.get(key);
+    if (_columnValueLoaded.add(key)) {
       if (stringValues == null) {
         stringValues = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
-        putValues(FieldSpec.DataType.STRING, column, stringValues);
+        _valuesMap.put(key, stringValues);
       }
       _dataFetcher.fetchStringValues(column, _docIds, _length, stringValues);
     }
@@ -450,16 +462,28 @@ public class DataBlockCache {
     return numValues;
   }
 
-  private boolean markLoaded(FieldSpec.DataType dataType, String column) {
-    return _columnValueLoaded.computeIfAbsent(dataType, k -> new HashSet<>()).add(column);
-  }
+  /**
+   * Helper class to store pair of column name and data type.
+   */
+  private static class ColumnTypePair {
+    final String _column;
+    final FieldSpec.DataType _dataType;
 
-  @SuppressWarnings("unchecked")
-  private <T> T getValues(FieldSpec.DataType dataType, String column) {
-    return (T) _valuesMap.computeIfAbsent(dataType, k -> new HashMap<>()).get(column);
-  }
+    ColumnTypePair(@Nonnull String column, @Nonnull FieldSpec.DataType dataType) {
+      _column = column;
+      _dataType = dataType;
+    }
 
-  private void putValues(FieldSpec.DataType dataType, String column, Object values) {
-    _valuesMap.get(dataType).put(column, values);
+    @Override
+    public int hashCode() {
+      return EqualityUtils.hashCodeOf(_column.hashCode(), _dataType.hashCode());
+    }
+
+    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+    @Override
+    public boolean equals(Object obj) {
+      ColumnTypePair that = (ColumnTypePair) obj;
+      return _column.equals(that._column) && _dataType == that._dataType;
+    }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org