You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/19 21:56:34 UTC
(pinot) branch master updated: Fix derived column from MV column (#12028)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 32db500160 Fix derived column from MV column (#12028)
32db500160 is described below
commit 32db50016011bc64b9548438e9d1df0e266e27a0
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 19 13:56:27 2023 -0800
Fix derived column from MV column (#12028)
---
.../optimizer/filter/NumericalFilterOptimizer.java | 24 ++++++-------
.../tests/OfflineClusterIntegrationTest.java | 41 ++++++++++++++--------
.../defaultcolumn/BaseDefaultColumnHandler.java | 2 +-
.../segment/index/readers/StringDictionary.java | 32 +++++++++++++++++
.../segment/readers/PinotSegmentColumnReader.java | 38 +++++++++++++++++---
.../pinot/segment/spi/index/reader/Dictionary.java | 24 +++++++++++++
6 files changed, 129 insertions(+), 32 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
index 7e8cddfc93..d5e88d4d1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizer.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.request.ExpressionType;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.sql.FilterKind;
@@ -84,7 +85,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
Expression lhs = operands.get(0);
Expression rhs = operands.get(1);
if (isNumericLiteral(rhs)) {
- FieldSpec.DataType dataType = getDataType(lhs, schema);
+ DataType dataType = getDataType(lhs, schema);
if (dataType != null && dataType.isNumeric()) {
switch (kind) {
case EQUALS:
@@ -94,7 +95,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
case GREATER_THAN_OR_EQUAL:
case LESS_THAN:
case LESS_THAN_OR_EQUAL:
- return rewriteRangeExpression(filterExpression, kind, lhs, rhs, schema);
+ return rewriteRangeExpression(filterExpression, kind, dataType, rhs);
default:
break;
}
@@ -109,7 +110,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
* Rewrite expressions of form "column = literal" or "column != literal" to ensure that RHS literal is the same
* datatype as LHS column.
*/
- private static Expression rewriteEqualsExpression(Expression equals, FilterKind kind, FieldSpec.DataType dataType,
+ private static Expression rewriteEqualsExpression(Expression equals, FilterKind kind, DataType dataType,
Expression rhs) {
// Get expression operator
boolean result = kind == FilterKind.NOT_EQUALS;
@@ -201,11 +202,8 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
* Rewrite expressions of form "column > literal", "column >= literal", "column < literal", and "column <= literal"
* to ensure that RHS literal is the same datatype as LHS column.
*/
- private static Expression rewriteRangeExpression(Expression range, FilterKind kind, Expression lhs, Expression rhs,
- Schema schema) {
- // Get column data type.
- FieldSpec.DataType dataType = schema.getFieldSpecFor(lhs.getIdentifier().getName()).getDataType();
-
+ private static Expression rewriteRangeExpression(Expression range, FilterKind kind, DataType dataType,
+ Expression rhs) {
switch (rhs.getLiteral().getSetField()) {
case SHORT_VALUE:
case INT_VALUE:
@@ -378,7 +376,7 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
/** @return field data type extracted from the expression. null if we can't determine the type. */
@Nullable
- private static FieldSpec.DataType getDataType(Expression expression, Schema schema) {
+ private static DataType getDataType(Expression expression, Schema schema) {
if (expression.getType() == ExpressionType.IDENTIFIER) {
String column = expression.getIdentifier().getName();
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
@@ -390,13 +388,13 @@ public class NumericalFilterOptimizer extends BaseAndOrBooleanFilterOptimizer {
// expression is not identifier but we can also determine the data type.
String targetTypeLiteral =
expression.getFunctionCall().getOperands().get(1).getLiteral().getStringValue().toUpperCase();
- FieldSpec.DataType dataType;
+ DataType dataType;
if ("INTEGER".equals(targetTypeLiteral)) {
- dataType = FieldSpec.DataType.INT;
+ dataType = DataType.INT;
} else if ("VARCHAR".equals(targetTypeLiteral)) {
- dataType = FieldSpec.DataType.STRING;
+ dataType = DataType.STRING;
} else {
- dataType = FieldSpec.DataType.valueOf(targetTypeLiteral);
+ dataType = DataType.valueOf(targetTypeLiteral);
}
return dataType;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6f9b125a4c..824991d86f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -321,12 +321,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
}
- @Override
- protected void testQuery(String pinotQuery, String h2Query)
- throws Exception {
- super.testQuery(pinotQuery, h2Query);
- }
-
private void testQueryError(String query, int errorCode)
throws Exception {
JsonNode response = postQuery(query);
@@ -1464,10 +1458,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
* <li>"NewAddedMVStringDimension", DIMENSION, STRING, multi-value, default ("null")</li>
* <li>"NewAddedSVJSONDimension", DIMENSION, JSON, single-value, default ("null")</li>
* <li>"NewAddedSVBytesDimension", DIMENSION, BYTES, single-value, default (byte[0])</li>
- * <li>"NewAddedDerivedHoursSinceEpoch", DATE_TIME, INT, single-value, default (Integer.MIN_VALUE)</li>
- * <li>"NewAddedDerivedTimestamp", DATE_TIME, TIMESTAMP, single-value, default (EPOCH)</li>
- * <li>"NewAddedDerivedSVBooleanDimension", DIMENSION, BOOLEAN, single-value, default (false)</li>
- * <li>"NewAddedDerivedMVStringDimension", DATE_TIME, STRING, multi-value</li>
+ * <li>"NewAddedDerivedHoursSinceEpoch", DATE_TIME, INT, single-value, DaysSinceEpoch * 24</li>
+ * <li>"NewAddedDerivedTimestamp", DATE_TIME, TIMESTAMP, single-value, DaysSinceEpoch * 24 * 3600 * 1000</li>
+ * <li>"NewAddedDerivedSVBooleanDimension", DIMENSION, BOOLEAN, single-value, ActualElapsedTime > 0</li>
+ * <li>"NewAddedDerivedMVStringDimension", DIMENSION, STRING, multi-value, split(DestCityName, ', ')</li>
+ * <li>"NewAddedDerivedDivAirportSeqIDs", DIMENSION, INT, multi-value, DivAirportSeqIDs</li>
+ * <li>"NewAddedDerivedDivAirportSeqIDsString", DIMENSION, STRING, multi-value, DivAirportSeqIDs</li>
* </ul>
*/
@Test(dependsOnMethods = "testAggregateMetadataAPI", dataProvider = "useBothQueryEngines")
@@ -1480,7 +1476,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
reloadWithExtraColumns();
JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
- assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 98);
+ assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 100);
testNewAddedColumns();
testExpressionOverride();
@@ -1558,6 +1554,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
schema.addField(new DateTimeFieldSpec("NewAddedDerivedTimestamp", DataType.TIMESTAMP, "TIMESTAMP", "1:DAYS"));
schema.addField(new DimensionFieldSpec("NewAddedDerivedSVBooleanDimension", DataType.BOOLEAN, true));
schema.addField(new DimensionFieldSpec("NewAddedDerivedMVStringDimension", DataType.STRING, false));
+ schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDs", DataType.INT, false));
+ schema.addField(new DimensionFieldSpec("NewAddedDerivedDivAirportSeqIDsString", DataType.STRING, false));
addSchema(schema);
TableConfig tableConfig = getOfflineTableConfig();
@@ -1565,7 +1563,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"),
new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"),
new TransformConfig("NewAddedDerivedSVBooleanDimension", "ActualElapsedTime > 0"),
- new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"));
+ new TransformConfig("NewAddedDerivedMVStringDimension", "split(DestCityName, ', ')"),
+ new TransformConfig("NewAddedDerivedDivAirportSeqIDs", "DivAirportSeqIDs"),
+ new TransformConfig("NewAddedDerivedDivAirportSeqIDsString", "DivAirportSeqIDs"));
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transformConfigs);
tableConfig.setIngestionConfig(ingestionConfig);
@@ -1667,6 +1667,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
h2Query = "SELECT COUNT(*) FROM mytable WHERE DestState = 'CA'";
testQuery(pinotQuery, h2Query);
+ pinotQuery = "SELECT COUNT(*) FROM mytable WHERE DivAirportSeqIDs > 1100000";
+ JsonNode response = postQuery(pinotQuery);
+ JsonNode rows = response.get("resultTable").get("rows");
+ long count = rows.get(0).get(0).asLong();
+ pinotQuery = "SELECT COUNT(*) FROM mytable WHERE NewAddedDerivedDivAirportSeqIDs > 1100000";
+ response = postQuery(pinotQuery);
+ rows = response.get("resultTable").get("rows");
+ assertEquals(rows.get(0).get(0).asLong(), count);
+ pinotQuery = "SELECT COUNT(*) FROM mytable WHERE CAST(NewAddedDerivedDivAirportSeqIDsString AS INT) > 1100000";
+ response = postQuery(pinotQuery);
+ rows = response.get("resultTable").get("rows");
+ assertEquals(rows.get(0).get(0).asLong(), count);
+
// Test queries with new added metric column in aggregation function
pinotQuery = "SELECT SUM(NewAddedIntMetric) FROM mytable WHERE DaysSinceEpoch <= 16312";
h2Query = "SELECT COUNT(*) FROM mytable WHERE DaysSinceEpoch <= 16312";
@@ -1684,8 +1697,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// Test other query forms with new added columns
pinotQuery =
"SELECT NewAddedMVStringDimension, SUM(NewAddedFloatMetric) FROM mytable GROUP BY NewAddedMVStringDimension";
- JsonNode response = postQuery(pinotQuery);
- JsonNode rows = response.get("resultTable").get("rows");
+ response = postQuery(pinotQuery);
+ rows = response.get("resultTable").get("rows");
assertEquals(rows.size(), 1);
JsonNode row = rows.get(0);
assertEquals(row.size(), 2);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index a8c6f91917..9fc15e86b1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -581,7 +581,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
outputValues[i] = outputValue;
if (outputValueType == null) {
Class<?> outputValueClass = outputValue.getClass();
- outputValueType = FunctionUtils.getParameterType(outputValueClass);
+ outputValueType = FunctionUtils.getArgumentType(outputValueClass);
Preconditions.checkState(outputValueType != null, "Unsupported output value class: %s", outputValueClass);
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
index 502e4b79cc..5d173749f8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/StringDictionary.java
@@ -87,6 +87,14 @@ public class StringDictionary extends BaseImmutableDictionary {
}
}
+ @Override
+ public void readIntValues(int[] dictIds, int length, Integer[] outValues) {
+ byte[] buffer = getBuffer();
+ for (int i = 0; i < length; i++) {
+ outValues[i] = Integer.parseInt(getUnpaddedString(dictIds[i], buffer));
+ }
+ }
+
@Override
public void readLongValues(int[] dictIds, int length, long[] outValues) {
byte[] buffer = getBuffer();
@@ -95,6 +103,14 @@ public class StringDictionary extends BaseImmutableDictionary {
}
}
+ @Override
+ public void readLongValues(int[] dictIds, int length, Long[] outValues) {
+ byte[] buffer = getBuffer();
+ for (int i = 0; i < length; i++) {
+ outValues[i] = Long.parseLong(getUnpaddedString(dictIds[i], buffer));
+ }
+ }
+
@Override
public void readFloatValues(int[] dictIds, int length, float[] outValues) {
byte[] buffer = getBuffer();
@@ -103,6 +119,14 @@ public class StringDictionary extends BaseImmutableDictionary {
}
}
+ @Override
+ public void readFloatValues(int[] dictIds, int length, Float[] outValues) {
+ byte[] buffer = getBuffer();
+ for (int i = 0; i < length; i++) {
+ outValues[i] = Float.parseFloat(getUnpaddedString(dictIds[i], buffer));
+ }
+ }
+
@Override
public void readDoubleValues(int[] dictIds, int length, double[] outValues) {
byte[] buffer = getBuffer();
@@ -111,6 +135,14 @@ public class StringDictionary extends BaseImmutableDictionary {
}
}
+ @Override
+ public void readDoubleValues(int[] dictIds, int length, Double[] outValues) {
+ byte[] buffer = getBuffer();
+ for (int i = 0; i < length; i++) {
+ outValues[i] = Double.parseDouble(getUnpaddedString(dictIds[i], buffer));
+ }
+ }
+
@Override
public void readStringValues(int[] dictIds, int length, String[] outValues) {
byte[] buffer = getBuffer();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index ca1b6b81f3..53d8c38129 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -96,11 +96,41 @@ public class PinotSegmentColumnReader implements Closeable {
return _dictionary.get(_forwardIndexReader.getDictId(docId, _forwardIndexReaderContext));
} else {
int numValues = _forwardIndexReader.getDictIdMV(docId, _dictIdBuffer, _forwardIndexReaderContext);
- Object[] values = new Object[numValues];
- for (int i = 0; i < numValues; i++) {
- values[i] = _dictionary.get(_dictIdBuffer[i]);
+ DataType storedType = _dictionary.getValueType();
+ switch (storedType) {
+ case INT: {
+ Integer[] values = new Integer[numValues];
+ _dictionary.readIntValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ case LONG: {
+ Long[] values = new Long[numValues];
+ _dictionary.readLongValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ case FLOAT: {
+ Float[] values = new Float[numValues];
+ _dictionary.readFloatValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ case DOUBLE: {
+ Double[] values = new Double[numValues];
+ _dictionary.readDoubleValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ case STRING: {
+ String[] values = new String[numValues];
+ _dictionary.readStringValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ case BYTES: {
+ byte[][] values = new byte[numValues][];
+ _dictionary.readBytesValues(_dictIdBuffer, numValues, values);
+ return values;
+ }
+ default:
+ throw new IllegalStateException("Unsupported MV type: " + storedType);
}
- return values;
}
} else {
// Raw index based
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
index a0305940db..433ffab4cd 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/Dictionary.java
@@ -208,24 +208,48 @@ public interface Dictionary extends IndexReader {
}
}
+ default void readIntValues(int[] dictIds, int length, Integer[] outValues) {
+ for (int i = 0; i < length; i++) {
+ outValues[i] = getIntValue(dictIds[i]);
+ }
+ }
+
default void readLongValues(int[] dictIds, int length, long[] outValues) {
for (int i = 0; i < length; i++) {
outValues[i] = getLongValue(dictIds[i]);
}
}
+ default void readLongValues(int[] dictIds, int length, Long[] outValues) {
+ for (int i = 0; i < length; i++) {
+ outValues[i] = getLongValue(dictIds[i]);
+ }
+ }
+
default void readFloatValues(int[] dictIds, int length, float[] outValues) {
for (int i = 0; i < length; i++) {
outValues[i] = getFloatValue(dictIds[i]);
}
}
+ default void readFloatValues(int[] dictIds, int length, Float[] outValues) {
+ for (int i = 0; i < length; i++) {
+ outValues[i] = getFloatValue(dictIds[i]);
+ }
+ }
+
default void readDoubleValues(int[] dictIds, int length, double[] outValues) {
for (int i = 0; i < length; i++) {
outValues[i] = getDoubleValue(dictIds[i]);
}
}
+ default void readDoubleValues(int[] dictIds, int length, Double[] outValues) {
+ for (int i = 0; i < length; i++) {
+ outValues[i] = getDoubleValue(dictIds[i]);
+ }
+ }
+
default void readBigDecimalValues(int[] dictIds, int length, BigDecimal[] outValues) {
for (int i = 0; i < length; i++) {
outValues[i] = getBigDecimalValue(dictIds[i]);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org