You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2024/02/29 15:35:28 UTC
(pinot) branch master updated: Pass explicit TypeRef when evaluating MV jsonPath (#12524)
This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bddd3612bc Pass explicit TypeRef when evaluating MV jsonPath (#12524)
bddd3612bc is described below
commit bddd3612bc0f8a5927f34f14d93fb969ac81e79b
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Thu Feb 29 21:05:21 2024 +0530
Pass explicit TypeRef when evaluating MV jsonPath (#12524)
Co-authored-by: Saurabh Dubey <sa...@saurabhs-macbook-pro-1.tail8a064.ts.net>
---
.../evaluators/DefaultJsonPathEvaluator.java | 89 ++++++++++---
.../evaluators/DefaultJsonPathEvaluatorTest.java | 146 +++++++++++++++++++++
2 files changed, 215 insertions(+), 20 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
index f22474354c..cae1e3c17b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluator.java
@@ -25,6 +25,7 @@ import com.jayway.jsonpath.InvalidPathException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
+import com.jayway.jsonpath.TypeRef;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import java.math.BigDecimal;
@@ -58,6 +59,16 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
private static final float[] EMPTY_FLOATS = new float[0];
private static final double[] EMPTY_DOUBLES = new double[0];
private static final String[] EMPTY_STRINGS = new String[0];
+ // Explicit element types for the multi-value (MV) JSON path reads. Passing one of these to
+ // read(_jsonPath, typeRef) asks json-path's configured mapping provider (presumably the JacksonMappingProvider
+ // imported above -- confirm against JSON_PARSER_CONTEXT) to convert every extracted array element to the requested
+ // type, e.g. the JSON string "1" becomes 1 for an int MV read, instead of returning the raw element types found in
+ // the document. Each TypeRef is an anonymous subclass so the generic List<...> type is captured at runtime; they
+ // are static final so they are built once and shared by all reads.
+ private static final TypeRef<List<Integer>> INTEGER_LIST_TYPE = new TypeRef<List<Integer>>() {
+ };
+ private static final TypeRef<List<Long>> LONG_LIST_TYPE = new TypeRef<List<Long>>() {
+ };
+ private static final TypeRef<List<Float>> FLOAT_LIST_TYPE = new TypeRef<List<Float>>() {
+ };
+ private static final TypeRef<List<Double>> DOUBLE_LIST_TYPE = new TypeRef<List<Double>>() {
+ };
+ private static final TypeRef<List<String>> STRING_LIST_TYPE = new TypeRef<List<String>>() {
+ };
public static JsonPathEvaluator create(String jsonPath, @Nullable Object defaultValue) {
try {
@@ -274,23 +285,23 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromString(dictionary, dictIdsBuffer[i], INTEGER_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromString(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromBytes(reader, context, docIds[i], INTEGER_LIST_TYPE), valueBuffer);
}
break;
default:
@@ -305,23 +316,23 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromString(dictionary, dictIdsBuffer[i], LONG_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromString(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromBytes(reader, context, docIds[i], LONG_LIST_TYPE), valueBuffer);
}
break;
default:
@@ -336,23 +347,23 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromString(dictionary, dictIdsBuffer[i], FLOAT_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromString(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromBytes(reader, context, docIds[i], FLOAT_LIST_TYPE), valueBuffer);
}
break;
default:
@@ -367,23 +378,23 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromString(dictionary, dictIdsBuffer[i], DOUBLE_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromString(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromBytes(reader, context, docIds[i], DOUBLE_LIST_TYPE), valueBuffer);
}
break;
default:
@@ -398,23 +409,23 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
reader.readDictIds(docIds, length, dictIdsBuffer, context);
if (dictionary.getValueType() == FieldSpec.DataType.BYTES) {
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromBytes(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer);
}
} else {
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(dictionary, dictIdsBuffer[i]), valueBuffer);
+ processList(i, extractFromString(dictionary, dictIdsBuffer[i], STRING_LIST_TYPE), valueBuffer);
}
}
} else {
switch (reader.getStoredType()) {
case STRING:
for (int i = 0; i < length; i++) {
- processList(i, extractFromString(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromString(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer);
}
break;
case BYTES:
for (int i = 0; i < length; i++) {
- processList(i, extractFromBytes(reader, context, docIds[i]), valueBuffer);
+ processList(i, extractFromBytes(reader, context, docIds[i], STRING_LIST_TYPE), valueBuffer);
}
break;
default:
@@ -432,6 +443,15 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
}
}
+  /**
+   * Evaluates the JSON path against the BYTES dictionary entry {@code dictId} (parsed as UTF-8 JSON) and converts
+   * the result to the type described by {@code typeRef}.
+   *
+   * @param dictionary dictionary holding the JSON documents as bytes
+   * @param dictId id of the entry to evaluate
+   * @param typeRef target type for the extracted value
+   * @return the converted value, or {@code null} if parsing, path evaluation or conversion fails
+   */
+  @Nullable
+  private <T> T extractFromBytes(Dictionary dictionary, int dictId, TypeRef<T> typeRef) {
+    try {
+      byte[] jsonBytes = dictionary.getBytesValue(dictId);
+      return JSON_PARSER_CONTEXT.parseUtf8(jsonBytes).read(_jsonPath, typeRef);
+    } catch (Exception ignored) {
+      // Deliberately best-effort: any failure yields null instead of propagating.
+      return null;
+    }
+  }
+
@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndexReader<R> reader, R context,
int docId) {
@@ -442,6 +462,16 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
}
}
+  /**
+   * Reads the raw bytes stored for {@code docId} in the forward index, parses them as UTF-8 JSON, evaluates the
+   * JSON path and converts the result to the type described by {@code typeRef}.
+   *
+   * @param reader forward index to read from
+   * @param context reader context passed through to {@code reader.getBytes}
+   * @param docId document whose value is evaluated
+   * @param typeRef target type for the extracted value
+   * @return the converted value, or {@code null} if reading, parsing, path evaluation or conversion fails
+   */
+  @Nullable
+  private <T, R extends ForwardIndexReaderContext> T extractFromBytes(ForwardIndexReader<R> reader, R context,
+      int docId, TypeRef<T> typeRef) {
+    try {
+      byte[] jsonBytes = reader.getBytes(docId, context);
+      return JSON_PARSER_CONTEXT.parseUtf8(jsonBytes).read(_jsonPath, typeRef);
+    } catch (Exception ignored) {
+      // Deliberately best-effort: any failure yields null instead of propagating.
+      return null;
+    }
+  }
+
@Nullable
private <T> T extractFromBytesWithExactBigDecimal(Dictionary dictionary, int dictId) {
try {
@@ -470,6 +500,15 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
}
}
+  /**
+   * Evaluates the JSON path against the STRING dictionary entry {@code dictId} and converts the result to the type
+   * described by {@code typeRef}.
+   *
+   * @param dictionary dictionary holding the JSON documents as strings
+   * @param dictId id of the entry to evaluate
+   * @param typeRef target type for the extracted value
+   * @return the converted value, or {@code null} if parsing, path evaluation or conversion fails
+   */
+  @Nullable
+  private <T> T extractFromString(Dictionary dictionary, int dictId, TypeRef<T> typeRef) {
+    try {
+      String jsonText = dictionary.getStringValue(dictId);
+      return JSON_PARSER_CONTEXT.parse(jsonText).read(_jsonPath, typeRef);
+    } catch (Exception ignored) {
+      // Deliberately best-effort: any failure yields null instead of propagating.
+      return null;
+    }
+  }
+
@Nullable
private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardIndexReader<R> reader, R context,
int docId) {
@@ -480,6 +519,16 @@ public final class DefaultJsonPathEvaluator implements JsonPathEvaluator {
}
}
+  /**
+   * Evaluates the JSON path against the STRING value stored for {@code docId} in the forward index and converts the
+   * result to the type described by {@code typeRef}.
+   *
+   * @param reader forward index to read from
+   * @param context reader context passed through to {@code reader.getBytes}
+   * @param docId document whose value is evaluated
+   * @param typeRef target type for the extracted value
+   * @return the converted value, or {@code null} if reading, parsing, path evaluation or conversion fails
+   */
+  @Nullable
+  private <T, R extends ForwardIndexReaderContext> T extractFromString(ForwardIndexReader<R> reader, R context,
+      int docId, TypeRef<T> typeRef) {
+    try {
+      // The string is parsed straight from its raw bytes via parseUtf8 rather than being decoded to a String first.
+      // NOTE(review): assumes getBytes returns the UTF-8 encoding of the stored string for STRING columns -- confirm.
+      byte[] jsonBytes = reader.getBytes(docId, context);
+      return JSON_PARSER_CONTEXT.parseUtf8(jsonBytes).read(_jsonPath, typeRef);
+    } catch (Exception ignored) {
+      // Deliberately best-effort: any failure yields null instead of propagating.
+      return null;
+    }
+  }
+
@Nullable
private <T> T extractFromStringWithExactBigDecimal(Dictionary dictionary, int dictId) {
try {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java
new file mode 100644
index 0000000000..c7b304667c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/evaluators/DefaultJsonPathEvaluatorTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.evaluators;
+
+import java.nio.charset.StandardCharsets;
+import org.apache.pinot.segment.spi.evaluator.json.JsonPathEvaluator;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+
+/**
+ * Tests multi-value reads of {@link DefaultJsonPathEvaluator} from a mocked raw (non-dictionary-encoded) STRING
+ * forward index. Each case slices the first three elements of a JSON array and reads them into every MV buffer type
+ * (int, long, float, double, String) to check element-wise type conversion.
+ *
+ * <p>{@code assertArrayEquals} is TestNG's JUnit-compatible variant, whose argument order is
+ * {@code (expected, actual)}.
+ */
+public class DefaultJsonPathEvaluatorTest {
+  private static final String PATH = "$.values[0:3]";
+
+  @Test
+  public void testNonDictIntegerArray() {
+    assertReadsAsAllTypes("{\"values\": [1, 2, 3, 4, 5]}", new String[]{"1", "2", "3"});
+  }
+
+  @Test
+  public void testNonDictStringArray() {
+    assertReadsAsAllTypes("{\"values\": [\"1\", \"2\", \"3\", \"4\", \"5\"]}", new String[]{"1", "2", "3"});
+  }
+
+  @Test
+  public void testNonDictDoubleArray() {
+    // Numbers keep their JSON text form when read as strings.
+    assertReadsAsAllTypes("{\"values\": [1.0, 2.0, 3.0, 4.0, 5.0]}", new String[]{"1.0", "2.0", "3.0"});
+  }
+
+  /** Builds a mocked raw STRING forward index whose doc 0 holds {@code json} as UTF-8 bytes. */
+  @SuppressWarnings("unchecked")
+  private static ForwardIndexReader<ForwardIndexReaderContext> mockRawStringReader(String json) {
+    ForwardIndexReader<ForwardIndexReaderContext> reader = mock(ForwardIndexReader.class);
+    when(reader.isDictionaryEncoded()).thenReturn(false);
+    when(reader.getBytes(eq(0), any())).thenReturn(json.getBytes(StandardCharsets.UTF_8));
+    when(reader.getStoredType()).thenReturn(FieldSpec.DataType.STRING);
+    when(reader.createContext()).thenReturn(null);
+    return reader;
+  }
+
+  /**
+   * Evaluates {@link #PATH} over doc 0 of a mocked reader holding {@code json} into every MV buffer type. The
+   * numeric buffers must hold {1, 2, 3}; the String buffer must hold {@code expectedStrings}.
+   */
+  private static void assertReadsAsAllTypes(String json, String[] expectedStrings) {
+    JsonPathEvaluator evaluator = DefaultJsonPathEvaluator.create(PATH, new int[]{});
+    ForwardIndexReader<ForwardIndexReaderContext> reader = mockRawStringReader(json);
+
+    int[][] intBuffer = new int[1][3];
+    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, intBuffer);
+    assertArrayEquals(new int[][]{{1, 2, 3}}, intBuffer);
+
+    long[][] longBuffer = new long[1][3];
+    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, longBuffer);
+    assertArrayEquals(new long[][]{{1, 2, 3}}, longBuffer);
+
+    float[][] floatBuffer = new float[1][3];
+    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, floatBuffer);
+    assertArrayEquals(new float[][]{{1.0f, 2.0f, 3.0f}}, floatBuffer);
+
+    double[][] doubleBuffer = new double[1][3];
+    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, doubleBuffer);
+    assertArrayEquals(new double[][]{{1.0, 2.0, 3.0}}, doubleBuffer);
+
+    String[][] stringBuffer = new String[1][3];
+    evaluator.evaluateBlock(new int[]{0}, 1, reader, null, null, null, stringBuffer);
+    assertArrayEquals(new String[][]{expectedStrings}, stringBuffer);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org