You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/04 19:15:44 UTC

[incubator-pinot] 01/01: Add row based schema validation code to detect schema mismatch

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-row-based-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 98216c725bd1bec72db2cbab4ac2b12d7ff237bf
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Sep 4 12:15:14 2020 -0700

    Add row based schema validation code to detect schema mismatch
---
 .../apache/pinot/common/utils/PinotDataType.java   |   9 ++
 .../recordtransformer/DataTypeTransformer.java     |  50 ++++++++--
 .../pinot/core/segment/creator/SegmentCreator.java |   4 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  29 +++++-
 .../impl/SegmentIndexCreationDriverImpl.java       |   3 +-
 .../recordtransformer/DataTypeTransformerTest.java |  70 ++++++-------
 .../pinot/query/executor/QueryExecutorTest.java    |  15 ++-
 .../hadoop/job/mappers/SegmentCreationMapper.java  |  22 ++--
 .../hadoop/data/IngestionSchemaValidatorTest.java  | 111 ++++++++++++++++-----
 .../data/test_sample_data_multi_value.avro         | Bin 0 -> 12222227 bytes
 .../avro/AvroIngestionSchemaValidator.java         |  53 +++++-----
 .../pinot/spi/data/IngestionSchemaValidator.java   |   8 +-
 .../spi/data/RowBasedSchemaValidationResults.java  |  64 ++++++++++++
 ...Validator.java => SchemaValidationResults.java} |  38 +++----
 .../apache/pinot/spi/data/readers/GenericRow.java  |  18 ++++
 15 files changed, 358 insertions(+), 136 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
