You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2020/09/11 21:38:04 UTC

[incubator-pinot] branch fix-schema-validator created (now 6d22faa)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch fix-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at 6d22faa  Adjust schema validation logic in AvroIngestionSchemaValidator

This branch includes the following new commits:

     new 6d22faa  Adjust schema validation logic in AvroIngestionSchemaValidator

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adjust schema validation logic in AvroIngestionSchemaValidator

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-schema-validator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6d22faa9fd916217b764ddebd8cacac54ea4a6db
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Sep 11 14:37:24 2020 -0700

    Adjust schema validation logic in AvroIngestionSchemaValidator
---
 .../hadoop/data/IngestionSchemaValidatorTest.java  |  51 ++++++++++++++-------
 .../data/test_sample_data_multi_value.avro         | Bin 0 -> 12222227 bytes
 .../avro/AvroIngestionSchemaValidator.java         |  51 +++++++++++++++------
 3 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
index fec3583..8cd0912 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/data/IngestionSchemaValidatorTest.java
@@ -29,16 +29,16 @@ import org.testng.annotations.Test;
 
 
 public class IngestionSchemaValidatorTest {
+
   @Test
-  public void testAvroIngestionSchemaValidator()
+  public void testAvroIngestionSchemaValidatorForSingleValueColumns()
       throws Exception {
-    String inputFilePath = new File(
-        Preconditions.checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
-            .getFile()).toString();
+    String inputFilePath = new File(Preconditions
+        .checkNotNull(IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data.avro"))
+        .getFile()).toString();
     String recordReaderClassName = "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
 
-    Schema pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    Schema pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -53,8 +53,7 @@ public class IngestionSchemaValidatorTest {
     Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
 
     // Adding one extra column
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("extra_column", FieldSpec.DataType.STRING)
@@ -69,11 +68,9 @@ public class IngestionSchemaValidatorTest {
     Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
     Assert.assertNotNull(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMissingPinotColumnResult().getMismatchReason());
 
     // Change the data type of column1 from LONG to STRING
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+    pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -83,14 +80,12 @@ public class IngestionSchemaValidatorTest {
     Assert.assertNotNull(ingestionSchemaValidator);
     Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
     Assert.assertNotNull(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getDataTypeMismatchResult().getMismatchReason());
     Assert.assertFalse(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
     Assert.assertFalse(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
 
     // Change column2 from single-value column to multi-value column
-    pinotSchema = new Schema.SchemaBuilder()
-        .addSingleValueDimension("column1", FieldSpec.DataType.LONG)
+    pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG)
         .addMultiValueDimension("column2", FieldSpec.DataType.INT)
         .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column7", FieldSpec.DataType.STRING)
@@ -101,11 +96,35 @@ public class IngestionSchemaValidatorTest {
     Assert.assertFalse(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
     Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
     Assert.assertNotNull(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().getMismatchReason());
     Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
     Assert.assertNotNull(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
-    System.out.println(ingestionSchemaValidator.getMultiValueStructureMismatchResult().getMismatchReason());
     Assert.assertFalse(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
+  }
+
+  @Test
+  public void testAvroIngestionValidatorForMultiValueColumns()
+      throws Exception {
+    String inputFilePath = new File(Preconditions.checkNotNull(
+        IngestionSchemaValidatorTest.class.getClassLoader().getResource("data/test_sample_data_multi_value.avro"))
+        .getFile()).toString();
+    String recordReaderClassName = "org.apache.pinot.plugin.inputformat.avro.AvroRecordReader";
+
+    // column 2 is of int type in the AVRO.
+    // column3 and column16 are both of array of map structure.
+    // metric_not_found doesn't exist in input AVRO
+    Schema pinotSchema = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.STRING)
+        .addSingleValueDimension("column2", FieldSpec.DataType.LONG)
+        .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
+        .addMultiValueDimension("column16", FieldSpec.DataType.STRING)
+        .addMetric("metric_not_found", FieldSpec.DataType.LONG)
+        .addMetric("metric_nus_impressions", FieldSpec.DataType.LONG).build();
 
+    IngestionSchemaValidator ingestionSchemaValidator =
+        SchemaValidatorFactory.getSchemaValidator(pinotSchema, recordReaderClassName, inputFilePath);
+    Assert.assertNotNull(ingestionSchemaValidator);
+    Assert.assertTrue(ingestionSchemaValidator.getDataTypeMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getSingleValueMultiValueFieldMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getMultiValueStructureMismatchResult().isMismatchDetected());
+    Assert.assertTrue(ingestionSchemaValidator.getMissingPinotColumnResult().isMismatchDetected());
   }
 }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro
new file mode 100644
index 0000000..4e4a4d8
Binary files /dev/null and b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/resources/data/test_sample_data_multi_value.avro differ
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
index d0ee84f..42fa3a5 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroIngestionSchemaValidator.java
@@ -97,6 +97,7 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
                 fieldSpec.getDataType().name(), getInputSchemaType()));
         continue;
       }
+      String avroColumnName = avroColumnField.schema().getName();
       org.apache.avro.Schema avroColumnSchema = avroColumnField.schema();
       org.apache.avro.Schema.Type avroColumnType = avroColumnSchema.getType();
       if (avroColumnType == org.apache.avro.Schema.Type.UNION) {
@@ -111,36 +112,56 @@ public class AvroIngestionSchemaValidator implements IngestionSchemaValidator {
           }
         }
         if (nonNullSchema != null) {
+          avroColumnSchema = nonNullSchema;
           avroColumnType = nonNullSchema.getType();
         }
       }
 
-      if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
-        _dataTypeMismatch.addMismatchReason(String
-            .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName,
-                fieldSpec.getDataType().name(), avroColumnSchema.getName(), avroColumnType.toString(),
-                getInputSchemaType()));
-      }
-
       if (fieldSpec.isSingleValueField()) {
+        // check data type mismatch
+        if (!fieldSpec.getDataType().name().equalsIgnoreCase(avroColumnType.toString())) {
+          getDataTypeMismatchResult().addMismatchReason(String
+              .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.", columnName,
+                  fieldSpec.getDataType().name(), avroColumnName, avroColumnType.toString(),
+                  getInputSchemaType()));
+        }
+        // check single-value multi-value mismatch
         if (avroColumnType.ordinal() < org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'single-value' column but the column: %s from input %s is 'multi-value' column.",
+                  columnName, avroColumnName, getInputSchemaType()));
         }
       } else {
+        // check data type mismatch
+        FieldSpec.DataType dataTypeForMVColumn = AvroUtils.extractFieldDataType(avroColumnField);
+        if (fieldSpec.getDataType() != dataTypeForMVColumn) {
+          getDataTypeMismatchResult().addMismatchReason(String
+              .format("The Pinot column: (%s: %s) doesn't match with the column (%s: %s) in input %s schema.",
+                  columnName, fieldSpec.getDataType().name(), avroColumnName, dataTypeForMVColumn.name(),
+                  getInputSchemaType()));
+        }
+        // check single-value multi-value mismatch
         if (avroColumnType.ordinal() >= org.apache.avro.Schema.Type.STRING.ordinal()) {
           // the column is a complex structure
-          _singleValueMultiValueFieldMismatch.addMismatchReason(String.format(
-              "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType()));
+          getSingleValueMultiValueFieldMismatchResult().addMismatchReason(String
+              .format(
+                  "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is 'single-value' column.",
+                  columnName, avroColumnName, getInputSchemaType()));
         }
+        // check multi-value column structure mismatch
         if (avroColumnType != org.apache.avro.Schema.Type.ARRAY) {
           // multi-value column should use array structure for now.
-          _multiValueStructureMismatch.addMismatchReason(String.format(
+          getMultiValueStructureMismatchResult().addMismatchReason(String.format(
               "The Pinot column: %s is 'multi-value' column but the column: %s from input %s schema is of '%s' type, which should have been of 'array' type.",
-              columnName, avroColumnSchema.getName(), getInputSchemaType(), avroColumnType.getName()));
+              columnName, avroColumnName, getInputSchemaType(), avroColumnType.getName()));
+        } else if (avroColumnSchema.getElementType().getType().ordinal() < org.apache.avro.Schema.Type.STRING
+            .ordinal()) {
+          // even though the column schema is of array type, the element type of that array could be of complex type like array, map, etc.
+          getMultiValueStructureMismatchResult().addMismatchReason(String.format(
+              "The Pinot column: %s is 'multi-value' column and it's of 'array' type in input %s schema, but the element type is of '%s' type, which should have been of 'primitive' type.",
+              columnName, getInputSchemaType(), avroColumnSchema.getElementType().getType()));
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org