You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/06/22 06:43:43 UTC

[incubator-pinot] branch master updated: Misc fixes for json data type (#7057)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bd792a3  Misc fixes for json data type (#7057)
bd792a3 is described below

commit bd792a3dc489546d6b9661ae20e98a7b7859a8f2
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Jun 21 23:43:28 2021 -0700

    Misc fixes for json data type (#7057)
    
    - Fix realtime tables that use a raw forward index on a JSON data type column (switch on the stored type when updating the forward index)
    - Enhance `JsonExtractScalarTransformFunction`:
      - Handle exceptions thrown when reading the JSON path (use the default value if one is set; use an empty array for multi-value results)
      - Reuse the value buffers across calls instead of allocating a new result array for every block
    - In the JSON quick-starts (githubEvents and meetupRsvp), configure the JSON columns with the JSON data type
---
 .../JsonExtractScalarTransformFunction.java        | 304 +++++++++++++--------
 .../indexsegment/mutable/MutableSegmentImpl.java   |  26 +-
 .../batch/githubEvents/githubEvents_schema.json    |   9 +-
 .../stream/meetupRsvp/json_meetupRsvp_schema.json  |  12 +-
 4 files changed, 216 insertions(+), 135 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
index 0c7ae69..2aeecc8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/JsonExtractScalarTransformFunction.java
@@ -27,12 +27,12 @@ import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
 import com.jayway.jsonpath.spi.json.JsonProvider;
 import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
 import com.jayway.jsonpath.spi.mapper.MappingProvider;
-import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -105,210 +105,298 @@ public class JsonExtractScalarTransformFunction extends BaseTransformFunction {
 
   @Override
   public int[] transformToIntValuesSV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final int[] results = new int[projectionBlock.getNumDocs()];
-    for (int i = 0; i < results.length; i++) {
-      Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
-      if (read == null) {
+    if (_intValuesSV == null) {
+      _intValuesSV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
         if (_defaultValue != null) {
-          results[i] = (int) _defaultValue;
+          _intValuesSV[i] = (int) _defaultValue;
           continue;
         }
         throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
       }
-      if (read instanceof Number) {
-        results[i] = ((Number) read).intValue();
+      if (result instanceof Number) {
+        _intValuesSV[i] = ((Number) result).intValue();
       } else {
-        results[i] = Integer.parseInt(read.toString());
+        _intValuesSV[i] = Integer.parseInt(result.toString());
       }
     }
-    return results;
+    return _intValuesSV;
   }
 
   @Override
   public long[] transformToLongValuesSV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final long[] results = new long[projectionBlock.getNumDocs()];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
-      if (read == null) {
+    if (_longValuesSV == null) {
+      _longValuesSV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
         if (_defaultValue != null) {
-          results[i] = (long) _defaultValue;
+          _longValuesSV[i] = (long) _defaultValue;
           continue;
         }
         throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
       }
-      if (read instanceof Number) {
-        results[i] = ((Number) read).longValue();
+      if (result instanceof Number) {
+        _longValuesSV[i] = ((Number) result).longValue();
       } else {
         // Handle scientific notation
-        results[i] = Double.valueOf(read.toString()).longValue();
+        _longValuesSV[i] = (long) Double.parseDouble(result.toString());
       }
     }
-    return results;
+    return _longValuesSV;
   }
 
   @Override
   public float[] transformToFloatValuesSV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final float[] results = new float[projectionBlock.getNumDocs()];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
-      if (read == null) {
+    if (_floatValuesSV == null) {
+      _floatValuesSV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
         if (_defaultValue != null) {
-          results[i] = (float) _defaultValue;
+          _floatValuesSV[i] = (float) _defaultValue;
           continue;
         }
         throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
       }
-      if (read instanceof Number) {
-        results[i] = ((Number) read).floatValue();
+      if (result instanceof Number) {
+        _floatValuesSV[i] = ((Number) result).floatValue();
       } else {
-        results[i] = Double.valueOf(read.toString()).floatValue();
+        _floatValuesSV[i] = Float.parseFloat(result.toString());
       }
     }
-    return results;
+    return _floatValuesSV;
   }
 
   @Override
   public double[] transformToDoubleValuesSV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final double[] results = new double[projectionBlock.getNumDocs()];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
-      if (read == null) {
+    if (_doubleValuesSV == null) {
+      _doubleValuesSV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
         if (_defaultValue != null) {
-          results[i] = (double) _defaultValue;
+          _doubleValuesSV[i] = (double) _defaultValue;
           continue;
         }
         throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
       }
-      if (read instanceof Number) {
-        results[i] = ((Number) read).doubleValue();
-      } else if (read instanceof BigDecimal) {
-        results[i] = ((BigDecimal) read).doubleValue();
+      if (result instanceof Number) {
+        _doubleValuesSV[i] = ((Number) result).doubleValue();
       } else {
-        results[i] = Double.valueOf(read.toString()).doubleValue();
+        _doubleValuesSV[i] = Double.parseDouble(result.toString());
       }
     }
-    return results;
+    return _doubleValuesSV;
   }
 
   @Override
   public String[] transformToStringValuesSV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesSV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final String[] results = new String[projectionBlock.getNumDocs()];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      Object read = JSON_PARSER_CONTEXT.parse(stringValuesSV[i]).read(_jsonPath);
-      if (read == null) {
+    if (_stringValuesSV == null) {
+      _stringValuesSV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      Object result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
         if (_defaultValue != null) {
-          results[i] = (String) _defaultValue;
+          _stringValuesSV[i] = (String) _defaultValue;
           continue;
         }
         throw new RuntimeException(
-            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, stringValuesSV[i]));
+            String.format("Illegal Json Path: [%s], when reading [%s]", _jsonPath, jsonStrings[i]));
       }
-      if (read instanceof String) {
-        results[i] = read.toString();
+      if (result instanceof String) {
+        _stringValuesSV[i] = (String) result;
       } else {
-        results[i] = JsonUtils.objectToJsonNode(read).toString();
+        _stringValuesSV[i] = JsonUtils.objectToJsonNode(result).toString();
       }
     }
-    return results;
+    return _stringValuesSV;
   }
 
   @Override
   public int[][] transformToIntValuesMV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final int[][] results = new int[projectionBlock.getNumDocs()][];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      final List<Integer> intVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
-      if (intVals == null) {
-        results[i] = new int[0];
+    if (_intValuesMV == null) {
+      _intValuesMV = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int numDocs = projectionBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      List<Integer> result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
+        _intValuesMV[i] = new int[0];
         continue;
       }
-      results[i] = new int[intVals.size()];
-      for (int j = 0; j < intVals.size(); j++) {
-        results[i][j] = intVals.get(j);
+      int numValues = result.size();
+      int[] values = new int[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = result.get(j);
       }
+      _intValuesMV[i] = values;
     }
-    return results;
+    return _intValuesMV;
   }
 
   @Override
   public long[][] transformToLongValuesMV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final long[][] results = new long[projectionBlock.getNumDocs()][];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      final List<Long> longVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
-      if (longVals == null) {
-        results[i] = new long[0];
+    if (_longValuesMV == null) {
+      _longValuesMV = new long[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int length = projectionBlock.getNumDocs();
+    for (int i = 0; i < length; i++) {
+      List<Long> result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
+        _longValuesMV[i] = new long[0];
         continue;
       }
-      results[i] = new long[longVals.size()];
-      for (int j = 0; j < longVals.size(); j++) {
-        results[i][j] = longVals.get(j);
+      int numValues = result.size();
+      long[] values = new long[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = result.get(j);
       }
+      _longValuesMV[i] = values;
     }
-    return results;
+    return _longValuesMV;
   }
 
   @Override
   public float[][] transformToFloatValuesMV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final float[][] results = new float[projectionBlock.getNumDocs()][];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      final List<Float> floatVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
-      if (floatVals == null) {
-        results[i] = new float[0];
+    if (_floatValuesMV == null) {
+      _floatValuesMV = new float[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int length = projectionBlock.getNumDocs();
+    for (int i = 0; i < length; i++) {
+      List<Float> result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
+        _floatValuesMV[i] = new float[0];
         continue;
       }
-      results[i] = new float[floatVals.size()];
-      for (int j = 0; j < floatVals.size(); j++) {
-        results[i][j] = floatVals.get(j);
+      int numValues = result.size();
+      float[] values = new float[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = result.get(j);
       }
+      _floatValuesMV[i] = values;
     }
-    return results;
+    return _floatValuesMV;
   }
 
   @Override
   public double[][] transformToDoubleValuesMV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final double[][] results = new double[projectionBlock.getNumDocs()][];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      final List<Double> doubleVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
-      if (doubleVals == null) {
-        results[i] = new double[0];
+    if (_doubleValuesMV == null) {
+      _doubleValuesMV = new double[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int length = projectionBlock.getNumDocs();
+    for (int i = 0; i < length; i++) {
+      List<Double> result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
+        _doubleValuesMV[i] = new double[0];
         continue;
       }
-      results[i] = new double[doubleVals.size()];
-      for (int j = 0; j < doubleVals.size(); j++) {
-        results[i][j] = doubleVals.get(j);
+      int numValues = result.size();
+      double[] values = new double[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = result.get(j);
       }
+      _doubleValuesMV[i] = values;
     }
-    return results;
+    return _doubleValuesMV;
   }
 
   @Override
   public String[][] transformToStringValuesMV(ProjectionBlock projectionBlock) {
-    final String[] stringValuesMV = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
-    final String[][] results = new String[projectionBlock.getNumDocs()][];
-    for (int i = 0; i < projectionBlock.getNumDocs(); i++) {
-      final List<String> stringVals = JSON_PARSER_CONTEXT.parse(stringValuesMV[i]).read(_jsonPath);
-      if (stringVals == null) {
-        results[i] = new String[0];
+    if (_stringValuesMV == null) {
+      _stringValuesMV = new String[DocIdSetPlanNode.MAX_DOC_PER_CALL][];
+    }
+
+    String[] jsonStrings = _jsonFieldTransformFunction.transformToStringValuesSV(projectionBlock);
+    int length = projectionBlock.getNumDocs();
+    for (int i = 0; i < length; i++) {
+      List<String> result = null;
+      try {
+        result = JSON_PARSER_CONTEXT.parse(jsonStrings[i]).read(_jsonPath);
+      } catch (Exception ignored) {
+      }
+      if (result == null) {
+        _stringValuesMV[i] = new String[0];
         continue;
       }
-      results[i] = new String[stringVals.size()];
-      for (int j = 0; j < stringVals.size(); j++) {
-        results[i][j] = stringVals.get(j);
+      int numValues = result.size();
+      String[] values = new String[numValues];
+      for (int j = 0; j < numValues; j++) {
+        values[j] = result.get(j);
       }
+      _stringValuesMV[i] = values;
     }
-    return results;
+    return _stringValuesMV;
   }
 
   static {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 50dea95..6a09d63 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -178,20 +178,20 @@ public class MutableSegmentImpl implements MutableSegment {
             realtimeSegmentZKMetadata.getEndTime(), realtimeSegmentZKMetadata.getTimeUnit(),
             realtimeSegmentZKMetadata.getTotalDocs(), realtimeSegmentZKMetadata.getCrc(), _schema) {
           @Override
-      public int getTotalDocs() {
-        return _numDocsIndexed;
-      }
+          public int getTotalDocs() {
+            return _numDocsIndexed;
+          }
 
-      @Override
-      public long getLastIndexedTimestamp() {
-        return _lastIndexedTimeMs;
-      }
+          @Override
+          public long getLastIndexedTimestamp() {
+            return _lastIndexedTimeMs;
+          }
 
-      @Override
-      public long getLatestIngestionTimestamp() {
-        return _latestIngestionTimeMs;
-      }
-    };
+          @Override
+          public long getLatestIngestionTimestamp() {
+            return _latestIngestionTimeMs;
+          }
+        };
 
     _offHeap = config.isOffHeap();
     _memoryManager = config.getMemoryManager();
@@ -581,7 +581,7 @@ public class MutableSegmentImpl implements MutableSegment {
 
           // Update forward index
           DataType dataType = fieldSpec.getDataType();
-          switch (dataType) {
+          switch (dataType.getStoredType()) {
             case INT:
               forwardIndex.setInt(docId, (Integer) value);
               break;
diff --git a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
index 8ff7135..aafd259 100644
--- a/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
+++ b/pinot-tools/src/main/resources/examples/batch/githubEvents/githubEvents_schema.json
@@ -10,18 +10,15 @@
     },
     {
       "name": "actor",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "repo",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "payload",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "public",
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
index 4c51869..afeee95 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/json_meetupRsvp_schema.json
@@ -3,23 +3,19 @@
   "dimensionFieldSpecs": [
     {
       "name": "event_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "group_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "member_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "venue_json",
-      "dataType": "STRING",
-      "maxLength": 2147483647
+      "dataType": "JSON"
     },
     {
       "name": "response",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org