index 97c017e..c46a33e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/PinotDataType.java
@@ -586,22 +586,31 @@ public enum PinotDataType {
 
   public PinotDataType getSingleValueType() {
     switch (this) {
+      case BYTE:
       case BYTE_ARRAY:
         return BYTE;
+      case CHARACTER:
       case CHARACTER_ARRAY:
         return CHARACTER;
+      case SHORT:
       case SHORT_ARRAY:
         return SHORT;
+      case INTEGER:
       case INTEGER_ARRAY:
         return INTEGER;
+      case LONG:
       case LONG_ARRAY:
         return LONG;
+      case FLOAT:
       case FLOAT_ARRAY:
         return FLOAT;
+      case DOUBLE:
       case DOUBLE_ARRAY:
         return DOUBLE;
+      case STRING:
       case STRING_ARRAY:
         return STRING;
+      case OBJECT:
       case OBJECT_ARRAY:
         return OBJECT;
       default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
index 4ab665c..a8bee3e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformer.java
@@ -24,8 +24,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.PinotDataType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -85,7 +87,7 @@ public class DataTypeTransformer implements RecordTransformer {
         continue;
       }
       PinotDataType dest = entry.getValue();
-      value = standardize(column, value, dest.isSingleValue());
+      value = standardize(record, column, value, dest.isSingleValue());
       // NOTE: The standardized value could be null for empty Collection/Map/Object[].
       if (value == null) {
         record.putValue(column, null);
@@ -109,6 +111,9 @@ public class DataTypeTransformer implements RecordTransformer {
         }
       }
       if (source != dest) {
+        if (source.getSingleValueType() != dest.getSingleValueType()) {
+          putValueAsSetToKey(record, GenericRow.DATA_TYPE_MISMATCH_KEY, column);
+        }
         value = dest.convert(value, source);
       }
 
@@ -127,28 +132,39 @@ public class DataTypeTransformer implements RecordTransformer {
    */
   @VisibleForTesting
   @Nullable
-  static Object standardize(String column, @Nullable Object value, boolean isSingleValue) {
+  static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue) {
+    return standardize(record, column, value, isSingleValue, 1);
+  }
+
+  static Object standardize(GenericRow record, String column, @Nullable Object value, boolean isSingleValue, int level) {
     if (value == null) {
       return null;
     }
+    // If it's single-value column and the value is Collection/Map/Object[], mark the key.
     if (value instanceof Collection) {
-      return standardizeCollection(column, (Collection) value, isSingleValue);
+      return standardizeCollection(record, column, (Collection) value, isSingleValue, level);
     }
     if (value instanceof Map) {
-      return standardizeCollection(column, ((Map) value).values(), isSingleValue);
+      // If it's a map structure, mark the key.
+      putValueAsSetToKey(record, GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY, column);
+      Collection values = ((Map) value).values();
+      return standardizeCollection(record, column, values, isSingleValue, level);
     }
     if (value instanceof Object[]) {
+      if (isSingleValue && level == 1) {
+        putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+      }
       Object[] values = (Object[]) value;
       int numValues = values.length;
       if (numValues == 0) {
         return null;
       }
       if (numValues == 1) {
-        return standardize(column, values[0], isSingleValue);
+        return standardize(record, column, values[0], isSingleValue, level + 1);
       }
       List<Object> standardizedValues = new ArrayList<>(numValues);
       for (Object singleValue : values) {
-        Object standardizedValue = standardize(column, singleValue, true);
+        Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
         if (standardizedValue != null) {
           standardizedValues.add(standardizedValue);
         }
@@ -164,20 +180,27 @@ public class DataTypeTransformer implements RecordTransformer {
           Arrays.toString(values), column);
       return standardizedValues.toArray();
     }
+    // If it's multi-value column and the level is 1, mark the key.
+    if (!isSingleValue && level == 1) {
+      putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+    }
     return value;
   }
 
-  private static Object standardizeCollection(String column, Collection collection, boolean isSingleValue) {
+  private static Object standardizeCollection(GenericRow record, String column, Collection collection, boolean isSingleValue, int level) {
+    if (isSingleValue && level == 1) {
+      putValueAsSetToKey(record, GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY, column);
+    }
     int numValues = collection.size();
     if (numValues == 0) {
       return null;
     }
     if (numValues == 1) {
-      return standardize(column, collection.iterator().next(), isSingleValue);
+      return standardize(record, column, collection.iterator().next(), isSingleValue, level + 1);
     }
     List<Object> standardizedValues = new ArrayList<>(numValues);
     for (Object singleValue : collection) {
-      Object standardizedValue = standardize(column, singleValue, true);
+      Object standardizedValue = standardize(record, column, singleValue, true, level + 1);
       if (standardizedValue != null) {
         standardizedValues.add(standardizedValue);
       }
@@ -193,4 +216,13 @@ public class DataTypeTransformer implements RecordTransformer {
         .checkState(!isSingleValue, "Cannot read single-value from Collection: %s for column: %s", collection, column);
     return standardizedValues.toArray();
   }
+
+  private static void putValueAsSetToKey(GenericRow record, String key, String value) {
+    Set<String> valueSet = (Set) record.getValue(key);
+    if (valueSet == null) {
+      valueSet = new HashSet<>();
+      record.putValue(key, valueSet);
+    }
+    valueSet.add(value);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
index 2ba6246..adf6141 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/SegmentCreator.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,7 +44,8 @@ public interface SegmentCreator extends Closeable {
    * @throws Exception
    */
   void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      IngestionSchemaValidator ingestionSchemaValidator)
       throws Exception;
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 4489dc8..b0ed11d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -57,6 +57,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
+import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -79,6 +81,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
   // TODO Refactor class name to match interface name
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentColumnarIndexCreator.class);
   private SegmentGeneratorConfig config;
+  private IngestionSchemaValidator _ingestionSchemaValidator;
   private Map<String, ColumnIndexCreationInfo> indexCreationInfoMap;
   private Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
   private Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
@@ -96,11 +99,13 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   @Override
   public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreationInfo segmentIndexCreationInfo,
-      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir)
+      Map<String, ColumnIndexCreationInfo> indexCreationInfoMap, Schema schema, File outDir,
+      IngestionSchemaValidator ingestionSchemaValidator)
       throws Exception {
     docIdCounter = 0;
     config = segmentCreationSpec;
     this.indexCreationInfoMap = indexCreationInfoMap;
+    _ingestionSchemaValidator = ingestionSchemaValidator;
 
     // Check that the output directory does not exist
     Preconditions.checkState(!outDir.exists(), "Segment output directory: %s already exists", outDir);
@@ -304,6 +309,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
   @Override
   public void indexRow(GenericRow row) {
+    validateRowBasedSchemas(row);
     for (Map.Entry<String, ForwardIndexCreator> entry : _forwardIndexCreatorMap.entrySet()) {
       String columnName = entry.getKey();
       ForwardIndexCreator forwardIndexCreator = entry.getValue();
@@ -400,6 +406,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
       nullValueVectorCreator.seal();
     }
     writeMetadata();
+    _ingestionSchemaValidator.getRowBasedSchemaValidationResults().gatherRowBasedSchemaValidationResults();
   }
 
   private void writeMetadata()
@@ -558,6 +565,26 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
   }
 
+  private void validateRowBasedSchemas(GenericRow row) {
+    if (_ingestionSchemaValidator == null) {
+      return;
+    }
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults = _ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+
+    if (row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.MULTI_VALUE_STRUCTURE_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectMultiValueStructureMismatchColumns(columns);
+    }
+    if (row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.DATA_TYPE_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectDataTypeMismatchColumns(columns);
+    }
+    if (row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY) != null) {
+      Set<String> columns = (Set) row.getValue(GenericRow.SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY);
+      rowBasedSchemaValidationResults.collectSingleValueMultiValueFieldMismatchColumns(columns);
+    }
+  }
+
   /**
    * Helper method to check whether the given value is a valid property value.
    * <p>Value is invalid iff:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index cabdc47..e5a4b84 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -187,7 +187,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
     try {
       // Initialize the index creation using the per-column statistics information
-      indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir);
+      indexCreator.init(config, segmentIndexCreationInfo, indexCreationInfoMap, dataSchema, tempIndexDir,
+          _ingestionSchemaValidator);
 
       // Build the index
       recordReader.rewind();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
index a60c460..be4ad8d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/DataTypeTransformerTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,17 +40,18 @@ public class DataTypeTransformerTest {
     /**
      * Tests for Map
      */
+    GenericRow record = new GenericRow();
 
     // Empty Map
     Map<String, Object> map = Collections.emptyMap();
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
 
     // Map with single entry
     String expectedValue = "testValue";
     map = Collections.singletonMap("testKey", expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with multiple entries
     Object[] expectedValues = new Object[]{"testValue1", "testValue2"};
@@ -58,12 +60,12 @@ public class DataTypeTransformerTest {
     map.put("testKey2", "testValue2");
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     /**
      * Tests for List
@@ -71,24 +73,24 @@ public class DataTypeTransformerTest {
 
     // Empty List
     List<Object> list = Collections.emptyList();
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, list, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, list, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, list, false));
 
     // List with single entry
     list = Collections.singletonList(expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, list, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValue);
 
     // List with multiple entries
     list = Arrays.asList(expectedValues);
     try {
       // Should fail because List with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformer.standardize(record, COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
 
     /**
      * Tests for Object[]
@@ -96,24 +98,24 @@ public class DataTypeTransformerTest {
 
     // Empty Object[]
     Object[] values = new Object[0];
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, values, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, values, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, values, false));
 
     // Object[] with single entry
     values = new Object[]{expectedValue};
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, values, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValue);
 
     // Object[] with multiple entries
     values = new Object[]{"testValue1", "testValue2"};
     try {
       // Should fail because Object[] with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
 
     /**
      * Tests for nested Map/List/Object[]
@@ -121,32 +123,32 @@ public class DataTypeTransformerTest {
 
     // Map with empty List
     map = Collections.singletonMap("testKey", Collections.emptyList());
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, true));
-    assertNull(DataTypeTransformer.standardize(COLUMN, map, false));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, true));
+    assertNull(DataTypeTransformer.standardize(record, COLUMN, map, false));
 
     // Map with single-entry List
     map = Collections.singletonMap("testKey", Collections.singletonList(expectedValue));
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with one empty Map and one single-entry Map
     map = new HashMap<>();
     map.put("testKey1", Collections.emptyMap());
     map.put("testKey2", Collections.singletonMap("testKey", expectedValue));
     // Can be standardized into single value because empty Map should be ignored
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, true), expectedValue);
-    assertEquals(DataTypeTransformer.standardize(COLUMN, map, false), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, true), expectedValue);
+    assertEquals(DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValue);
 
     // Map with multi-entries List
     map = Collections.singletonMap("testKey", Arrays.asList(expectedValues));
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     // Map with one empty Map, one single-entry List and one single-entry Object[]
     map = new HashMap<>();
@@ -155,12 +157,12 @@ public class DataTypeTransformerTest {
     map.put("testKey3", new Object[]{"testValue2"});
     try {
       // Should fail because Map with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, map, true);
+      DataTypeTransformer.standardize(record, COLUMN, map, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, map, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, map, false), expectedValues);
 
     // List with two single-entry Maps and one empty Map
     list = Arrays
@@ -168,35 +170,35 @@ public class DataTypeTransformerTest {
             Collections.emptyMap());
     try {
       // Should fail because List with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, list, true);
+      DataTypeTransformer.standardize(record, COLUMN, list, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEquals((Object[]) DataTypeTransformer.standardize(COLUMN, list, false), expectedValues);
+    assertEquals((Object[]) DataTypeTransformer.standardize(record, COLUMN, list, false), expectedValues);
 
     // Object[] with two single-entry Maps
     values = new Object[]{Collections.singletonMap("testKey", "testValue1"), Collections.singletonMap("testKey",
         "testValue2")};
     try {
       // Should fail because Object[] with multiple entries cannot be standardized as single value
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
 
     // Object[] with one empty Object[], one multi-entries List of nested Map/List/Object[]
     values = new Object[]{new Object[0], Collections.singletonList(
         Collections.singletonMap("testKey", "testValue1")), Collections.singletonMap("testKey",
         Arrays.asList(new Object[]{"testValue2"}, Collections.emptyMap()))};
     try {
-      DataTypeTransformer.standardize(COLUMN, values, true);
+      DataTypeTransformer.standardize(record, COLUMN, values, true);
       fail();
     } catch (Exception e) {
       // Expected
     }
-    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(COLUMN, values, false), expectedValues);
+    assertEqualsNoOrder((Object[]) DataTypeTransformer.standardize(record, COLUMN, values, false), expectedValues);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..994f684 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -93,10 +93,17 @@ public class QueryExecutorTest {
       driver.init(config);
       driver.build();
       IngestionSchemaValidator ingestionSchemaValidator = driver.getIngestionSchemaValidator();
-      Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-      Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+
+      Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult()
+          .isMismatchDetected());
+      Assert.assertFalse(
+          ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult()
+              .isMismatchDetected());
+      Assert.assertFalse(
+          ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult()
+              .isMismatchDetected());
+      Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult()
+          .isMismatchDetected());
       _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
       _segmentNames.add(driver.getSegmentName());
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index fcc5653..aa3812b 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -52,8 +52,10 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.SchemaValidationResults;
 import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.utils.DataSizeUtils;
@@ -387,23 +389,27 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     if (ingestionSchemaValidator == null) {
       return;
     }
-    if (ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected()) {
+    SchemaValidationResults fileBasedSchemaValidationResults = ingestionSchemaValidator.getFileBasedSchemaValidationResults();
+    if (fileBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected()) {
       _dataTypeMismatch++;
-      _logger.warn(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
+      _logger.warn(fileBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
     }
-    if (ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected()) {
       _singleValueMultiValueFieldMismatch++;
-      ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason();
     }
-    if (ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected()) {
       _multiValueStructureMismatch++;
-      ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason();
     }
-    if (ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected()) {
+    if (fileBasedSchemaValidationResults.getMissingPinotColumnResult().isMismatchDetected()) {
       _missingPinotColumn++;
-      ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason();
+      fileBasedSchemaValidationResults.getMissingPinotColumnResult().getMismatchReason();
     }
 
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults = ingestionSchemaValidator.getRowBasedSchemaValidationResults();
+    //TODO add logic to detect.
+
     if (isSchemaMismatch() && _failIfSchemaMismatch) {
       throw new RuntimeException("Schema mismatch detected. Forcing to fail the job. Please checking log message above.");
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
index fec3583..9170b79 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
@@ -20,17 +20,35 @@ package org.apache.pinot.hadoop.data;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
 import org.apache.pinot.spi.data.SchemaValidatorFactory;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class IngestionSchemaValidatorTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "IngestionSchemaValidatorTest");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+    Assert.assertTrue(INDEX_DIR.mkdirs());
+  }
+
   @Test
-  public void testAvroIngestionSchemaValidator()
+  public void testAvroIngestionSchemaValidatorFileBasedSchemaValidation()
       throws Exception {
     String inputFilePath = new File(
         Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
@@ -47,10 +65,10 @@ public class IngestionSchemaValidatorTest {
     IngestionSchemaValidator ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
 
     // Adding one extra column
     pinotSchema = new Schema.SchemaBuilder()
@@ -64,12 +82,11 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().getMismatchReason());
 
     // Change the data type of column1 from LONG to STRING
     pinotSchema = new Schema.SchemaBuilder()
@@ -81,12 +98,11 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
 
     // Change column2 from single-value column to multi-value column
     pinotSchema = new Schema.SchemaBuilder()
@@ -98,14 +114,56 @@ public class IngestionSchemaValidatorTest {
     ingestionSchemaValidator =
         SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
     Assert.assertNotNull(ingestionSchemaValidator);
-    Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
-    Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
-    Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+    Assert.assertTrue(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMultiValueStructureMismatchResult().getMismatchReason());
+    Assert.assertFalse(ingestionSchemaValidator.getFileBasedSchemaValidationResults().getMissingPinotColumnResult().isMismatchDetected());
+  }
+
+  @Test
+  public void testAvroIngestionValidatorRowBasedSchemaValidation()
+      throws Exception {
+    String tableName = "testTable";
+    File avroFile = new File(
+        Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro"))
+            .getFile());
+
+    // column2 is of int type in the Avro file.
+    // column3 and column16 are both of array of map structure.
+    Schema pinotSchema = new Schema.SchemaBuilder()
+        .addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column2", FieldSpec.DataType.LONG)
+        .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("column16", FieldSpec.DataType.STRING)
+        .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build();
+
+    SegmentGeneratorConfig segmentGeneratorConfig =
+        new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName).build(),
+            pinotSchema);
+    segmentGeneratorConfig.setInputFilePath(avroFile.getAbsolutePath());
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+    segmentGeneratorConfig.setTableName(tableName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+
+    RowBasedSchemaValidationResults rowBasedSchemaValidationResults =
+        driver.getIngestionSchemaValidator().getRowBasedSchemaValidationResults();
+    Assert.assertTrue(rowBasedSchemaValidationResults.getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getDataTypeMismatchResult().getMismatchReason());
+
+    Assert.assertTrue(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
+
+    Assert.assertTrue(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertNotNull(rowBasedSchemaValidationResults.getMultiValueStructureMismatchResult().getMismatchReason());
+  }
 
+  @AfterClass
+  public void tearDown() {
+    FileUtils.deleteQuietly(INDEX_DIR);
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro
new file mode 100644
index 0000000..4e4a4d8
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
index d0ee84f..ee6706d 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
@@ -23,22 +23,24 @@ import java.io.IOException;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.RowBasedSchemaValidationResults;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
-import org.apache.pinot.spi.data.SchemaValidatorResult;
+import org.apache.pinot.spi.data.SchemaValidationResults;
 
 
 /**
  * Schema validator to validate pinot schema and avro schema
  */
 public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
+  // Populated by validateFileBasedSchemas(), which init() calls.
+  private final SchemaValidationResults _fileBasedSchemaValidationResults = new SchemaValidationResults();
+  // Only exposed via getRowBasedSchemaValidationResults(); this class does not populate it itself.
+  private final RowBasedSchemaValidationResults _rowBasedSchemaValidationResults =
+      new RowBasedSchemaValidationResults();
+
   private org.apache.avro.Schema _avroSchema;
   private Schema _pinotSchema;
 
-  private SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
-  private SchemaValidatorResult _missingPinotColumn = new SchemaValidatorResult();
-
   public AvroIngestionSchemaValidator() {
   }
@@ -48,7 +50,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
     _pinotSchema = pinotSchema;
     _avroSchema = extractAvroSchemaFromFile(inputFilePath);
 
-    validateSchemas();
+    validateFileBasedSchemas();
   }
 
   @Override
@@ -57,23 +59,13 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
   }
 
   @Override
-  public SchemaValidatorResult getDataTypeMismatchResult() {
-    return _dataTypeMismatch;
-  }
-
-  @Override
-  public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
-    return _singleValueMultiValueFieldMismatch;
-  }
-
-  @Override
-  public SchemaValidatorResult getMultiValueStructureMismatchResult() {
-    return _multiValueStructureMismatch;
+  public SchemaValidationResults getFileBasedSchemaValidationResults() {
+    return _fileBasedSchemaValidationResults;
   }
 
   @Override
-  public SchemaValidatorResult getMissingPinotColumnResult() {
-    return _missingPinotColumn;
+  public RowBasedSchemaValidationResults getRowBasedSchemaValidationResults() {
+    return _rowBasedSchemaValidationResults;
   }
 
   private org.apache.avro.Schema extractAvroSchemaFromFile(String inputPath) {
@@ -87,12 +79,12 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
     }
   }
 
-  private void validateSchemas() {
+  private void validateFileBasedSchemas() {
     for (String columnName : _pinotSchema.getPhysicalColumnNames()) {
       FieldSpec fieldSpec = _pinotSchema.getFieldSpecFor(columnName);
       org.apache.avro.Schema.Field avroColumnField = _avroSchema.getField(columnName);
       if (avroColumnField == null) {
-        _missingPinotColumn.addMismatchReason(String
+        _fileBasedSchemaValidationResults.getMissingPinotColumnResult().addMismatchReason(String
             .format("The Pinot column: (%s: %s) is missing in the %s schema of input data.", columnName,
                 fieldSpec.getDataType().name(), getInputSchemaType()));
         continue;
@@ -116,7 +108,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
       }
 
       if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
-        _dataTypeMismatch.addMismatchReason(String
+        _fileBasedSchemaValidationResults.getDataTypeMismatchResult().addMismatchReason(String
             .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName,
                 fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(),
                 getInputSchemaType()));
@@ -125,20 +117,22 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
       if (fieldSpec.isSingleValueField()) {
         if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
+                  columnName, avroColumnSchema.getName(), getInputSchemaType()));
         }
       } else {
         if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          _fileBasedSchemaValidationResults.getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
+                  columnName, avroColumnSchema.getName(), getInputSchemaType()));
         }
         if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
           // multi-value column should use array structure for now.
-          _multiValueStructureMismatch.addMismatchReason(String.format(
+          _fileBasedSchemaValidationResults.getMultiValueStructureMismatchResult().addMismatchReason(String.format(
               "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.",
               columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName()));
         }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
index 045327a..67a3da9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
@@ -28,11 +28,15 @@ public interface IngestionSchemaValidator {
 
   String getInputSchemaType();
 
-  SchemaValidatorResult getDataTypeMismatchResult();
+  /**
+   * Returns the results of validating the Pinot schema against the schema of the input file passed to
+   * {@link #init(Schema, String)}.
+   */
+  SchemaValidationResults getFileBasedSchemaValidationResults();
 
-  SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
-  SchemaValidatorResult getMultiValueStructureMismatchResult();
-
-  SchemaValidatorResult getMissingPinotColumnResult();
+  /**
+   * Returns the results of row-based schema validation, which accumulate the mismatches detected while individual
+   * input rows are processed (e.g. during segment creation).
+   */
+  RowBasedSchemaValidationResults getRowBasedSchemaValidationResults();
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
new file mode 100644
index 0000000..655275a
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/RowBasedSchemaValidationResults.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Extension of {@link SchemaValidationResults} for row-based schema validation. Row-based validation is invoked much
+ * more frequently than file-based validation, so instead of building a mismatch message every time, the names of the
+ * mismatched Pinot columns are accumulated into sets and the mismatch messages are generated all at once by
+ * {@link #gatherRowBasedSchemaValidationResults()}.
+ */
+public class RowBasedSchemaValidationResults extends SchemaValidationResults {
+
+  private final Set<String> _dataTypeMismatchColumns = new HashSet<>();
+  private final Set<String> _singleValueMultiValueFieldMismatchColumns = new HashSet<>();
+  private final Set<String> _multiValueStructureMismatchColumns = new HashSet<>();
+
+  /**
+   * Adds the given Pinot columns to the set of columns with a data type mismatch.
+   */
+  public void collectDataTypeMismatchColumns(Set<String> columns) {
+    _dataTypeMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds the given Pinot columns to the set of columns with a single-value/multi-value field mismatch.
+   */
+  public void collectSingleValueMultiValueFieldMismatchColumns(Set<String> columns) {
+    _singleValueMultiValueFieldMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds the given Pinot columns to the set of columns with a multi-value structure mismatch.
+   */
+  public void collectMultiValueStructureMismatchColumns(Set<String> columns) {
+    _multiValueStructureMismatchColumns.addAll(columns);
+  }
+
+  /**
+   * Adds one mismatch reason listing the collected columns to each inherited result whose column set is non-empty.
+   * Intended to be called once, after all rows have been processed: the column sets are never cleared, so each call
+   * presumably appends the same reasons again.
+   */
+  public void gatherRowBasedSchemaValidationResults() {
+    if (!_dataTypeMismatchColumns.isEmpty()) {
+      _dataTypeMismatch.addMismatchReason(
+          String.format("Found data type mismatch from the following Pinot columns: %s", _dataTypeMismatchColumns));
+    }
+    if (!_singleValueMultiValueFieldMismatchColumns.isEmpty()) {
+      _singleValueMultiValueFieldMismatch.addMismatchReason(String
+          .format("Found single-value multi-value field mismatch from the following Pinot columns: %s",
+              _singleValueMultiValueFieldMismatchColumns));
+    }
+    if (!_multiValueStructureMismatchColumns.isEmpty()) {
+      _multiValueStructureMismatch.addMismatchReason(String
+          .format("Found multi-value structure mismatch from the following Pinot columns: %s",
+              _multiValueStructureMismatchColumns));
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
similarity index 51%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
index 045327a..2d793c0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/IngestionSchemaValidator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/SchemaValidationResults.java
@@ -18,21 +18,31 @@
  */
 package org.apache.pinot.spi.data;
 
 
-/**
- * Validator to validate the schema between Pinot schema and input raw data schema
- */
-public interface IngestionSchemaValidator {
-
-  void init(Schema pinotSchema, String inputFilePath);
-
-  String getInputSchemaType();
-
-  SchemaValidatorResult getDataTypeMismatchResult();
-
-  SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult();
-
-  SchemaValidatorResult getMultiValueStructureMismatchResult();
-
-  SchemaValidatorResult getMissingPinotColumnResult();
+/**
+ * Holds one {@link SchemaValidatorResult} per kind of mismatch that can be detected between the Pinot schema and the
+ * schema of the input data.
+ */
+public class SchemaValidationResults {
+  // Package-private so that RowBasedSchemaValidationResults (same package) can add mismatch reasons directly.
+  final SchemaValidatorResult _dataTypeMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _singleValueMultiValueFieldMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _multiValueStructureMismatch = new SchemaValidatorResult();
+  final SchemaValidatorResult _missingPinotColumnResult = new SchemaValidatorResult();
+
+  public SchemaValidatorResult getDataTypeMismatchResult() {
+    return _dataTypeMismatch;
+  }
+
+  public SchemaValidatorResult getSingleValueMultiValueFieldMismatchResult() {
+    return _singleValueMultiValueFieldMismatch;
+  }
+
+  public SchemaValidatorResult getMultiValueStructureMismatchResult() {
+    return _multiValueStructureMismatch;
+  }
+
+  public SchemaValidatorResult getMissingPinotColumnResult() {
+    return _missingPinotColumnResult;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index 5c45d6b..fc3f9a3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -61,6 +61,24 @@ public class GenericRow {
    */
   public static final String SKIP_RECORD_KEY = "$SKIP_RECORD_KEY$";
 
+  /**
+   * Key used to flag a data type mismatch that requires a data type conversion, e.g. the Pinot column is of INT type
+   * whereas the input column is of LONG type.
+   */
+  public static final String DATA_TYPE_MISMATCH_KEY = "$DATA_TYPE_MISMATCH_KEY$";
+
+  /**
+   * Key used to flag an input value that is a map structure for a multi-value column. It helps identify whether any
+   * existing use case relies on fetching multi-value column values from a map.
+   */
+  public static final String MULTI_VALUE_STRUCTURE_MISMATCH_KEY = "$MULTI_VALUE_STRUCTURE_MISMATCH_KEY$";
+
+  /**
+   * Key used to flag a single-value/multi-value mismatch, e.g. the Pinot column is single-value whereas the input
+   * value is a Collection, Map or Object[].
+   */
+  public static final String SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY = "$SINGLE_VALUE_MULTI_VALUE_FIELD_MISMATCH_KEY$";
+
   private final Map<String, Object> _fieldToValueMap = new HashMap<>();
   private final Set<String> _nullValueFields = new HashSet<>();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org