You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink on the archive page) is not preserved in this text.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/06/22 06:43:43 UTC
[incubator-pinot] branch master updated: Misc fixes for json data type (#7057)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bd792a3 Misc fixes for json data type (#7057)
bd792a3 is described below
commit bd792a3dc489546d6b9661ae20e98a7b7859a8f2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Jun 21 23:43:28 2021 -0700
Misc fixes for json data type (#7057)
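- Fix realtime tables that use a raw (no-dictionary) forward index on a JSON data type column
- Enhance `JsonExtractScalarTransformFunction`:
  - Handle exceptions thrown when reading the JSON path
  - Reuse value buffers across calls
- In the JSON quick-start examples (githubEvents, meetupRsvp), configure the JSON columns with the JSON data type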
---
.../JsonExtractScalarTransformFunction.java | 304 +++++++++++++--------
.../indexsegment/mutable/MutableSegmentImpl.java | 26 +-
.../batch/githubEvents/githubEvents_schema.json | 9 +-
.../stream/meetupRsvp/json_meetupRsvp_schema.json | 12 +-
4 files changed, 216 insertions(+), 135 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index 0c7ae69..2aeecc8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -27,12 +27,12 @@ import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import com.jayway.jsonpath.spi.mapper.MappingProvider;
-import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -105,210 +105,298 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
@Override
public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
- final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final int[] results = new int[projectionBlock.getNumDocs()];
- for (int i = 0; i < results.length; i++) {
- Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
- if (read == null) {
+ if (_intValuesSV == null) {
+ _intValuesSV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ Object result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
if (_defaultValue != null) {
- results[i] = (int) _defaultValue;
+ _intValuesSV[i] = (int) _defaultValue;
continue;
}
throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+ String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
}
- if (read instanceof Number) {
- results[i] = ((Number) read).intValue();
+ if (result instanceof Number) {
+ _intValuesSV[i] = ((Number) result).intValue();
} else {
- results[i] = Integer.parseInt(read.toString());
+ _intValuesSV[i] = Integer.parseInt(result.toString());
}
}
- return results;
+ return _intValuesSV;
}
@Override
public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
- final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final long[] results = new long[projectionBlock.getNumDocs()];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
- if (read == null) {
+ if (_longValuesSV == null) {
+ _longValuesSV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ Object result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
if (_defaultValue != null) {
- results[i] = (long) _defaultValue;
+ _longValuesSV[i] = (long) _defaultValue;
continue;
}
throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+ String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
}
- if (read instanceof Number) {
- results[i] = ((Number) read).longValue();
+ if (result instanceof Number) {
+ _longValuesSV[i] = ((Number) result).longValue();
} else {
// Handle scientific notation
- results[i] = Double.valueOf(read.toString()).longValue();
+ _longValuesSV[i] = (long) Double.parseDouble(result.toString());
}
}
- return results;
+ return _longValuesSV;
}
@Override
public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
- final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final float[] results = new float[projectionBlock.getNumDocs()];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
- if (read == null) {
+ if (_floatValuesSV == null) {
+ _floatValuesSV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ Object result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
if (_defaultValue != null) {
- results[i] = (float) _defaultValue;
+ _floatValuesSV[i] = (float) _defaultValue;
continue;
}
throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+ String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
}
- if (read instanceof Number) {
- results[i] = ((Number) read).floatValue();
+ if (result instanceof Number) {
+ _floatValuesSV[i] = ((Number) result).floatValue();
} else {
- results[i] = Double.valueOf(read.toString()).floatValue();
+ _floatValuesSV[i] = Float.parseFloat(result.toString());
}
}
- return results;
+ return _floatValuesSV;
}
@Override
public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
- final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final double[] results = new double[projectionBlock.getNumDocs()];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
- if (read == null) {
+ if (_doubleValuesSV == null) {
+ _doubleValuesSV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ Object result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
if (_defaultValue != null) {
- results[i] = (double) _defaultValue;
+ _doubleValuesSV[i] = (double) _defaultValue;
continue;
}
throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+ String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
}
- if (read instanceof Number) {
- results[i] = ((Number) read).doubleValue();
- } else if (read instanceof BigDecimal) {
- results[i] = ((BigDecimal) read).doubleValue();
+ if (result instanceof Number) {
+ _doubleValuesSV[i] = ((Number) result).doubleValue();
} else {
- results[i] = Double.valueOf(read.toString()).doubleValue();
+ _doubleValuesSV[i] = Double.parseDouble(result.toString());
}
}
- return results;
+ return _doubleValuesSV;
}
@Override
public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
- final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final String[] results = new String[projectionBlock.getNumDocs()];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
- if (read == null) {
+ if (_stringValuesSV == null) {
+ _stringValuesSV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ Object result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
if (_defaultValue != null) {
- results[i] = (String) _defaultValue;
+ _stringValuesSV[i] = (String) _defaultValue;
continue;
}
throw new RuntimeException(
- String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+ String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
}
- if (read instanceof String) {
- results[i] = read.toString();
+ if (result instanceof String) {
+ _stringValuesSV[i] = (String) result;
} else {
- results[i] = JsonUtils.objectToJsonNode(read).toString();
+ _stringValuesSV[i] = JsonUtils.objectToJsonNode(result).toString();
}
}
- return results;
+ return _stringValuesSV;
}
@Override
public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
- final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final int[][] results = new int[projectionBlock.getNumDocs()][];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- final List<Integer> intVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
- if (intVals == null) {
- results[i] = new int[0];
+ if (_intValuesMV == null) {
+ _intValuesMV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int numDocs = projectionBlock.getNumDocs();
+ for (int i = 0; i < numDocs; i++) {
+ List<Integer> result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
+ _intValuesMV[i] = new int[0];
continue;
}
- results[i] = new int[intVals.size()];
- for (int j = 0; j < intVals.size(); j++) {
- results[i][j] = intVals.get(j);
+ int numValues = result.size();
+ int[] values = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = result.get(j);
}
+ _intValuesMV[i] = values;
}
- return results;
+ return _intValuesMV;
}
@Override
public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
- final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final long[][] results = new long[projectionBlock.getNumDocs()][];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- final List<Long> longVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
- if (longVals == null) {
- results[i] = new long[0];
+ if (_longValuesMV == null) {
+ _longValuesMV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int length = projectionBlock.getNumDocs();
+ for (int i = 0; i < length; i++) {
+ List<Long> result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
+ _longValuesMV[i] = new long[0];
continue;
}
- results[i] = new long[longVals.size()];
- for (int j = 0; j < longVals.size(); j++) {
- results[i][j] = longVals.get(j);
+ int numValues = result.size();
+ long[] values = new long[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = result.get(j);
}
+ _longValuesMV[i] = values;
}
- return results;
+ return _longValuesMV;
}
@Override
public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
- final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final float[][] results = new float[projectionBlock.getNumDocs()][];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- final List<Float> floatVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
- if (floatVals == null) {
- results[i] = new float[0];
+ if (_floatValuesMV == null) {
+ _floatValuesMV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int length = projectionBlock.getNumDocs();
+ for (int i = 0; i < length; i++) {
+ List<Float> result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
+ _floatValuesMV[i] = new float[0];
continue;
}
- results[i] = new float[floatVals.size()];
- for (int j = 0; j < floatVals.size(); j++) {
- results[i][j] = floatVals.get(j);
+ int numValues = result.size();
+ float[] values = new float[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = result.get(j);
}
+ _floatValuesMV[i] = values;
}
- return results;
+ return _floatValuesMV;
}
@Override
public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
- final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final double[][] results = new double[projectionBlock.getNumDocs()][];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- final List<Double> doubleVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
- if (doubleVals == null) {
- results[i] = new double[0];
+ if (_doubleValuesMV == null) {
+ _doubleValuesMV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int length = projectionBlock.getNumDocs();
+ for (int i = 0; i < length; i++) {
+ List<Double> result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
+ _doubleValuesMV[i] = new double[0];
continue;
}
- results[i] = new double[doubleVals.size()];
- for (int j = 0; j < doubleVals.size(); j++) {
- results[i][j] = doubleVals.get(j);
+ int numValues = result.size();
+ double[] values = new double[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = result.get(j);
}
+ _doubleValuesMV[i] = values;
}
- return results;
+ return _doubleValuesMV;
}
@Override
public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
- final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
- final String[][] results = new String[projectionBlock.getNumDocs()][];
- for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
- final List<String> stringVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
- if (stringVals == null) {
- results[i] = new String[0];
+ if (_stringValuesMV == null) {
+ _stringValuesMV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+ }
+
+ String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+ int length = projectionBlock.getNumDocs();
+ for (int i = 0; i < length; i++) {
+ List<String> result = null;
+ try {
+ result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+ } catch (Exception ignored) {
+ }
+ if (result == null) {
+ _stringValuesMV[i] = new String[0];
continue;
}
- results[i] = new String[stringVals.size()];
- for (int j = 0; j < stringVals.size(); j++) {
- results[i][j] = stringVals.get(j);
+ int numValues = result.size();
+ String[] values = new String[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[j] = result.get(j);
}
+ _stringValuesMV[i] = values;
}
- return results;
+ return _stringValuesMV;
}
static {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 50dea95..6a09d63 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -178,20 +178,20 @@ public class MutableSegmentImpl implements MutableSegment {
realtimeSegmentZKMetadata.getEndTime(), realtimeSegmentZKMetadata.getTimeUnit(),
realtimeSegmentZKMetadata.getTotalDocs(), realtimeSegmentZKMetadata.getCrc(), _schema) {
@Override
- public int getTotalDocs() {
- return _numDocsIndexed;
- }
+ public int getTotalDocs() {
+ return _numDocsIndexed;
+ }
- @Override
- public long getLastIndexedTimestamp() {
- return _lastIndexedTimeMs;
- }
+ @Override
+ public long getLastIndexedTimestamp() {
+ return _lastIndexedTimeMs;
+ }
- @Override
- public long getLatestIngestionTimestamp() {
- return _latestIngestionTimeMs;
- }
- };
+ @Override
+ public long getLatestIngestionTimestamp() {
+ return _latestIngestionTimeMs;
+ }
+ };
_offHeap = config.isOffHeap();
_memoryManager = config.getMemoryManager();
@@ -581,7 +581,7 @@ public class MutableSegmentImpl implements MutableSegment {
// Update forward index
DataType dataType = fieldSpec.getDataType();
- switch (dataType) {
+ switch (dataType.getStoredType()) {
case INT:
forwardIndex.setInt(docId, (Integer) value);
break;
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
index 8ff7135..aafd259 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
@@ -10,18 +10,15 @@
},
{
"name": "actor",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "repo",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "payload",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "public",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
index 4c51869..afeee95 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
@@ -3,23 +3,19 @@
"dimensionFieldSpecs": [
{
"name": "event_json",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "group_json",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "member_json",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "venue_json",
- "dataType": "STRING",
- "maxLength": 2147483647
+ "dataType": "JSON"
},
{
"name": "response",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org