You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/19 21:56:34 UTC

(pinot) branch master updated: Fix derived column from MV column (#12028)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 32db500160 Fix derived column from MV column (#12028)
32db500160 is described below

commit 32db50016011bc64b9548438e9d1df0e266e27a0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 19 13:56:27 2023 -0800

    Fix derived column from MV column (#12028)
---
 .../optimizer/filter/NumericalFilterOptimizer.java | 24 ++++++-------
 .../tests/OfflineClusterIntegrationTest.java       | 41 ++++++++++++++--------
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  2 +-
 .../segment/index/readers/StringDictionary.java    | 32 +++++++++++++++++
 .../segment/readers/PinotSegmentColumnReader.java  | 38 +++++++++++++++++---
 .../pinot/segment/spi/index/reader/Dictionary.java | 24 +++++++++++++
 6 files changed, 129 insertions(+), 32 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
index 7e8cddfc93..d5e88d4d1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.request.ExpressionType;
 import org.apache.pinot.common.request.Function;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.sql.FilterKind;
 
@@ -84,7 +85,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
         Expression lhs = operands.get(0);
         Expression rhs = operands.get(1);
         if (isNumericLiteral(rhs)) {
-          FieldSpec.DataType dataType = getDataType(lhs, schema);
+          DataType dataType = getDataType(lhs, schema);
           if (dataType != null && dataType.isNumeric()) {
             switch (kind) {
               case EQUALS:
@@ -94,7 +95,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
               case GREATER_THAN_OR_EQUAL:
               case LESS_THAN:
               case LESS_THAN_OR_EQUAL:
-                return rewriteRangeExpression(filterExpression, kind, lhs, rhs, schema);
+                return rewriteRangeExpression(filterExpression, kind, dataType, rhs);
               default:
                 break;
             }
@@ -109,7 +110,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
    * Rewrite expressions of form "column = literal" or "column != literal" to ensure that RHS literal is the same
    * datatype as LHS column.
    */
-  private static Expression rewriteEqualsExpression(Expression equals, FilterKind kind, FieldSpec.DataType dataType,
+  private static Expression rewriteEqualsExpression(Expression equals, FilterKind kind, DataType dataType,
       Expression rhs) {
     // Get expression operator
     boolean result = kind == FilterKind.NOT_EQUALS;
@@ -201,11 +202,8 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
    * Rewrite expressions of form "column > literal", "column >= literal", "column < literal", and "column <= literal"
    * to ensure that RHS literal is the same datatype as LHS column.
    */
-  private static Expression rewriteRangeExpression(Expression range, FilterKind kind, Expression lhs, Expression rhs,
-      Schema schema) {
-    // Get column data type.
-    FieldSpec.DataType dataType = schema.getFieldSpecFor(lhs.getIdentifier().getName()).getDataType();
-
+  private static Expression rewriteRangeExpression(Expression range, FilterKind kind, DataType dataType,
+      Expression rhs) {
     switch (rhs.getLiteral().getSetField()) {
       case SHORT_VALUE:
       case INT_VALUE:
@@ -378,7 +376,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
 
   /** @return field data type extracted from the expression. null if we can't determine the type. */
   @Nullable
-  private static FieldSpec.DataType getDataType(Expression expression, Schema schema) {
+  private static DataType getDataType(Expression expression, Schema schema) {
     if (expression.getType() == ExpressionType.IDENTIFIER) {
       String column = expression.getIdentifier().getName();
       FieldSpec fieldSpec = schema.getFieldSpecFor(column);
@@ -390,13 +388,13 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
       // expression is not identifier but we can also determine the data type.
       String targetTypeLiteral =
           expression.getFunctionCall().getOperands().get(1).getLiteral().getStringValue().toUpperCase();
-      FieldSpec.DataType dataType;
+      DataType dataType;
       if ("INTEGER".equals(targetTypeLiteral)) {
-        dataType = FieldSpec.DataType.INT;
+        dataType = DataType.INT;
       } else if ("VARCHAR".equals(targetTypeLiteral)) {
-        dataType = FieldSpec.DataType.STRING;
+        dataType = DataType.STRING;
       } else {
-        dataType = FieldSpec.DataType.valueOf(targetTypeLiteral);
+        dataType = DataType.valueOf(targetTypeLiteral);
       }
       return dataType;
     }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6f9b125a4c..824991d86f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -321,12 +321,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
-  @Override
-  protected void testQuery(String pinotQuery, String h2Query)
-      throws Exception {
-    super.testQuery(pinotQuery, h2Query);
-  }
-
   private void testQueryError(String query, int errorCode)
       throws Exception {
     JsonNode response = postQuery(query);
@@ -1464,10 +1458,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
    *   <li>"NewAddedMVStringDimension", DIMENSION, STRING, multi-value, default ("null")</li>
    *   <li>"NewAddedSVJSONDimension", DIMENSION, JSON, single-value, default ("null")</li>
    *   <li>"NewAddedSVBytesDimension", DIMENSION, BYTES, single-value, default (byte[0])</li>
-   *   <li>"NewAddedDerivedHoursSinceEpoch", DATE_TIME, INT, single-value, default (Integer.MIN_VALUE)</li>
-   *   <li>"NewAddedDerivedTimestamp", DATE_TIME, TIMESTAMP, single-value, default (EPOCH)</li>
-   *   <li>"NewAddedDerivedSVBooleanDimension", DIMENSION, BOOLEAN, single-value, default (false)</li>
-   *   <li>"NewAddedDerivedMVStringDimension", DATE_TIME, STRING, multi-value</li>
+   *   <li>"NewAddedDerivedHoursSinceEpoch", DATE_TIME, INT, single-value, DaysSinceEpoch * 24</li>
+   *   <li>"NewAddedDerivedTimestamp", DATE_TIME, TIMESTAMP, single-value, DaysSinceEpoch * 24 * 3600 * 1000</li>
+   *   <li>"NewAddedDerivedSVBooleanDimension", DIMENSION, BOOLEAN, single-value, ActualElapsedTime > 0</li>
+   *   <li>"NewAddedDerivedMVStringDimension", DIMENSION, STRING, multi-value, split(DestCityName, ', ')</li>
+   *   <li>"NewAddedDerivedDivAirportSeqIDs", DIMENSION, INT, multi-value, DivAirportSeqIDs</li>
+   *   <li>"NewAddedDerivedDivAirportSeqIDsString", DIMENSION, STRING, multi-value, DivAirportSeqIDs</li>
    * </ul>
    */
   @Test(dependsOnMethods = "testAggregateMetadataAPI", dataProvider = "useBothQueryEngines")
@@ -1480,7 +1476,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     reloadWithExtraColumns();
     JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
     assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 98);
+    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 100);
 
     testNewAddedColumns();
     testExpressionOverride();
@@ -1558,6 +1554,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     schema.addField(new DateTimeFieldSpec("NewAddedDerivedTimestamp", DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS"));
     schema.addField(new DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", DataType.BOOLEAN, true));
     schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", DataType.STRING, false));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs", DataType.INT, false));
+    schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString", DataType.STRING, false));
     addSchema(schema);
 
     TableConfig tableConfig = getOfflineTableConfig();
@@ -1565,7 +1563,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
             new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
             new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
-            new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"));
+            new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"),
+            new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"));
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transformConfigs);
     tableConfig.setIngestionConfig(ingestionConfig);
@@ -1667,6 +1667,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     h2Query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
     testQuery(pinotQuery, h2Query);
 
+    pinotQuery = "SELECT COUNT(*) FROM mytable WHERE DivAirportSeqIDs > 1100000";
+    JsonNode response = postQuery(pinotQuery);
+    JsonNode rows = response.get("resultTable").get("rows");
+    long count = rows.get(0).get(0).asLong();
+    pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedDivAirportSeqIDs > 1100000";
+    response = postQuery(pinotQuery);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.get(0).get(0).asLong(), count);
+    pinotQuery = "SELECT COUNT(*) FROM mytable WHERE CAST(NewAddedDerivedDivAirportSeqIDsString AS INT) > 1100000";
+    response = postQuery(pinotQuery);
+    rows = response.get("resultTable").get("rows");
+    assertEquals(rows.get(0).get(0).asLong(), count);
+
     // Test queries with new added metric column in aggregation function
     pinotQuery = "SELECT SUM(NewAddedIntMetric) FROM mytable WHERE DaysSinceEpoch <= 16312";
     h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312";
@@ -1684,8 +1697,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Test other query forms with new added columns
     pinotQuery =
         "SELECT NewAddedMVStringDimension, SUM(NewAddedFloatMetric) FROM mytable GROUP BY NewAddedMVStringDimension";
-    JsonNode response = postQuery(pinotQuery);
-    JsonNode rows = response.get("resultTable").get("rows");
+    response = postQuery(pinotQuery);
+    rows = response.get("resultTable").get("rows");
     assertEquals(rows.size(), 1);
     JsonNode row = rows.get(0);
     assertEquals(row.size(), 2);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index a8c6f91917..9fc15e86b1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -581,7 +581,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
         outputValues[i] = outputValue;
         if (outputValueType == null) {
           Class<?> outputValueClass = outputValue.getClass();
-          outputValueType = FunctionUtils.getParameterType(outputValueClass);
+          outputValueType = FunctionUtils.getArgumentType(outputValueClass);
           Preconditions.checkState(outputValueType != null, "Unsupported output value class: %s", outputValueClass);
         }
       }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
index 502e4b79cc..5d173749f8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
@@ -87,6 +87,14 @@ public class StringDictionary extends BaseImmutableDictionary {
     }
   }
 
+  @Override
+  public void readIntValues(int[] dictIds, int length, Integer[] outValues) {
+    byte[] buffer = getBuffer();
+    for (int i = 0; i < length; i++) {
+      outValues[i] = Integer.parseInt(getUnpaddedString(dictIds[i], buffer));
+    }
+  }
+
   @Override
   public void readLongValues(int[] dictIds, int length, long[] outValues) {
     byte[] buffer = getBuffer();
@@ -95,6 +103,14 @@ public class StringDictionary extends BaseImmutableDictionary {
     }
   }
 
+  @Override
+  public void readLongValues(int[] dictIds, int length, Long[] outValues) {
+    byte[] buffer = getBuffer();
+    for (int i = 0; i < length; i++) {
+      outValues[i] = Long.parseLong(getUnpaddedString(dictIds[i], buffer));
+    }
+  }
+
   @Override
   public void readFloatValues(int[] dictIds, int length, float[] outValues) {
     byte[] buffer = getBuffer();
@@ -103,6 +119,14 @@ public class StringDictionary extends BaseImmutableDictionary {
     }
   }
 
+  @Override
+  public void readFloatValues(int[] dictIds, int length, Float[] outValues) {
+    byte[] buffer = getBuffer();
+    for (int i = 0; i < length; i++) {
+      outValues[i] = Float.parseFloat(getUnpaddedString(dictIds[i], buffer));
+    }
+  }
+
   @Override
   public void readDoubleValues(int[] dictIds, int length, double[] outValues) {
     byte[] buffer = getBuffer();
@@ -111,6 +135,14 @@ public class StringDictionary extends BaseImmutableDictionary {
     }
   }
 
+  @Override
+  public void readDoubleValues(int[] dictIds, int length, Double[] outValues) {
+    byte[] buffer = getBuffer();
+    for (int i = 0; i < length; i++) {
+      outValues[i] = Double.parseDouble(getUnpaddedString(dictIds[i], buffer));
+    }
+  }
+
   @Override
   public void readStringValues(int[] dictIds, int length, String[] outValues) {
     byte[] buffer = getBuffer();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index ca1b6b81f3..53d8c38129 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -96,11 +96,41 @@ public class PinotSegmentColumnReader implements Closeable {
         return _dictionary.get(_forwardIndexReader.getDictId(docId, _forwardIndexReaderContext));
       } else {
         int numValues = _forwardIndexReader.getDictIdMV(docId, _dictIdBuffer, _forwardIndexReaderContext);
-        Object[] values = new Object[numValues];
-        for (int i = 0; i < numValues; i++) {
-          values[i] = _dictionary.get(_dictIdBuffer[i]);
+        DataType storedType = _dictionary.getValueType();
+        switch (storedType) {
+          case INT: {
+            Integer[] values = new Integer[numValues];
+            _dictionary.readIntValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          case LONG: {
+            Long[] values = new Long[numValues];
+            _dictionary.readLongValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          case FLOAT: {
+            Float[] values = new Float[numValues];
+            _dictionary.readFloatValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          case DOUBLE: {
+            Double[] values = new Double[numValues];
+            _dictionary.readDoubleValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          case STRING: {
+            String[] values = new String[numValues];
+            _dictionary.readStringValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          case BYTES: {
+            byte[][] values = new byte[numValues][];
+            _dictionary.readBytesValues(_dictIdBuffer, numValues, values);
+            return values;
+          }
+          default:
+            throw new IllegalStateException("Unsupported MV type: " + storedType);
         }
-        return values;
       }
     } else {
       // Raw index based
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index a0305940db..433ffab4cd 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -208,24 +208,48 @@ public interface Dictionary extends IndexReader {
     }
   }
 
+  default void readIntValues(int[] dictIds, int length, Integer[] outValues) {
+    for (int i = 0; i < length; i++) {
+      outValues[i] = getIntValue(dictIds[i]);
+    }
+  }
+
   default void readLongValues(int[] dictIds, int length, long[] outValues) {
     for (int i = 0; i < length; i++) {
       outValues[i] = getLongValue(dictIds[i]);
     }
   }
 
+  default void readLongValues(int[] dictIds, int length, Long[] outValues) {
+    for (int i = 0; i < length; i++) {
+      outValues[i] = getLongValue(dictIds[i]);
+    }
+  }
+
   default void readFloatValues(int[] dictIds, int length, float[] outValues) {
     for (int i = 0; i < length; i++) {
       outValues[i] = getFloatValue(dictIds[i]);
     }
   }
 
+  default void readFloatValues(int[] dictIds, int length, Float[] outValues) {
+    for (int i = 0; i < length; i++) {
+      outValues[i] = getFloatValue(dictIds[i]);
+    }
+  }
+
   default void readDoubleValues(int[] dictIds, int length, double[] outValues) {
     for (int i = 0; i < length; i++) {
       outValues[i] = getDoubleValue(dictIds[i]);
     }
   }
 
+  default void readDoubleValues(int[] dictIds, int length, Double[] outValues) {
+    for (int i = 0; i < length; i++) {
+      outValues[i] = getDoubleValue(dictIds[i]);
+    }
+  }
+
   default void readBigDecimalValues(int[] dictIds, int length, BigDecimal[] outValues) {
     for (int i = 0; i < length; i++) {
       outValues[i] = getBigDecimalValue(dictIds[i]);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org