You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by an...@apache.org on 2024/02/08 23:20:27 UTC

(pinot) branch master updated: Skip invalid json string rather than throwing error during json indexing (#12238)

This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8684e046c0 Skip invalid json string rather than throwing error during json indexing (#12238)
8684e046c0 is described below

commit 8684e046c0bce504224a3a6179be6b51117511ce
Author: Xuanyi Li <xu...@uber.com>
AuthorDate: Thu Feb 8 15:20:21 2024 -0800

    Skip invalid json string rather than throwing error during json indexing (#12238)
---
 .../realtime/impl/json/MutableJsonIndexImpl.java   |  3 +-
 .../impl/inv/json/BaseJsonIndexCreator.java        |  2 +-
 .../segment/local/segment/index/JsonIndexTest.java | 47 ++++++++++++++++++++++
 .../pinot/spi/config/table/JsonIndexConfig.java    | 20 +++++++--
 .../java/org/apache/pinot/spi/utils/JsonUtils.java | 19 ++++++++-
 5 files changed, 84 insertions(+), 7 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 46e7798204..7e632a19b2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -78,8 +78,7 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
   public void add(String jsonString)
       throws IOException {
     try {
-      List<Map<String, String>> flattenedRecords =
-          JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig);
+      List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonString, _jsonIndexConfig);
       _writeLock.lock();
       try {
         addFlattenedRecords(flattenedRecords);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 264bc6044b..5c3409fe45 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -91,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
   @Override
   public void add(String jsonString)
       throws IOException {
-    addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig));
+    addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig));
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 4d3cbf8cae..461f8eb93e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.segment.local.segment.index;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -69,6 +71,7 @@ public class JsonIndexTest {
     FileUtils.deleteDirectory(INDEX_DIR);
   }
 
+
   @Test
   public void testSmallIndex()
       throws Exception {
@@ -371,6 +374,50 @@ public class JsonIndexTest {
     }
   }
 
+  @Test
+  public void testSkipInvalidJsonEnable() throws Exception {
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    jsonIndexConfig.setSkipInvalidJson(true);
+    // the braces don't match and cannot be parsed
+    String[] records = {"{\"key1\":\"va\""};
+
+    createIndex(true, jsonIndexConfig, records);
+    File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    Assert.assertTrue(onHeapIndexFile.exists());
+
+    createIndex(false, jsonIndexConfig, records);
+    File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+    Assert.assertTrue(offHeapIndexFile.exists());
+
+    try (PinotDataBuffer onHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+        PinotDataBuffer offHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+        JsonIndexReader onHeapIndexReader = new ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
+        JsonIndexReader offHeapIndexReader = new ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
+        MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig)) {
+      for (String record : records) {
+        mutableJsonIndex.add(record);
+      }
+      Map<String, RoaringBitmap> onHeapRes = onHeapIndexReader.getMatchingDocsMap("");
+      Map<String, RoaringBitmap> offHeapRes = offHeapIndexReader.getMatchingDocsMap("");
+      Map<String, RoaringBitmap> mutableRes = mutableJsonIndex.getMatchingDocsMap("");
+      Map<String, RoaringBitmap> expectedRes = Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
+          RoaringBitmap.bitmapOf(0));
+      Assert.assertEquals(expectedRes, onHeapRes);
+      Assert.assertEquals(expectedRes, offHeapRes);
+      Assert.assertEquals(expectedRes, mutableRes);
+    }
+  }
+
+  @Test(expectedExceptions = JsonProcessingException.class)
+  public void testSkipInvalidJsonDisabled() throws Exception {
+    // by default, skipInvalidJson is disabled
+    JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+    // the braces don't match and cannot be parsed
+    String[] records = {"{\"key1\":\"va\""};
+
+    createIndex(true, jsonIndexConfig, records);
+  }
+
   public static class ConfTest extends AbstractSerdeIndexContract {
 
     @Test
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
index cada2fe4a4..1a0964138e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
@@ -41,6 +41,8 @@ import javax.annotation.Nullable;
  * - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is under the included paths.
  * - maxValueLength: Exclude field values which are longer than this length. A value of "0" disables this filter.
  *                   Excluded values will be replaced with JsonUtils.SKIPPED_VALUE_REPLACEMENT.
+ * - skipInvalidJson: If the raw data is not a valid JSON string, index it as {"":SKIPPED_VALUE_REPLACEMENT}
+ *                    instead of throwing, and continue indexing the subsequent JSON records.
  */
 public class JsonIndexConfig extends IndexConfig {
   public static final JsonIndexConfig DISABLED = new JsonIndexConfig(true);
@@ -52,6 +54,7 @@ public class JsonIndexConfig extends IndexConfig {
   private Set<String> _excludePaths;
   private Set<String> _excludeFields;
   private int _maxValueLength = 0;
+  private boolean _skipInvalidJson = false;
 
   public JsonIndexConfig() {
     super(false);
@@ -68,7 +71,8 @@ public class JsonIndexConfig extends IndexConfig {
       @JsonProperty("includePaths") @Nullable Set<String> includePaths,
       @JsonProperty("excludePaths") @Nullable Set<String> excludePaths,
       @JsonProperty("excludeFields") @Nullable Set<String> excludeFields,
-      @JsonProperty("maxValueLength") int maxValueLength) {
+      @JsonProperty("maxValueLength") int maxValueLength,
+      @JsonProperty("skipInvalidJson") boolean skipInvalidJson) {
     super(disabled);
     _maxLevels = maxLevels;
     _excludeArray = excludeArray;
@@ -77,6 +81,7 @@ public class JsonIndexConfig extends IndexConfig {
     _excludePaths = excludePaths;
     _excludeFields = excludeFields;
     _maxValueLength = maxValueLength;
+    _skipInvalidJson = skipInvalidJson;
   }
 
   public int getMaxLevels() {
@@ -143,6 +148,14 @@ public class JsonIndexConfig extends IndexConfig {
     _maxValueLength = maxValueLength;
   }
 
+  public boolean getSkipInvalidJson() {
+    return _skipInvalidJson;
+  }
+
+  public void setSkipInvalidJson(boolean skipInvalidJson) {
+    _skipInvalidJson = skipInvalidJson;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -158,12 +171,13 @@ public class JsonIndexConfig extends IndexConfig {
     return _maxLevels == config._maxLevels && _excludeArray == config._excludeArray
         && _disableCrossArrayUnnest == config._disableCrossArrayUnnest && Objects.equals(_includePaths,
         config._includePaths) && Objects.equals(_excludePaths, config._excludePaths) && Objects.equals(_excludeFields,
-        config._excludeFields) && _maxValueLength == config._maxValueLength;
+        config._excludeFields) && _maxValueLength == config._maxValueLength
+        && _skipInvalidJson == config._skipInvalidJson;
   }
 
   @Override
   public int hashCode() {
     return Objects.hash(super.hashCode(), _maxLevels, _excludeArray, _disableCrossArrayUnnest, _includePaths,
-        _excludePaths, _excludeFields, _maxValueLength);
+        _excludePaths, _excludeFields, _maxValueLength, _skipInvalidJson);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 8593b2f8cb..396c21ad74 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -76,6 +76,8 @@ public class JsonUtils {
   public static final String ARRAY_INDEX_KEY = ".$index";
   public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
   public static final int MAX_COMBINATIONS = 100_000;
+  private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
+      Collections.singletonList(Collections.singletonMap(VALUE_KEY, SKIPPED_VALUE_REPLACEMENT));
 
   // For querying
   public static final String WILDCARD = "*";
@@ -356,7 +358,7 @@ public class JsonUtils {
    * ]
    * </pre>
    */
-  public static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
+  protected static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
     try {
       return flatten(node, jsonIndexConfig, 0, "$", false);
     } catch (OutOfMemoryError oom) {
@@ -719,4 +721,19 @@ public class JsonUtils {
       }
     }
   }
+
+  public static List<Map<String, String>> flatten(String jsonString, JsonIndexConfig jsonIndexConfig)
+      throws IOException {
+    JsonNode jsonNode;
+    try {
+      jsonNode = JsonUtils.stringToJsonNode(jsonString);
+    } catch (JsonProcessingException e) {
+      if (jsonIndexConfig.getSkipInvalidJson()) {
+        return SKIPPED_FLATTENED_RECORD;
+      } else {
+        throw e;
+      }
+    }
+    return JsonUtils.flatten(jsonNode, jsonIndexConfig);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org