You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2019/05/17 16:30:27 UTC

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #2637: [GOBBLIN-772]Implement Schema Comparison Strategy during Disctp

autumnust commented on a change in pull request #2637: [GOBBLIN-772]Implement Schema Comparison Strategy during Disctp
URL: https://github.com/apache/incubator-gobblin/pull/2637#discussion_r285198718
 
 

 ##########
 File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractorWithCheckSchema.java
 ##########
 @@ -39,32 +42,109 @@
  * check if the schema matches the expected schema. If not it will abort the job.
  */
 
-public class FileAwareInputStreamExtractorWithCheckSchema extends FileAwareInputStreamExtractor{
+public class FileAwareInputStreamExtractorWithCheckSchema extends FileAwareInputStreamExtractor {
 
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file, WorkUnitState state)
-  {
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file, WorkUnitState state) {
     super(fs, file, state);
   }
-  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file)
-  {
+
+  public FileAwareInputStreamExtractorWithCheckSchema(FileSystem fs, CopyableFile file) {
     this(fs, file, null);
   }
 
   @Override
-  protected FileAwareInputStream buildStream(FileSystem fsFromFile)
-      throws DataRecordException, IOException{
-    if(!schemaChecking(fsFromFile))
-    {
+  protected FileAwareInputStream buildStream(FileSystem fsFromFile) throws DataRecordException, IOException {
+    if (!schemaChecking(fsFromFile)) {
       throw new DataRecordException("Schema does not match the expected schema");
     }
     return super.buildStream(fsFromFile);
   }
 
-  protected boolean schemaChecking(FileSystem fsFromFile)
-      throws IOException {
+  protected boolean schemaChecking(FileSystem fsFromFile) throws IOException {
     DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
-    DataFileReader<GenericRecord> dataFileReader = new DataFileReader(new FsInput(this.file.getFileStatus().getPath(),fsFromFile), datumReader);
+    DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader(new FsInput(this.file.getFileStatus().getPath(), fsFromFile), datumReader);
     Schema schema = dataFileReader.getSchema();
-    return schema.toString().equals(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+    Schema expectedSchema = new Schema.Parser().parse(this.state.getProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA));
+
+    return compare(schema, expectedSchema);
+  }
+
+  private boolean compare(Schema toValidate, Schema expected) {
+    if (toValidate.getType() != expected.getType() || !toValidate.getName().equals(expected.getName())) {return false;}
+    else {
+      switch (toValidate.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING: {
+          return true;
+        }
+        case ARRAY: {
+          return compare(toValidate.getElementType(), expected.getElementType());
+        }
+        case MAP: {
+          return compare(toValidate.getValueType(), expected.getValueType());
+        }
+        case FIXED: {
+          // fixed size and name must match:
+          if (toValidate.getFixedSize() != expected.getFixedSize()) {
+            return false;
+          }
+        }
+        case ENUM: {
+          // expected symbols must contain all toValidate symbols:
+          final Set<String> expectedSymbols = new HashSet<String>(expected.getEnumSymbols());
+          final Set<String> toValidateSymbols = new HashSet<String>(toValidate.getEnumSymbols());
+          if (expectedSymbols.size() != toValidateSymbols.size()) {
+            return false;
+          }
+          if (!expectedSymbols.containsAll(toValidateSymbols)) {
+            return false;
+          }
+        }
+
+        case RECORD: {
+          // Check that each field of toValidate schema is in expected schema
+          if(toValidate.getFields().size() != expected.getFields().size()) {return false;}
+          for (final Schema.Field expectedFiled : expected.getFields()) {
+            final Schema.Field toValidateField = toValidate.getField(expectedFiled.name());
+            if (toValidateField == null) {
+              // expected field does not correspond to any field in the toValidate record schema
+              return false;
+            } else {
+              if (!compare(toValidateField.schema(), expectedFiled.schema())) {
+                return false;
+              }
+            }
+          }
+          return true;
+        }
+        case UNION: {
 
 Review comment:
   Just want to confirm, do you care about the ordering of fields ? 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services