You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/04 19:15:43 UTC

[incubator-pinot] branch add-row-based-schema-validator created (now 98216c7)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch add-row-based-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 98216c7  Add row based schema validation code to detect schema mismatch

This branch includes the following new commits:

     new 98216c7  Add row based schema validation code to detect schema mismatch

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add row based schema validation code to detect schema mismatch

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-row-based-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 98216c725bd1bec72db2cbab4ac2b12d7ff237bf
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Sep 4 12:15:14 2020 -0700

    Add row based schema validation code to detect schema mismatch
---
 .../apache/pinot/common/utils/PinotDataType.java   |   9 ++
 .../recordtransformer/DataTypeTransformer.java     |  50 ++++++++--
 .../pinot/core/segment/creator/SegmentCreator.java |   4 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  29 +++++-
 .../impl/SegmentIndexCreationDriverImpl.java       |   3 +-
 .../recordtransformer/DataTypeTransformerTest.java |  70 ++++++-------
 .../pinot/query/executor/QueryExecutorTest.java    |  15 ++-
 .../hadoop/job/mappers/SegmentCreationMapper.java  |  22 ++--
 .../hadoop/data/IngestionSchemaValidatorTest.java  | 111 ++++++++++++++++-----
 .../data/test_sample_data_multi_value.avro         | Bin 0 -> 12222227 bytes
 .../avro/AvroIngestionSchemaValidator.java         |  53 +++++-----
 .../pinot/spi/data/IngestionSchemaValidator.java   |   8 +-
 .../spi/data/RowBasedSchemaValidationResults.java  |  64 ++++++++++++
 ...Validator.java => SchemaValidationResults.java} |  38 +++----
 .../apache/pinot/spi/data/readers/GenericRow.java  |  18 ++++
 15 files changed, 358 insertions(+), 136 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 97c017e..c46a33e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -586,22 +586,31 @@ public enum PinotDataType {
 
   public PinotDataType getSingleValueType() {
     switch (this) {
+      case BYTE:
       case BYTE_ARRAY:
         return BYTE;
+      case CHARACTER:
       case CHARACTER_ARRAY:
         return CHARACTER;
+      case SHORT:
       case SHORT_ARRAY:
         return SHORT;
+      case INTEGER:
       case INTEGER_ARRAY:
         return INTEGER;
+      case LONG:
       case LONG_ARRAY:
         return LONG;
+      case FLOAT:
       case FLOAT_ARRAY:
         return FLOAT;
+      case DOUBLE:
       case DOUBLE_ARRAY:
         return DOUBLE;
+      case STRING:
       case STRING_ARRAY:
         return STRING;
+      case OBJECT:
       case OBJECT_ARRAY:
         return OBJECT;
       default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
index 4ab665c..a8bee3e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -85,7 +87,7 @@ public class DataTypeTransformer implements RecordTransformer {
         continue;
       }
       PinotDataType dest = entry.getValue();
-      value = standardize(column, value, dest.isSingleValue());
+      value = standardize(record, column, value, dest.isSingleValue());
       // NOTE: The standardized value could be null for empty Collection/Map/Object[].
       if (value == null) {
         record.putValue(column, null);
@@ -109,6 +111,9 @@ public class DataTypeTransformer implements RecordTransformer {
         }
       }
       if (source != dest) {
+        if (source.getSingleValueType() != dest.getSingleValueType()) {
+          putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column);
+        }
         value = dest.convert(value, source);
       }
 
@@ -127,28 +132,39 @@ public class DataTypeTransformer implements RecordTransformer {
    */
   @VisibleForTesting
   @Nullable
-  static Object standardize(String column, @Nullable Object value, boolean isSingleValue) {
+  static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) {
+    return standardize(record, column, value, isSingleValue, 1);
+  }
+
+  static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) {
     if (value == null) {
       return null;
     }
+    // If it's single-value column and the value is Collection/Map/Object[], mark the key.
     if (value instanceof Collection) {
-      return standardizeCollection(column, (Collection) value, isSingleValue);
+      return standardizeCollection(record, column, (Collection) value, isSingleValue, level);
     }
     if (value instanceof Map) {
-      return standardizeCollection(column, ((Map) value).values(), isSingleValue);
+      // If it's a map structure, mark the key.
+      putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column);
+      Collection values = ((Map) value).values();
+      return standardizeCollection(record, column, values, isSingleValue, level);
     }
     if (value instanceof Object[]) {
+      if (isSingleValue && level == 1) {
+        putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+      }
       Object[] values = (Object[]) value;
       int numValues = values.length;
       if (numValues == 0) {
         return null;
       }
       if (numValues == 1) {
-        return standardize(column, values[0], isSingleValue);
+        return standardize(record, column, values[0], isSingleValue, level + 1);
       }
       List<Object> standardizedValues = new ArrayList<>(numValues);
       for (Object singleValue : values) {
-        Object standardizedValue = standardize(column, singleValue, true);
+        Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
         if (standardizedValue != null) {
           standardizedValues.add(standardizedValue);
         }
@@ -164,20 +180,27 @@ public class DataTypeTransformer implements RecordTransformer {
           Arrays.toString(values), column);
       return standardizedValues.toArray();
     }
+    // If it's multi-value column and the level is 1, mark the key.
+    if (!isSingleValue && level == 1) {
+      putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+    }
     return value;
   }
 
-  private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) {
+  private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) {
+    if (isSingleValue && level == 1) {
+      putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+    }
     int numValues = collection.size();
     if (numValues == 0) {
       return null;
     }
     if (numValues == 1) {
-      return standardize(column, collection.iterator().next(), isSingleValue);
+      return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1);
     }
     List<Object> standardizedValues = new ArrayList<>(numValues);
     for (Object singleValue : collection) {
-      Object standardizedValue = standardize(column, singleValue, true);
+      Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
       if (standardizedValue != null) {
         standardizedValues.add(standardizedValue);
       }
@@ -193,4 +216,13 @@ public class DataTypeTransformer implements RecordTransformer {
         .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column);
     return standardizedValues.toArray();
   }
+
+  private static void putValueAsSetToKey(GenericRow record, String key, String value) {
+    Set<String> valueSet = (Set) record.getValue(key);
+    if (valueSet == null) {
+      valueSet = new HashSet<>();
+      record.putValue(key, valueSet);
+    }
+    valueSet.add(value);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..adf6141 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable {
    * @throws Exception
    */
   void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      IngestionSchemaValidator ingestionSchemaValidator)
       throws Exception;
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 4489dc8..b0ed11d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -57,6 +57,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   // TODO Refactor class name to match interface name
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
   private SegmentGeneratorConfig config;
+  private IngestionSchemaValidator _ingestionSchemaValidator;
   private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
   private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
   private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
@@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   @Override
   public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      IngestionSchemaValidator ingestionSchemaValidator)
       throws Exception {
     docIdCounter = 0;
     config = segmentCreationSpec;
     this.indexCreationInfoMap = indexCreationInfoMap;
+    _ingestionSchemaValidator = ingestionSchemaValidator;
 
     // Check that the output directory does not exist
     Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
@@ -304,6 +309,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   @Override
   public void indexRow(GenericRow row) {
+    validateRowBasedSchemas(row);
     for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) {
       String columnName = entry.getKey();
       ForwardIndexCreator forwardIndexCreator = entry.getValue();
@@ -400,6 +406,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       nullValueVectorCreator.seal();
     }
     writeMetadata();
+    _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults();
   }
 
   private void writeMetadata()
@@ -558,6 +565,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
+  private void validateRowBasedSchemas(GenericRow row) {
+    if (_ingestionSchemaValidator == null) {
+      return;
+    }
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+
+    if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns);
+    }
+    if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns);
+    }
+    if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns);
+    }
+  }
+
   /**
    * Helper method to check whether the given value is a valid property value.
    * <p>Value is invalid iff:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index cabdc47..e5a4b84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -187,7 +187,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
     try {
       // Initialize the index creation using the per-column statistics information
-      indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir);
+      indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir,
+          _ingestionSchemaValidator);
 
       // Build the index
       recordReader.rewind();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
index a60c460..be4ad8d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,17 +40,18 @@ public class DataTypeTransformerTest {
     /**
      * Tests for Map
      */
