You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by an...@apache.org on 2024/02/08 23:20:27 UTC
(pinot) branch master updated: Skip invalid json string rather than throwing error during json indexing (#12238)
This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8684e046c0 Skip invalid json string rather than throwing error during json indexing (#12238)
8684e046c0 is described below
commit 8684e046c0bce504224a3a6179be6b51117511ce
Author: Xuanyi Li <xu...@uber.com>
AuthorDate: Thu Feb 8 15:20:21 2024 -0800
Skip invalid json string rather than throwing error during json indexing (#12238)
---
.../realtime/impl/json/MutableJsonIndexImpl.java | 3 +-
.../impl/inv/json/BaseJsonIndexCreator.java | 2 +-
.../segment/local/segment/index/JsonIndexTest.java | 47 ++++++++++++++++++++++
.../pinot/spi/config/table/JsonIndexConfig.java | 20 +++++++--
.../java/org/apache/pinot/spi/utils/JsonUtils.java | 19 ++++++++-
5 files changed, 84 insertions(+), 7 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
index 46e7798204..7e632a19b2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/json/MutableJsonIndexImpl.java
@@ -78,8 +78,7 @@ public class MutableJsonIndexImpl implements MutableJsonIndex {
public void add(String jsonString)
throws IOException {
try {
- List<Map<String, String>> flattenedRecords =
- JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig);
+ List<Map<String, String>> flattenedRecords = JsonUtils.flatten(jsonString, _jsonIndexConfig);
_writeLock.lock();
try {
addFlattenedRecords(flattenedRecords);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
index 264bc6044b..5c3409fe45 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/inv/json/BaseJsonIndexCreator.java
@@ -91,7 +91,7 @@ public abstract class BaseJsonIndexCreator implements JsonIndexCreator {
@Override
public void add(String jsonString)
throws IOException {
- addFlattenedRecords(JsonUtils.flatten(JsonUtils.stringToJsonNode(jsonString), _jsonIndexConfig));
+ addFlattenedRecords(JsonUtils.flatten(jsonString, _jsonIndexConfig));
}
/**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
index 4d3cbf8cae..461f8eb93e 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/JsonIndexTest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pinot.segment.local.segment.index;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -69,6 +71,7 @@ public class JsonIndexTest {
FileUtils.deleteDirectory(INDEX_DIR);
}
+
@Test
public void testSmallIndex()
throws Exception {
@@ -371,6 +374,50 @@ public class JsonIndexTest {
}
}
+ @Test
+ public void testSkipInvalidJsonEnable() throws Exception {
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ jsonIndexConfig.setSkipInvalidJson(true);
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records);
+ File onHeapIndexFile = new File(INDEX_DIR, ON_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ Assert.assertTrue(onHeapIndexFile.exists());
+
+ createIndex(false, jsonIndexConfig, records);
+ File offHeapIndexFile = new File(INDEX_DIR, OFF_HEAP_COLUMN_NAME + V1Constants.Indexes.JSON_INDEX_FILE_EXTENSION);
+ Assert.assertTrue(offHeapIndexFile.exists());
+
+ try (PinotDataBuffer onHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(onHeapIndexFile);
+ PinotDataBuffer offHeapDataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(offHeapIndexFile);
+ JsonIndexReader onHeapIndexReader = new ImmutableJsonIndexReader(onHeapDataBuffer, records.length);
+ JsonIndexReader offHeapIndexReader = new ImmutableJsonIndexReader(offHeapDataBuffer, records.length);
+ MutableJsonIndexImpl mutableJsonIndex = new MutableJsonIndexImpl(jsonIndexConfig)) {
+ for (String record : records) {
+ mutableJsonIndex.add(record);
+ }
+ Map<String, RoaringBitmap> onHeapRes = onHeapIndexReader.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> offHeapRes = offHeapIndexReader.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> mutableRes = mutableJsonIndex.getMatchingDocsMap("");
+ Map<String, RoaringBitmap> expectedRes = Collections.singletonMap(JsonUtils.SKIPPED_VALUE_REPLACEMENT,
+ RoaringBitmap.bitmapOf(0));
+ Assert.assertEquals(expectedRes, onHeapRes);
+ Assert.assertEquals(expectedRes, offHeapRes);
+ Assert.assertEquals(expectedRes, mutableRes);
+ }
+ }
+
+ @Test(expectedExceptions = JsonProcessingException.class)
+ public void testSkipInvalidJsonDisabled() throws Exception {
+ // by default, skipInvalidJson is disabled
+ JsonIndexConfig jsonIndexConfig = new JsonIndexConfig();
+ // the braces don't match and cannot be parsed
+ String[] records = {"{\"key1\":\"va\""};
+
+ createIndex(true, jsonIndexConfig, records);
+ }
+
public static class ConfTest extends AbstractSerdeIndexContract {
@Test
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
index cada2fe4a4..1a0964138e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/JsonIndexConfig.java
@@ -41,6 +41,8 @@ import javax.annotation.Nullable;
* - excludeFields: Exclude the given fields, e.g. "b", "c", even if it is under the included paths.
* - maxValueLength: Exclude field values which are longer than this length. A value of "0" disables this filter.
* Excluded values will be replaced with JsonUtils.SKIPPED_VALUE_REPLACEMENT.
+ * - skipInvalidJson: If the raw value is not a valid JSON string, index it as {"": SKIPPED_VALUE_REPLACEMENT}
+ * instead of throwing, and continue indexing the records that follow. Disabled by default.
*/
public class JsonIndexConfig extends IndexConfig {
public static final JsonIndexConfig DISABLED = new JsonIndexConfig(true);
@@ -52,6 +54,7 @@ public class JsonIndexConfig extends IndexConfig {
private Set<String> _excludePaths;
private Set<String> _excludeFields;
private int _maxValueLength = 0;
+ private boolean _skipInvalidJson = false;
public JsonIndexConfig() {
super(false);
@@ -68,7 +71,8 @@ public class JsonIndexConfig extends IndexConfig {
@JsonProperty("includePaths") @Nullable Set<String> includePaths,
@JsonProperty("excludePaths") @Nullable Set<String> excludePaths,
@JsonProperty("excludeFields") @Nullable Set<String> excludeFields,
- @JsonProperty("maxValueLength") int maxValueLength) {
+ @JsonProperty("maxValueLength") int maxValueLength,
+ @JsonProperty("skipInvalidJson") boolean skipInvalidJson) {
super(disabled);
_maxLevels = maxLevels;
_excludeArray = excludeArray;
@@ -77,6 +81,7 @@ public class JsonIndexConfig extends IndexConfig {
_excludePaths = excludePaths;
_excludeFields = excludeFields;
_maxValueLength = maxValueLength;
+ _skipInvalidJson = skipInvalidJson;
}
public int getMaxLevels() {
@@ -143,6 +148,14 @@ public class JsonIndexConfig extends IndexConfig {
_maxValueLength = maxValueLength;
}
+ public boolean getSkipInvalidJson() {
+ return _skipInvalidJson;
+ }
+
+ public void setSkipInvalidJson(boolean skipInvalidJson) {
+ _skipInvalidJson = skipInvalidJson;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -158,12 +171,13 @@ public class JsonIndexConfig extends IndexConfig {
return _maxLevels == config._maxLevels && _excludeArray == config._excludeArray
&& _disableCrossArrayUnnest == config._disableCrossArrayUnnest && Objects.equals(_includePaths,
config._includePaths) && Objects.equals(_excludePaths, config._excludePaths) && Objects.equals(_excludeFields,
- config._excludeFields) && _maxValueLength == config._maxValueLength;
+ config._excludeFields) && _maxValueLength == config._maxValueLength
+ && _skipInvalidJson == config._skipInvalidJson;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), _maxLevels, _excludeArray, _disableCrossArrayUnnest, _includePaths,
- _excludePaths, _excludeFields, _maxValueLength);
+ _excludePaths, _excludeFields, _maxValueLength, _skipInvalidJson);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
index 8593b2f8cb..396c21ad74 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/JsonUtils.java
@@ -76,6 +76,8 @@ public class JsonUtils {
public static final String ARRAY_INDEX_KEY = ".$index";
public static final String SKIPPED_VALUE_REPLACEMENT = "$SKIPPED$";
public static final int MAX_COMBINATIONS = 100_000;
+ private static final List<Map<String, String>> SKIPPED_FLATTENED_RECORD =
+ Collections.singletonList(Collections.singletonMap(VALUE_KEY, SKIPPED_VALUE_REPLACEMENT));
// For querying
public static final String WILDCARD = "*";
@@ -356,7 +358,7 @@ public class JsonUtils {
* ]
* </pre>
*/
- public static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
+ protected static List<Map<String, String>> flatten(JsonNode node, JsonIndexConfig jsonIndexConfig) {
try {
return flatten(node, jsonIndexConfig, 0, "$", false);
} catch (OutOfMemoryError oom) {
@@ -719,4 +721,19 @@ public class JsonUtils {
}
}
}
+
+ public static List<Map<String, String>> flatten(String jsonString, JsonIndexConfig jsonIndexConfig)
+ throws IOException {
+ JsonNode jsonNode;
+ try {
+ jsonNode = JsonUtils.stringToJsonNode(jsonString);
+ } catch (JsonProcessingException e) {
+ if (jsonIndexConfig.getSkipInvalidJson()) {
+ return SKIPPED_FLATTENED_RECORD;
+ } else {
+ throw e;
+ }
+ }
+ return JsonUtils.flatten(jsonNode, jsonIndexConfig);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org