You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/15 12:27:19 UTC

[GitHub] [pinot] richardstartin opened a new pull request #8204: reduce allocation rate in LookupTransformFunction

richardstartin opened a new pull request #8204:
URL: https://github.com/apache/pinot/pull/8204


   Addresses high allocation rates observed in `LookupTransformFunction` by:
   
   * avoiding conversion of primitive blocks to boxed arrays
   * using the arrays in `BaseTransformFunction`, which are reused between block evaluations
   * not materialising a temporary join column output, and instead writing the result directly into the output


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] richardstartin commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807389188



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = _lookupColumnFieldSpec.getDefaultNullValueString();
+    }
+  }
+
+  private void setBytesSV(int index, Object value) {
+    if (value instanceof byte[]) {
+      _byteValuesSV[index] = (byte[]) value;
+    }

Review comment:
       this isn't a behaviour change




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] richardstartin commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807404118



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = _lookupColumnFieldSpec.getDefaultNullValueString();
+    }
+  }
+
+  private void setBytesSV(int index, Object value) {
+    if (value instanceof byte[]) {
+      _byteValuesSV[index] = (byte[]) value;
+    }

Review comment:
       Good catch, I had missed that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] richardstartin commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807406982



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {

Review comment:
       I annotated the interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter edited a comment on pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#issuecomment-1040923407


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8204](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3005614) into [master](https://codecov.io/gh/apache/pinot/commit/80424086e701a0962656d1bfec2b9965cc36d22d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8042408) will **decrease** coverage by `3.57%`.
   > The diff coverage is `73.52%`.
   
   > :exclamation: Current head 3005614 differs from pull request most recent head 611b69b. Consider uploading reports for the commit 611b69b to get more accurate results.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/8204/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8204      +/-   ##
   ============================================
   - Coverage     70.97%   67.39%   -3.58%     
   + Complexity     4313     4233      -80     
   ============================================
     Files          1626     1226     -400     
     Lines         84851    61896   -22955     
     Branches      12790     9631    -3159     
   ============================================
   - Hits          60221    41714   -18507     
   + Misses        20496    17220    -3276     
   + Partials       4134     2962    -1172     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `67.39% <73.52%> (-0.01%)` | :arrow_down: |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...or/transform/function/LookupTransformFunction.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci90cmFuc2Zvcm0vZnVuY3Rpb24vTG9va3VwVHJhbnNmb3JtRnVuY3Rpb24uamF2YQ==) | `79.05% <73.52%> (-9.06%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/pinot/server/access/AccessControlFactory.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VydmVyL2FjY2Vzcy9BY2Nlc3NDb250cm9sRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...he/pinot/common/messages/SegmentReloadMessage.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvU2VnbWVudFJlbG9hZE1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pinot/core/data/manager/realtime/TimerService.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvVGltZXJTZXJ2aWNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...not/common/exception/HttpErrorStatusException.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL0h0dHBFcnJvclN0YXR1c0V4Y2VwdGlvbi5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...t/core/plan/StreamingInstanceResponsePlanNode.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ0luc3RhbmNlUmVzcG9uc2VQbGFuTm9kZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [615 more](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8042408...611b69b](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807402356



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = _lookupColumnFieldSpec.getDefaultNullValueString();
+    }
+  }
+
+  private void setBytesSV(int index, Object value) {
+    if (value instanceof byte[]) {
+      _byteValuesSV[index] = (byte[]) value;
+    }

Review comment:
       It actually is. Notice that we allocate zero-length arrays in the original implementation, and `byte[0]` is used as the default value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#issuecomment-1040923407


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8204](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (611b69b) into [master](https://codecov.io/gh/apache/pinot/commit/80424086e701a0962656d1bfec2b9965cc36d22d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8042408) will **decrease** coverage by `56.81%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/8204/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #8204       +/-   ##
   =============================================
   - Coverage     70.97%   14.15%   -56.82%     
   + Complexity     4313       81     -4232     
   =============================================
     Files          1626     1581       -45     
     Lines         84851    83018     -1833     
     Branches      12790    12581      -209     
   =============================================
   - Hits          60221    11751    -48470     
   - Misses        20496    70386    +49890     
   + Partials       4134      881     -3253     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.15% <0.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...or/transform/function/LookupTransformFunction.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci90cmFuc2Zvcm0vZnVuY3Rpb24vTG9va3VwVHJhbnNmb3JtRnVuY3Rpb24uamF2YQ==) | `0.00% <0.00%> (-88.12%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/table/FSTType.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0ZTVFR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/data/MetricFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9NZXRyaWNGaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/BigDecimalUtils.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQmlnRGVjaW1hbFV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/tier/TierFactory.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdGllci9UaWVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/utils/StringUtil.java](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU3RyaW5nVXRpbC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1296 more](https://codecov.io/gh/apache/pinot/pull/8204/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8042408...611b69b](https://codecov.io/gh/apache/pinot/pull/8204?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] richardstartin merged pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
richardstartin merged pull request #8204:
URL: https://github.com/apache/pinot/pull/8204


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807413047



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +161,263 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, @Nullable Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = _lookupColumnFieldSpec.getDefaultNullValueString();

Review comment:
       Should we also cache this value? `getDefaultNullValueString()` does have some overhead.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {

Review comment:
      Suggest also annotating the `value` parameter of these setter methods as `@Nullable`, so developers know they must handle a `null` value (a missed lookup) properly.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8204: reduce allocation rate in LookupTransformFunction

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #8204:
URL: https://github.com/apache/pinot/pull/8204#discussion_r807383598



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {

Review comment:
      (minor) Let's annotate `value` as `@Nullable`; same for the other places.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/LookupTransformFunction.java
##########
@@ -143,191 +153,251 @@ public TransformResultMetadata getResultMetadata() {
         _lookupColumnFieldSpec.isSingleValueField(), false);
   }
 
-  private Object[] lookup(ProjectionBlock projectionBlock) {
+  @FunctionalInterface
+  private interface ValueAcceptor {
+    void accept(int index, Object value);
+  }
+
+  private void lookup(ProjectionBlock projectionBlock, ValueAcceptor valueAcceptor) {
     int numPkColumns = _joinKeys.size();
     int numDocuments = projectionBlock.getNumDocs();
-    Object[][] pkColumns = new Object[numPkColumns][];
+    Object[] pkColumns = new Object[numPkColumns];
     for (int c = 0; c < numPkColumns; c++) {
       DataType storedType = _joinValueFieldSpecs.get(c).getDataType().getStoredType();
       TransformFunction tf = _joinValueFunctions.get(c);
       switch (storedType) {
         case INT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToIntValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToIntValuesSV(projectionBlock);
           break;
         case LONG:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToLongValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToLongValuesSV(projectionBlock);
           break;
         case FLOAT:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToFloatValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToFloatValuesSV(projectionBlock);
           break;
         case DOUBLE:
-          pkColumns[c] = ArrayUtils.toObject(tf.transformToDoubleValuesSV(projectionBlock));
+          pkColumns[c] = tf.transformToDoubleValuesSV(projectionBlock);
           break;
         case STRING:
           pkColumns[c] = tf.transformToStringValuesSV(projectionBlock);
           break;
         case BYTES:
-          byte[][] primitiveValues = tf.transformToBytesValuesSV(projectionBlock);
-          pkColumns[c] = new ByteArray[numDocuments];
-          for (int i = 0; i < numDocuments; i++) {
-            pkColumns[c][i] = new ByteArray(primitiveValues[i]);
-          }
+          pkColumns[c] = tf.transformToBytesValuesSV(projectionBlock);
           break;
         default:
           throw new IllegalStateException("Unknown column type for primary key");
       }
     }
 
-    Object[] resultSet = new Object[numDocuments];
     Object[] pkValues = new Object[numPkColumns];
+    PrimaryKey primaryKey = new PrimaryKey(pkValues);
     for (int i = 0; i < numDocuments; i++) {
       // prepare pk
       for (int c = 0; c < numPkColumns; c++) {
-        pkValues[c] = pkColumns[c][i];
+        if (pkColumns[c] instanceof int[]) {
+          pkValues[c] = ((int[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof long[]) {
+          pkValues[c] = ((long[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof String[]) {
+          pkValues[c] = ((String[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof float[]) {
+          pkValues[c] = ((float[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof double[]) {
+          pkValues[c] = ((double[]) pkColumns[c])[i];
+        } else if (pkColumns[c] instanceof byte[][]) {
+          pkValues[c] = new ByteArray(((byte[][]) pkColumns[c])[i]);
+        }
       }
       // lookup
-      GenericRow row = _dataManager.lookupRowByPrimaryKey(new PrimaryKey(pkValues));
-      if (row != null) {
-        resultSet[i] = row.getValue(_dimColumnName);
-      }
+      GenericRow row = _dataManager.lookupRowByPrimaryKey(primaryKey);
+      Object value = row == null ? null : row.getValue(_dimColumnName);
+      valueAcceptor.accept(i, value);
     }
-    return resultSet;
   }
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[] resultSet = new int[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).intValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).intValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesSV == null || _intValuesSV.length < numDocs) {
+      _intValuesSV = new int[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntSV);
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[] resultSet = new long[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).longValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).longValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesSV == null || _longValuesSV.length < numDocs) {
+      _longValuesSV = new long[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongSV);
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[] resultSet = new float[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).floatValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).floatValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesSV == null || _floatValuesSV.length < numDocs) {
+      _floatValuesSV = new float[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatSV);
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[] resultSet = new double[lookupObjects.length];
-    Arrays.fill(resultSet, ((Number) _lookupColumnFieldSpec.getDefaultNullValue()).doubleValue());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = ((Number) lookupObjects[i]).doubleValue();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesSV == null || _doubleValuesSV.length < numDocs) {
+      _doubleValuesSV = new double[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleSV);
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[] resultSet = new String[lookupObjects.length];
-    Arrays.fill(resultSet, _lookupColumnFieldSpec.getDefaultNullValueString());
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = lookupObjects[i].toString();
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesSV == null || _stringValuesSV.length < numDocs) {
+      _stringValuesSV = new String[numDocs];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setStringSV);
+    return _stringValuesSV;
   }
 
   @Override
   public byte[][] transformToBytesValuesSV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    byte[][] resultSet = new byte[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (byte[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_byteValuesSV == null || _byteValuesSV.length < numDocs) {
+      _byteValuesSV = new byte[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setBytesSV);
+    return _byteValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    int[][] resultSet = new int[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (int[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_intValuesMV == null || _intValuesMV.length < numDocs) {
+      _intValuesMV = new int[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setIntMV);
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    long[][] resultSet = new long[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (long[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_longValuesMV == null || _longValuesMV.length < numDocs) {
+      _longValuesMV = new long[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setLongMV);
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    float[][] resultSet = new float[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (float[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_floatValuesMV == null || _floatValuesMV.length < numDocs) {
+      _floatValuesMV = new float[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setFloatMV);
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    double[][] resultSet = new double[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (double[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_doubleValuesMV == null || _doubleValuesMV.length < numDocs) {
+      _doubleValuesMV = new double[numDocs][];
     }
-    return resultSet;
+    lookup(projectionBlock, this::setDoubleMV);
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    Object[] lookupObjects = lookup(projectionBlock);
-    String[][] resultSet = new String[lookupObjects.length][0];
-    for (int i = 0; i < lookupObjects.length; i++) {
-      if (lookupObjects[i] != null) {
-        resultSet[i] = (String[]) lookupObjects[i];
-      }
+    int numDocs = projectionBlock.getNumDocs();
+    if (_stringValuesMV == null || _stringValuesMV.length < numDocs) {
+      _stringValuesMV = new String[numDocs][];
+    }
+    lookup(projectionBlock, this::setStringMV);
+    return _stringValuesMV;
+  }
+
+  private void setIntSV(int index, Object value) {
+    if (value instanceof Number) {
+      _intValuesSV[index] = ((Number) value).intValue();
+    } else {
+      _intValuesSV[index] = _nullIntValue;
+    }
+  }
+
+  private void setLongSV(int index, Object value) {
+    if (value instanceof Number) {
+      _longValuesSV[index] = ((Number) value).longValue();
+    } else {
+      _longValuesSV[index] = _nullLongValue;
+    }
+  }
+
+  private void setFloatSV(int index, Object value) {
+    if (value instanceof Number) {
+      _floatValuesSV[index] = ((Number) value).floatValue();
+    } else {
+      _floatValuesSV[index] = _nullFloatValue;
+    }
+  }
+
+  private void setDoubleSV(int index, Object value) {
+    if (value instanceof Number) {
+      _doubleValuesSV[index] = ((Number) value).doubleValue();
+    } else {
+      _doubleValuesSV[index] = _nullDoubleValue;
+    }
+  }
+
+  private void setStringSV(int index, Object value) {
+    if (value != null) {
+      _stringValuesSV[index] = String.valueOf(value);
+    } else {
+      _stringValuesSV[index] = _lookupColumnFieldSpec.getDefaultNullValueString();
+    }
+  }
+
+  private void setBytesSV(int index, Object value) {
+    if (value instanceof byte[]) {
+      _byteValuesSV[index] = (byte[]) value;
+    }

Review comment:
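      Since the reused output array is no longer pre-filled with default values, we need to write the default value here when the lookup misses; otherwise a stale value from a previous block is returned. Same for the other methods without an `else` branch.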




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org