+    GenericRow record = new GenericRow();
 
     // Empty Map
     Map<String, Object> map = Collections.emptyMap();
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
 
     // Map with single entry
     String expectedValue = "testValue";
     map = Collections.singletonMap("testKey", expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with multiple entries
     Object[] expectedValues = new Object[]{"testValue1", "testValue2"};
@@ -58,12 +60,12 @@ public class DataTypeTransformerTest {
     map.put("testKey2", "testValue2");
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     /**
      * Tests for List
@@ -71,24 +73,24 @@ public class DataTypeTransformerTest {
 
     // Empty List
     List<Object> list = Collections.emptyList();
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, list, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, list, false));
 
     // List with single entry
     list = Collections.singletonList(expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValue);
 
     // List with multiple entries
     list = Arrays.asList(expectedValues);
     try {
       // Should fail because List with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformer.standardize(record, COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
 
     /**
      * Tests for Object[]
@@ -96,24 +98,24 @@ public class DataTypeTransformerTest {
 
     // Empty Object[]
     Object[] values = new Object[0];
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, values, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, values, false));
 
     // Object[] with single entry
     values = new Object[]{expectedValue};
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValue);
 
     // Object[] with multiple entries
     values = new Object[]{"testValue1", "testValue2"};
     try {
       // Should fail because Object[] with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
 
     /**
      * Tests for nested Map/List/Object[]
@@ -121,32 +123,32 @@ public class DataTypeTransformerTest {
 
     // Map with empty List
     map = Collections.singletonMap("testKey", Collections.emptyList());
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
 
     // Map with single-entry List
     map = Collections.singletonMap("testKey", Collections.singletonList(expectedValue));
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with one empty Map and one single-entry Map
     map = new HashMap<>();
     map.put("testKey1", Collections.emptyMap());
     map.put("testKey2", Collections.singletonMap("testKey", expectedValue));
     // Can be standardized into single value because empty Map should be ignored
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with multi-entries List
     map = Collections.singletonMap("testKey", Arrays.asList(expectedValues));
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     // Map with one empty Map, one single-entry List and one single-entry Object[]
     map = new HashMap<>();
@@ -155,12 +157,12 @@ public class DataTypeTransformerTest {
     map.put("testKey3", new Object[]{"testValue2"});
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     // List with two single-entry Maps and one empty Map
     list = Arrays
@@ -168,35 +170,35 @@ public class DataTypeTransformerTest {
             Collections.emptyMap());
     try {
       // Should fail because List with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformer.standardize(record, COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
 
     // Object[] with two single-entry Maps
     values = new Object[]{Collections.singletonMap("testKey", "testValue1"), Collections.singletonMap("testKey",
         "testValue2")};
     try {
       // Should fail because Object[] with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
 
     // Object[] with one empty Object[], one multi-entries List of nested Map/List/Object[]
     values = new Object[]{new Object[0], Collections.singletonList(
         Collections.singletonMap("testKey", "testValue1")), Collections.singletonMap("testKey",
         Arrays.asList(new Object[]{"testValue2"}, Collections.emptyMap()))};
     try {
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..994f684 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -93,10 +93,17 @@ public class QueryExecutorTest {
       driver.init(config);
       driver.build();
       IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator();
-      Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+
+      Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult()
+          .isMismatchDetected());
+      Assert.assertFalse(
+          ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult()
+              .isMismatchDetected());
+      Assert.assertFalse(
+          ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult()
+              .isMismatchDetected());
+      Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult()
+          .isMismatchDetected());
       _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
       _segmentNames.add(driver.getSegmentName());
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index fcc5653..aa3812b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -52,8 +52,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.SchemaValidationResults;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
@@ -387,23 +389,27 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     if (ingestionSchemaValidator == null) {
       return;
     }
-    if (ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()) {
+    SchemaValidationResults fileBasedSchemaValidationResults = ingestionSchemaValidator.getFileBasedSchemaValidationResults();
+    if (fileBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected()) {
       _dataTypeMismatch++;
-      _logger.warn(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
+      _logger.warn(fileBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
     }
-    if (ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
       _singleValueMultiValueFieldMismatch++;
-      ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
     }
-    if (ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected()) {
       _multiValueStructureMismatch++;
-      ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason();
     }
-    if (ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getMissingPinotColumnResult().isMismatchDetected()) {
       _missingPinotColumn++;
-      ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getMissingPinotColumnResult().getMismatchReason();
     }
 
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults = ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+    //TODO add logic to detect.
+
     if (isSchemaMismatch() && _failIfSchemaMismatch) {
       throw new RuntimeException("Schema mismatch detected. Forcing to fail the job. Please checking log message above.");
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
index fec3583..9170b79 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
@@ -20,17 +20,35 @@ package org.apache.pinot.hadoop.data;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
 import org.apache.pinot.spi.data.SchemaValidatorFactory;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class IngestionSchemaValidatorTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "IngestionSchemaValidatorTest");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR); // remove any leftover output directory before recreating it
+    Assert.assertTrue(INDEX_DIR.mkdirs()); // fail fast if the output directory cannot be created
+  }
+
   @Test
-  public void testAvroIngestionSchemaValidator()
+  public void testAvroIngestionSchemaValidatorFileBasedSchemaValidation()
       throws Exception {
     String inputFilePath = new File(
         Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
@@ -47,10 +65,10 @@ public class IngestionSchemaValidatorTest {
     IngestionSchemaValidator ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
 
     // Adding one extra column
     pinotSchema = new Schema.SchemaBuilder()
@@ -64,12 +82,11 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().getMismatchReason());
 
     // Change the data type of column1 from LONG to STRING
     pinotSchema = new Schema.SchemaBuilder()
@@ -81,12 +98,11 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
 
     // Change column2 from single-value column to multi-value column
     pinotSchema = new Schema.SchemaBuilder()
@@ -98,14 +114,59 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+  }
+
+  /**
+   * Runs the segment index creation driver on an Avro file whose columns disagree with the Pinot schema and verifies
+   * that the row based schema validation results report every expected mismatch together with a mismatch reason.
+   */
+  @Test
+  public void testAvroIngestionValidatorRowBasedSchemaValidation()
+      throws Exception {
+    String tableName = "testTable";
+    File avroFile = new File(
+        Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro"))
+            .getFile());
+
+    // column2 is of int type in the Avro file, whereas the Pinot schema below declares it as LONG.
+    // column3 and column16 are both of array of map structure in the Avro file.
+    Schema pinotSchema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column2", FieldSpec.DataType.LONG)
+        .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("column16", FieldSpec.DataType.STRING)
+        .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build();
+
+    SegmentGeneratorConfig segmentGeneratorConfig =
+        new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build(),
+            pinotSchema);
+    segmentGeneratorConfig.setInputFilePath(avroFile.getAbsolutePath());
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setTableName(tableName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults = driver.getIngestionSchemaValidator().getRowBasedSchemaValidationResults();
+    Assert.assertTrue(rowBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
+
+    Assert.assertTrue(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+
+    Assert.assertTrue(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason());
+  }
 
+  @AfterClass
+  public void tearDown() {
+    FileUtils.deleteQuietly(INDEX_DIR);
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro
new file mode 100644
index 0000000..4e4a4d8
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
index d0ee84f..ee6706d 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
@@ -23,22 +23,23 @@ import java.io.IOException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
-import org.apache.pinot.spi.data.SchemaValidatorResult;
+import org.apache.pinot.spi.data.SchemaValidationResults;
 
 
 /**
  * Schema validator to validate pinot schema and avro schema
  */
 public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
+  private SchemaValidationResults _fileBasedSchemaValidationResults = new SchemaValidationResults();
+  private RowBasedSchemaValidationResults _rowBasedSchemaValidationResults = new RowBasedSchemaValidationResults();
+
   private org.apache.avro.Schema _avroSchema;
   private Schema _pinotSchema;
 
-  private SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _missingPinotColumn = new SchemaValidatorResult();
+
 
   public AvroIngestionSchemaValidator() {
   }
@@ -48,7 +49,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
     _pinotSchema = pinotSchema;
     _avroSchema = extractAvroSchemaFromFile(inputFilePath);
 
-    validateSchemas();
+    validateFileBasedSchemas();
   }
 
   @Override
@@ -57,23 +58,13 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
   }
 
   @Override
-  public SchemaValidatorResult getDataTypeMismatchResult() {
-    return _dataTypeMismatch;
-  }
-
-  @Override
-  public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
-    return _singleValueMultiValueFieldMismatch;
-  }
-
-  @Override
-  public SchemaValidatorResult getMultiValueStructureMismatchResult() {
-    return _multiValueStructureMismatch;
+  public SchemaValidationResults getFileBasedSchemaValidationResults() {
+    return _fileBasedSchemaValidationResults;
   }
 
   @Override
-  public SchemaValidatorResult getMissingPinotColumnResult() {
-    return _missingPinotColumn;
+  public RowBasedSchemaValidationResults getRowBasedSchemaValidationResults() {
+    return _rowBasedSchemaValidationResults;
   }
 
   private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) {
@@ -87,12 +78,12 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
     }
   }
 
-  private void validateSchemas() {
+  private void validateFileBasedSchemas() {
     for (String columnName : _pinotSchema.getPhysicalColumnNames()) {
       FieldSpec fieldSpec = _pinotSchema.getFieldSpecFor(columnName);
       org.apache.avro.Schema.Field avroColumnField = _avroSchema.getField(columnName);
       if (avroColumnField == null) {
-        _missingPinotColumn.addMismatchReason(String
+        _fileBasedSchemaValidationResults.getMissingPinotColumnResult().addMismatchReason(String
             .format("The Pinot column: (%s: %s) is missing in the %s schema of input data.", columnName,
                 fieldSpec.getDataType().name(), getInputSchemaType()));
         continue;
@@ -116,7 +107,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
       }
 
       if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
-        _dataTypeMismatch.addMismatchReason(String
+        _fileBasedSchemaValidationResults.getDataTypeMismatchResult().addMismatchReason(String
             .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName,
                 fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(),
                 getInputSchemaType()));
@@ -125,20 +116,22 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
       if (fieldSpec.isSingleValueField()) {
         if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
+                  columnName, avroColumnSchema.getName(), getInputSchemaType()));
         }
       } else {
         if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
+                  columnName, avroColumnSchema.getName(), getInputSchemaType()));
         }
         if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
           // multi-value column should use array structure for now.
-          _multiValueStructureMismatch.addMismatchReason(String.format(
+          _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format(
               "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.",
               columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName()));
         }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..67a3da9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -28,11 +28,7 @@ public interface IngestionSchemaValidator {
 
   String getInputSchemaType();
 
-  SchemaValidatorResult getDataTypeMismatchResult();
+  SchemaValidationResults getFileBasedSchemaValidationResults();
 
-  SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
-  SchemaValidatorResult getMultiValueStructureMismatchResult();
-
-  SchemaValidatorResult getMissingPinotColumnResult();
+  RowBasedSchemaValidationResults getRowBasedSchemaValidationResults();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
new file mode 100644
index 0000000..655275a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Row based extension of {@code SchemaValidationResults}. Row based schema validation is called much more frequently
+ * than the file based schema validation, so the names of the mismatching columns are collected into hash sets and the
+ * mismatch reasons are generated all at once by {@link #gatherRowBasedSchemaValidationResults()}.
+ * The missing Pinot column result inherited from the parent class is not touched by this class.
+ */
+public class RowBasedSchemaValidationResults extends SchemaValidationResults {
+
+  private final Set<String> _dataTypeMismatchColumns = new HashSet<>();
+  private final Set<String> _singleValueMultiValueFieldMismatchColumns = new HashSet<>();
+  private final Set<String> _multiValueStructureMismatchColumns = new HashSet<>();
+
+  /**
+   * Adds the given columns to the set of columns with a data type mismatch.
+   *
+   * @param columns names of the columns to add; duplicates of already collected names are ignored by the set
+   */
+  public void collectDataTypeMismatchColumns(Set<String> columns) {
+    _dataTypeMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds the given columns to the set of columns with a single-value multi-value field mismatch.
+   *
+   * @param columns names of the columns to add; duplicates of already collected names are ignored by the set
+   */
+  public void collectSingleValueMultiValueFieldMismatchColumns(Set<String> columns) {
+    _singleValueMultiValueFieldMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds the given columns to the set of columns with a multi-value structure mismatch.
+   *
+   * @param columns names of the columns to add; duplicates of already collected names are ignored by the set
+   */
+  public void collectMultiValueStructureMismatchColumns(Set<String> columns) {
+    _multiValueStructureMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds one mismatch reason for each mismatch category that has at least one collected column; categories without
+   * any collected column are left untouched.
+   * NOTE(review): every call invokes addMismatchReason again for the non-empty categories, so this is presumably
+   * meant to be called once after all rows have been processed -- confirm against the caller.
+   */
+  public void gatherRowBasedSchemaValidationResults() {
+    if (!_dataTypeMismatchColumns.isEmpty()) {
+      getDataTypeMismatchResult().addMismatchReason(String
+          .format("Found data type mismatch from the following Pinot columns: %s", _dataTypeMismatchColumns));
+    }
+    if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) {
+      getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+          .format("Found single-value multi-value field mismatch from the following Pinot columns: %s",
+              _singleValueMultiValueFieldMismatchColumns));
+    }
+    if (!_multiValueStructureMismatchColumns.isEmpty()) {
+      getMultiValueStructureMismatchResult().addMismatchReason(String
+          .format("Found multi-value structure mismatch from the following Pinot columns: %s",
+              _multiValueStructureMismatchColumns));
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
index 045327a..2d793c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
@@ -18,21 +18,33 @@
  */
 package org.apache.pinot.spi.data;
 
-
-/**
- * Validator to validate the schema between Pinot schema and input raw data schema
- */
-public interface IngestionSchemaValidator {
-
-  void init(Schema pinotSchema, String inputFilePath);
-
-  String getInputSchemaType();
-
-  SchemaValidatorResult getDataTypeMismatchResult();
-
-  SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
-  SchemaValidatorResult getMultiValueStructureMismatchResult();
-
-  SchemaValidatorResult getMissingPinotColumnResult();
+/**
+ * Groups the schema validation results of the four mismatch categories: data type mismatch, single-value/multi-value
+ * field mismatch, multi-value structure mismatch and missing Pinot column.
+ */
+public class SchemaValidationResults {
+  final SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _missingPinotColumnResult = new SchemaValidatorResult();
+
+  /** Returns the result holder for data type mismatches. */
+  public SchemaValidatorResult getDataTypeMismatchResult() {
+    return _dataTypeMismatch;
+  }
+
+  /** Returns the result holder for single-value/multi-value field mismatches. */
+  public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
+    return _singleValueMultiValueFieldMismatch;
+  }
+
+  /** Returns the result holder for multi-value structure mismatches. */
+  public SchemaValidatorResult getMultiValueStructureMismatchResult() {
+    return _multiValueStructureMismatch;
+  }
+
+  /** Returns the result holder for Pinot columns that are missing in the input data schema. */
+  public SchemaValidatorResult getMissingPinotColumnResult() {
+    return _missingPinotColumnResult;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5c45d6b..fc3f9a3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -61,6 +61,24 @@ public class GenericRow {
    */
   public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$";
 
+  /**
+   * This key is used to identify whether there is a data type mismatch that requires a data type conversion.
+   * E.g. the Pinot column is of int type, whereas the input column is of long type.
+   */
+  public static final String DATA_TYPE_MISMATCH_KEY = "$DATA_TYPE_MISMATCH_KEY$";
+
+  /**
+   * This key is used to identify whether the input value is a map structure for a multi-value column.
+   * This is necessary to find out whether any existing use case relies on fetching values this way.
+   */
+  public static final String MULTI_VALUE_STRUCTURE_MISMATCH_KEY = "$MULTI_VALUE_STRUCTURE_MISMATCH_KEY$";
+
+  /**
+   * This key is used to identify whether there is a single-value multi-value mismatch. E.g. the Pinot column is
+   * single-value, whereas the input data is a Collection/Map/Object[].
+   */
+  public static final String SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY = "$SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY$";
+
   private final Map<String, Object> _fieldToValueMap = new HashMap<>();
   private final Set<String> _nullValueFields = new HashSet<>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org