You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/04 19:15:44 UTC
[incubator-pinot] 01/01: Add row based schema validation code to
detect schema mismatch
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-row-based-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 98216c725bd1bec72db2cbab4ac2b12d7ff237bf
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Sep 4 12:15:14 2020 -0700
Add row based schema validation code to detect schema mismatch
---
.../apache/pinot/common/utils/PinotDataType.java | 9 ++
.../recordtransformer/DataTypeTransformer.java | 50 ++++++++--
.../pinot/core/segment/creator/SegmentCreator.java | 4 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 29 +++++-
.../impl/SegmentIndexCreationDriverImpl.java | 3 +-
.../recordtransformer/DataTypeTransformerTest.java | 70 ++++++-------
.../pinot/query/executor/QueryExecutorTest.java | 15 ++-
.../hadoop/job/mappers/SegmentCreationMapper.java | 22 ++--
.../hadoop/data/IngestionSchemaValidatorTest.java | 111 ++++++++++++++++-----
.../data/test_sample_data_multi_value.avro | Bin 0 -> 12222227 bytes
.../avro/AvroIngestionSchemaValidator.java | 53 +++++-----
.../pinot/spi/data/IngestionSchemaValidator.java | 8 +-
.../spi/data/RowBasedSchemaValidationResults.java | 64 ++++++++++++
...Validator.java => SchemaValidationResults.java} | 38 +++----
.../apache/pinot/spi/data/readers/GenericRow.java | 18 ++++
15 files changed, 358 insertions(+), 136 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 97c017e..c46a33e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -586,22 +586,31 @@ public enum PinotDataType {
public PinotDataType getSingleValueType() {
switch (this) {
+ case BYTE:
case BYTE_ARRAY:
return BYTE;
+ case CHARACTER:
case CHARACTER_ARRAY:
return CHARACTER;
+ case SHORT:
case SHORT_ARRAY:
return SHORT;
+ case INTEGER:
case INTEGER_ARRAY:
return INTEGER;
+ case LONG:
case LONG_ARRAY:
return LONG;
+ case FLOAT:
case FLOAT_ARRAY:
return FLOAT;
+ case DOUBLE:
case DOUBLE_ARRAY:
return DOUBLE;
+ case STRING:
case STRING_ARRAY:
return STRING;
+ case OBJECT:
case OBJECT_ARRAY:
return OBJECT;
default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
index 4ab665c..a8bee3e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -85,7 +87,7 @@ public class DataTypeTransformer implements RecordTransformer {
continue;
}
PinotDataType dest = entry.getValue();
- value = standardize(column, value, dest.isSingleValue());
+ value = standardize(record, column, value, dest.isSingleValue());
// NOTE: The standardized value could be null for empty Collection/Map/Object[].
if (value == null) {
record.putValue(column, null);
@@ -109,6 +111,9 @@ public class DataTypeTransformer implements RecordTransformer {
}
}
if (source != dest) {
+ if (source.getSingleValueType() != dest.getSingleValueType()) {
+ putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column);
+ }
value = dest.convert(value, source);
}
@@ -127,28 +132,39 @@ public class DataTypeTransformer implements RecordTransformer {
*/
@VisibleForTesting
@Nullable
- static Object standardize(String column, @Nullable Object value, boolean isSingleValue) {
+ static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) {
+ return standardize(record, column, value, isSingleValue, 1);
+ }
+
+ static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) {
if (value == null) {
return null;
}
+ // If it's single-value column and the value is Collection/Map/Object[], mark the key.
if (value instanceof Collection) {
- return standardizeCollection(column, (Collection) value, isSingleValue);
+ return standardizeCollection(record, column, (Collection) value, isSingleValue, level);
}
if (value instanceof Map) {
- return standardizeCollection(column, ((Map) value).values(), isSingleValue);
+ // If it's a map structure, mark the key.
+ putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column);
+ Collection values = ((Map) value).values();
+ return standardizeCollection(record, column, values, isSingleValue, level);
}
if (value instanceof Object[]) {
+ if (isSingleValue && level == 1) {
+ putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+ }
Object[] values = (Object[]) value;
int numValues = values.length;
if (numValues == 0) {
return null;
}
if (numValues == 1) {
- return standardize(column, values[0], isSingleValue);
+ return standardize(record, column, values[0], isSingleValue, level + 1);
}
List<Object> standardizedValues = new ArrayList<>(numValues);
for (Object singleValue : values) {
- Object standardizedValue = standardize(column, singleValue, true);
+ Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
if (standardizedValue != null) {
standardizedValues.add(standardizedValue);
}
@@ -164,20 +180,27 @@ public class DataTypeTransformer implements RecordTransformer {
Arrays.toString(values), column);
return standardizedValues.toArray();
}
+ // If it's multi-value column and the level is 1, mark the key.
+ if (!isSingleValue && level == 1) {
+ putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+ }
return value;
}
- private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) {
+ private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) {
+ if (isSingleValue && level == 1) {
+ putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+ }
int numValues = collection.size();
if (numValues == 0) {
return null;
}
if (numValues == 1) {
- return standardize(column, collection.iterator().next(), isSingleValue);
+ return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1);
}
List<Object> standardizedValues = new ArrayList<>(numValues);
for (Object singleValue : collection) {
- Object standardizedValue = standardize(column, singleValue, true);
+ Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
if (standardizedValue != null) {
standardizedValues.add(standardizedValue);
}
@@ -193,4 +216,13 @@ public class DataTypeTransformer implements RecordTransformer {
.checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column);
return standardizedValues.toArray();
}
+
+ private static void putValueAsSetToKey(GenericRow record, String key, String value) {
+ Set<String> valueSet = (Set) record.getValue(key);
+ if (valueSet == null) {
+ valueSet = new HashSet<>();
+ record.putValue(key, valueSet);
+ }
+ valueSet.add(value);
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..adf6141 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable {
* @throws Exception
*/
void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
- Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+ Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+ IngestionSchemaValidator ingestionSchemaValidator)
throws Exception;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 4489dc8..b0ed11d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -57,6 +57,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
// TODO Refactor class name to match interface name
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
private SegmentGeneratorConfig config;
+ private IngestionSchemaValidator _ingestionSchemaValidator;
private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
@@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
@Override
public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
- Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+ Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+ IngestionSchemaValidator ingestionSchemaValidator)
throws Exception {
docIdCounter = 0;
config = segmentCreationSpec;
this.indexCreationInfoMap = indexCreationInfoMap;
+ _ingestionSchemaValidator = ingestionSchemaValidator;
// Check that the output directory does not exist
Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
@@ -304,6 +309,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
@Override
public void indexRow(GenericRow row) {
+ validateRowBasedSchemas(row);
for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) {
String columnName = entry.getKey();
ForwardIndexCreator forwardIndexCreator = entry.getValue();
@@ -400,6 +406,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
nullValueVectorCreator.seal();
}
writeMetadata();
+ _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults();
}
private void writeMetadata()
@@ -558,6 +565,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
}
+ private void validateRowBasedSchemas(GenericRow row) {
+ if (_ingestionSchemaValidator == null) {
+ return;
+ }
+ RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+
+ if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) {
+ Set<String> columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
+ rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns);
+ }
+ if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) {
+ Set<String> columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY);
+ rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns);
+ }
+ if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) {
+ Set<String> columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
+ rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns);
+ }
+ }
+
/**
* Helper method to check whether the given value is a valid property value.
* <p>Value is invalid iff:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index cabdc47..e5a4b84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -187,7 +187,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
try {
// Initialize the index creation using the per-column statistics information
- indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir);
+ indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir,
+ _ingestionSchemaValidator);
// Build the index
recordReader.rewind();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
index a60c460..be4ad8d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -39,17 +40,18 @@ public class DataTypeTransformerTest {
/**
* Tests for Map
*/
+ GenericRow record = new GenericRow();
// Empty Map
Map<String, Object> map = Collections.emptyMap();
- assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
// Map with single entry
String expectedValue = "testValue";
map = Collections.singletonMap("testKey", expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
// Map with multiple entries
Object[] expectedValues = new Object[]{"testValue1", "testValue2"};
@@ -58,12 +60,12 @@ public class DataTypeTransformerTest {
map.put("testKey2", "testValue2");
try {
// Should fail because Map with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformer.standardize(record, COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+ assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
/**
* Tests for List
@@ -71,24 +73,24 @@ public class DataTypeTransformerTest {
// Empty List
List<Object> list = Collections.emptyList();
- assertNull(DataTypeTransformer.standardize(COLUMN, list, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, list, false));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, list, true));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, list, false));
// List with single entry
list = Collections.singletonList(expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, true), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValue);
// List with multiple entries
list = Arrays.asList(expectedValues);
try {
// Should fail because List with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, list, true);
+ DataTypeTransformer.standardize(record, COLUMN, list, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
/**
* Tests for Object[]
@@ -96,24 +98,24 @@ public class DataTypeTransformerTest {
// Empty Object[]
Object[] values = new Object[0];
- assertNull(DataTypeTransformer.standardize(COLUMN, values, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, values, false));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, values, true));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, values, false));
// Object[] with single entry
values = new Object[]{expectedValue};
- assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, true), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValue);
// Object[] with multiple entries
values = new Object[]{"testValue1", "testValue2"};
try {
// Should fail because Object[] with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformer.standardize(record, COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
/**
* Tests for nested Map/List/Object[]
@@ -121,32 +123,32 @@ public class DataTypeTransformerTest {
// Map with empty List
map = Collections.singletonMap("testKey", Collections.emptyList());
- assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
- assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+ assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
// Map with single-entry List
map = Collections.singletonMap("testKey", Collections.singletonList(expectedValue));
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
// Map with one empty Map and one single-entry Map
map = new HashMap<>();
map.put("testKey1", Collections.emptyMap());
map.put("testKey2", Collections.singletonMap("testKey", expectedValue));
// Can be standardized into single value because empty Map should be ignored
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
- assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+ assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
// Map with multi-entries List
map = Collections.singletonMap("testKey", Arrays.asList(expectedValues));
try {
// Should fail because Map with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformer.standardize(record, COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+ assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
// Map with one empty Map, one single-entry List and one single-entry Object[]
map = new HashMap<>();
@@ -155,12 +157,12 @@ public class DataTypeTransformerTest {
map.put("testKey3", new Object[]{"testValue2"});
try {
// Should fail because Map with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, map, true);
+ DataTypeTransformer.standardize(record, COLUMN, map, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+ assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
// List with two single-entry Maps and one empty Map
list = Arrays
@@ -168,35 +170,35 @@ public class DataTypeTransformerTest {
Collections.emptyMap());
try {
// Should fail because List with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, list, true);
+ DataTypeTransformer.standardize(record, COLUMN, list, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+ assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
// Object[] with two single-entry Maps
values = new Object[]{Collections.singletonMap("testKey", "testValue1"), Collections.singletonMap("testKey",
"testValue2")};
try {
// Should fail because Object[] with multiple entries cannot be standardized as single value
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformer.standardize(record, COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+ assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
// Object[] with one empty Object[], one multi-entries List of nested Map/List/Object[]
values = new Object[]{new Object[0], Collections.singletonList(
Collections.singletonMap("testKey", "testValue1")), Collections.singletonMap("testKey",
Arrays.asList(new Object[]{"testValue2"}, Collections.emptyMap()))};
try {
- DataTypeTransformer.standardize(COLUMN, values, true);
+ DataTypeTransformer.standardize(record, COLUMN, values, true);
fail();
} catch (Exception e) {
// Expected
}
- assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+ assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..994f684 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -93,10 +93,17 @@ public class QueryExecutorTest {
driver.init(config);
driver.build();
IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator();
- Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult()
+ .isMismatchDetected());
+ Assert.assertFalse(
+ ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult()
+ .isMismatchDetected());
+ Assert.assertFalse(
+ ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult()
+ .isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult()
+ .isMismatchDetected());
_indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index fcc5653..aa3812b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -52,8 +52,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableCustomConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.SchemaValidationResults;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.utils.DataSizeUtils;
@@ -387,23 +389,27 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
if (ingestionSchemaValidator == null) {
return;
}
- if (ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()) {
+ SchemaValidationResults fileBasedSchemaValidationResults = ingestionSchemaValidator.getFileBasedSchemaValidationResults();
+ if (fileBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected()) {
_dataTypeMismatch++;
- _logger.warn(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
+ _logger.warn(fileBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
}
- if (ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
+ if (fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
_singleValueMultiValueFieldMismatch++;
- ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
+ fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
}
- if (ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()) {
+ if (fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected()) {
_multiValueStructureMismatch++;
- ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason();
+ fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason();
}
- if (ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()) {
+ if (fileBasedSchemaValidationResults.getMissingPinotColumnResult().isMismatchDetected()) {
_missingPinotColumn++;
- ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason();
+ fileBasedSchemaValidationResults.getMissingPinotColumnResult().getMismatchReason();
}
+ RowBasedSchemaValidationResults rowBasedSchemaValidationResults = ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+ //TODO: add logic to detect and report row-based schema mismatches (data type, SV/MV field, MV structure) from rowBasedSchemaValidationResults.
+
if (isSchemaMismatch() && _failIfSchemaMismatch) {
throw new RuntimeException("Schema mismatch detected. Forcing the job to fail. Please check the log messages above.");
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
index fec3583..9170b79 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
@@ -20,17 +20,35 @@ package org.apache.pinot.hadoop.data;
import com.google.common.base.Preconditions;
import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
import org.apache.pinot.spi.data.SchemaValidatorFactory;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class IngestionSchemaValidatorTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+ Assert.assertTrue(INDEX_DIR.mkdirs());
+ }
+
@Test
- public void testAvroIngestionSchemaValidator()
+ public void testAvroIngestionSchemaValidatorFileBasedSchemaValidation()
throws Exception {
String inputFilePath = new File(
Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
@@ -47,10 +65,10 @@ public class IngestionSchemaValidatorTest {
IngestionSchemaValidator ingestionSchemaValidator =
SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
Assert.assertNotNull(ingestionSchemaValidator);
- Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
// Adding one extra column
pinotSchema = new Schema.SchemaBuilder()
@@ -64,12 +82,11 @@ public class IngestionSchemaValidatorTest {
ingestionSchemaValidator =
SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
Assert.assertNotNull(ingestionSchemaValidator);
- Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
- Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
- Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
- System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+ Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+ Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().getMismatchReason());
// Change the data type of column1 from LONG to STRING
pinotSchema = new Schema.SchemaBuilder()
@@ -81,12 +98,11 @@ public class IngestionSchemaValidatorTest {
ingestionSchemaValidator =
SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
Assert.assertNotNull(ingestionSchemaValidator);
- Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
- Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
- System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
- Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
- Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+ Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().getMismatchReason());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
// Change column2 from single-value column to multi-value column
pinotSchema = new Schema.SchemaBuilder()
@@ -98,14 +114,59 @@ public class IngestionSchemaValidatorTest {
ingestionSchemaValidator =
SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
Assert.assertNotNull(ingestionSchemaValidator);
- Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
- Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
- Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
- System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
- Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
- Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
- System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
- Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+ Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+ Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().getMismatchReason());
+ Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+ }
+
+ @Test
+ public void testAvroIngestionValidatorRowBasedSchemaValidation()
+ throws Exception {
+ String tableName = "testTable";
+ File avroFile = new File(
+ Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro"))
+ .getFile());
+
+ // column2 is of int type in the Avro file.
+ // column3 and column16 are both of an array-of-map structure.
+ Schema pinotSchema = new Schema.SchemaBuilder()
+ .addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+ .addSingleValueDimension("column2", FieldSpec.DataType.LONG)
+ .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+ .addMultiValueDimension("column16", FieldSpec.DataType.STRING)
+ .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build();
+
+ SegmentGeneratorConfig segmentGeneratorConfig =
+ new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build(),
+ pinotSchema);
+ segmentGeneratorConfig.setInputFilePath(avroFile.getAbsolutePath());
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+ segmentGeneratorConfig.setTableName(tableName);
+
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig);
+ driver.build();
+
+ RowBasedSchemaValidationResults rowBasedSchemaValidationResults = driver.getIngestionSchemaValidator().getRowBasedSchemaValidationResults();
+ Assert.assertTrue(rowBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
+ System.out.println(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
+
+ Assert.assertTrue(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+ System.out.println(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+
+ Assert.assertTrue(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected());
+ Assert.assertNotNull(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason());
+ System.out.println(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason());
+
+ }
+ @AfterClass
+ public void tearDown() {
+ FileUtils.deleteQuietly(INDEX_DIR);
}
}
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro
new file mode 100644
index 0000000..4e4a4d8
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
index d0ee84f..ee6706d 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
@@ -23,22 +23,23 @@ import java.io.IOException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
-import org.apache.pinot.spi.data.SchemaValidatorResult;
+import org.apache.pinot.spi.data.SchemaValidationResults;
/**
* Schema validator to validate pinot schema and avro schema
*/
public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
+ private SchemaValidationResults _fileBasedSchemaValidationResults = new SchemaValidationResults();
+ private RowBasedSchemaValidationResults _rowBasedSchemaValidationResults = new RowBasedSchemaValidationResults();
+
private org.apache.avro.Schema _avroSchema;
private Schema _pinotSchema;
- private SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
- private SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
- private SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
- private SchemaValidatorResult _missingPinotColumn = new SchemaValidatorResult();
+
public AvroIngestionSchemaValidator() {
}
@@ -48,7 +49,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
_pinotSchema = pinotSchema;
_avroSchema = extractAvroSchemaFromFile(inputFilePath);
- validateSchemas();
+ validateFileBasedSchemas();
}
@Override
@@ -57,23 +58,13 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
}
@Override
- public SchemaValidatorResult getDataTypeMismatchResult() {
- return _dataTypeMismatch;
- }
-
- @Override
- public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
- return _singleValueMultiValueFieldMismatch;
- }
-
- @Override
- public SchemaValidatorResult getMultiValueStructureMismatchResult() {
- return _multiValueStructureMismatch;
+ public SchemaValidationResults getFileBasedSchemaValidationResults() {
+ return _fileBasedSchemaValidationResults;
}
@Override
- public SchemaValidatorResult getMissingPinotColumnResult() {
- return _missingPinotColumn;
+ public RowBasedSchemaValidationResults getRowBasedSchemaValidationResults() {
+ return _rowBasedSchemaValidationResults;
}
private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) {
@@ -87,12 +78,12 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
}
}
- private void validateSchemas() {
+ private void validateFileBasedSchemas() {
for (String columnName : _pinotSchema.getPhysicalColumnNames()) {
FieldSpec fieldSpec = _pinotSchema.getFieldSpecFor(columnName);
org.apache.avro.Schema.Field avroColumnField = _avroSchema.getField(columnName);
if (avroColumnField == null) {
- _missingPinotColumn.addMismatchReason(String
+ _fileBasedSchemaValidationResults.getMissingPinotColumnResult().addMismatchReason(String
.format("The Pinot column: (%s: %s) is missing in the %s schema of input data.", columnName,
fieldSpec.getDataType().name(), getInputSchemaType()));
continue;
@@ -116,7 +107,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
}
if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
- _dataTypeMismatch.addMismatchReason(String
+ _fileBasedSchemaValidationResults.getDataTypeMismatchResult().addMismatchReason(String
.format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName,
fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(),
getInputSchemaType()));
@@ -125,20 +116,22 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
if (fieldSpec.isSingleValueField()) {
if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
// the column is a complex structure
- _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
- "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
- columnName, avroColumnSchema.getName(), getInputSchemaType()));
+ _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+ .format(
+ "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
+ columnName, avroColumnSchema.getName(), getInputSchemaType()));
}
} else {
if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
// the column is a complex structure
- _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
- "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
- columnName, avroColumnSchema.getName(), getInputSchemaType()));
+ _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+ .format(
+ "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
+ columnName, avroColumnSchema.getName(), getInputSchemaType()));
}
if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
// multi-value column should use array structure for now.
- _multiValueStructureMismatch.addMismatchReason(String.format(
+ _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format(
"The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.",
columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName()));
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..67a3da9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -28,11 +28,7 @@ public interface IngestionSchemaValidator {
String getInputSchemaType();
- SchemaValidatorResult getDataTypeMismatchResult();
+ SchemaValidationResults getFileBasedSchemaValidationResults();
- SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
- SchemaValidatorResult getMultiValueStructureMismatchResult();
-
- SchemaValidatorResult getMissingPinotColumnResult();
+ RowBasedSchemaValidationResults getRowBasedSchemaValidationResults();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
new file mode 100644
index 0000000..655275a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * This is the extension class on top of the {@code SchemaValidationResults} class, since row-based schema validation
+ * will be called much more frequently than file-based schema validation. We collect all the mismatched columns into
+ * hash sets and then generate the mismatch messages all at once.
+ */
+public class RowBasedSchemaValidationResults extends SchemaValidationResults {
+
+ private Set<String> _dataTypeMismatchColumns = new HashSet<>();
+ private Set<String> _singleValueMultiValueFieldMismatchColumns = new HashSet<>();
+ private Set<String> _multiValueStructureMismatchColumns = new HashSet<>();
+
+ public void collectDataTypeMismatchColumns(Set<String> columns) {
+ _dataTypeMismatchColumns.addAll(columns);
+ }
+
+ public void collectSingleValueMultiValueFieldMismatchColumns(Set<String> columns) {
+ _singleValueMultiValueFieldMismatchColumns.addAll(columns);
+ }
+
+ public void collectMultiValueStructureMismatchColumns(Set<String> columns) {
+ _multiValueStructureMismatchColumns.addAll(columns);
+ }
+
+ public void gatherRowBasedSchemaValidationResults() {
+ if (!_dataTypeMismatchColumns.isEmpty()) {
+ _dataTypeMismatch.addMismatchReason(String.format("Found data type mismatch from the following Pinot columns: %s",
+ _dataTypeMismatchColumns.toString()));
+ }
+ if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) {
+ _singleValueMultiValueFieldMismatch.addMismatchReason(String
+ .format("Found single-value multi-value field mismatch from the following Pinot columns: %s",
+ _singleValueMultiValueFieldMismatchColumns.toString()));
+ }
+ if (!_multiValueStructureMismatchColumns.isEmpty()) {
+ _multiValueStructureMismatch.addMismatchReason(String
+ .format("Found multi-value structure mismatch from the following Pinot columns: %s",
+ _multiValueStructureMismatchColumns.toString()));
+ }
+ }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
index 045327a..2d793c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
@@ -18,21 +18,25 @@
*/
package org.apache.pinot.spi.data;
-
-/**
- * Validator to validate the schema between Pinot schema and input raw data schema
- */
-public interface IngestionSchemaValidator {
-
- void init(Schema pinotSchema, String inputFilePath);
-
- String getInputSchemaType();
-
- SchemaValidatorResult getDataTypeMismatchResult();
-
- SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
- SchemaValidatorResult getMultiValueStructureMismatchResult();
-
- SchemaValidatorResult getMissingPinotColumnResult();
+public class SchemaValidationResults {
+ SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
+ SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
+ SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
+ SchemaValidatorResult _missingPinotColumnResult = new SchemaValidatorResult();
+
+ public SchemaValidatorResult getDataTypeMismatchResult() {
+ return _dataTypeMismatch;
+ }
+
+ public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
+ return _singleValueMultiValueFieldMismatch;
+ }
+
+ public SchemaValidatorResult getMultiValueStructureMismatchResult() {
+ return _multiValueStructureMismatch;
+ }
+
+ public SchemaValidatorResult getMissingPinotColumnResult() {
+ return _missingPinotColumnResult;
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5c45d6b..fc3f9a3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -61,6 +61,24 @@ public class GenericRow {
*/
public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$";
+ /**
+ * This key is used to identify whether there is a data type mismatch that requires a data type conversion.
+ * E.g. the Pinot column is of int type, whereas the input column is of long type.
+ */
+ public static final String DATA_TYPE_MISMATCH_KEY = "$DATA_TYPE_MISMATCH_KEY$";
+
+ /**
+ * This key is used to identify whether the input value is a map structure for a multi-value column.
+ * This is necessary for us to identify whether any existing use case is leveraging this way of fetching values.
+ */
+ public static final String MULTI_VALUE_STRUCTURE_MISMATCH_KEY = "$MULTI_VALUE_STRUCTURE_MISMATCH_KEY$";
+
+ /**
+ * This key is used to identify whether there is a single-value/multi-value mismatch. E.g. the Pinot column is
+ * single-value, whereas the input data is a Collection/Map/Object[].
+ */
+ public static final String SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY = "$SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY$";
+
private final Map<String, Object> _fieldToValueMap = new HashMap<>();
private final Set<String> _nullValueFields = new HashSet<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org