You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/04/12 16:50:28 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f12dc6e  [GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive
f12dc6e is described below

commit f12dc6eb5d84b74aeb9ed237266807a978ae51a4
Author: Lei Sun <le...@linkedin.com>
AuthorDate: Sun Apr 12 09:50:11 2020 -0700

    [GOBBLIN-1114] OrcValueMapper schema evolution up-conversion recursive
    
    OrcValueMapper schema evolution up-conversion
    recursive
    
    Fix findBugsMain
    
    Address comments
    
    Address comments
    
    Closes #2954 from autumnust/master
---
 .../gobblin/compaction/mapreduce/orc/OrcUtils.java |  89 +++++++++++-
 .../compaction/mapreduce/orc/OrcValueMapper.java   | 151 +++++++++------------
 .../mapreduce/orc/OrcValueMapperTest.java          |  64 ++++++++-
 3 files changed, 212 insertions(+), 92 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
index 7f82690..6d053d2 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcUtils.java
@@ -19,8 +19,12 @@ package org.apache.gobblin.compaction.mapreduce.orc;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +36,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.ConvertTreeReaderFactory;
+import org.apache.orc.impl.SchemaEvolution;
 
 
 public class OrcUtils {
@@ -50,6 +56,9 @@ public class OrcUtils {
 
   public static TypeDescription getNewestSchemaFromSource(Job job, FileSystem fs) throws IOException {
     Path[] sourceDirs = FileInputFormat.getInputPaths(job);
+    if (sourceDirs.length == 0) {
+      throw new IllegalStateException("There should be at least one directory specified for the MR job");
+    }
 
     List<FileStatus> files = new ArrayList<FileStatus>();
 
@@ -67,6 +76,84 @@ public class OrcUtils {
     }
 
     throw new IllegalStateException(
-        String.format("There's no file carrying orc file schema in %s list", sourceDirs));
+        String.format("There's no file carrying orc file schema in the list of directories: %s", Arrays.toString(sourceDirs)));
+  }
+
+  /**
+   * Determine whether reading data written as {@code fileType} with {@code readerType} is a valid schema evolution.
+   * Implementation adapted from {@link SchemaEvolution}, as the original there is package-private.
+   */
+  static boolean isEvolutionValid(TypeDescription fileType, TypeDescription readerType) {
+    boolean isOk = true;
+    if (fileType.getCategory() == readerType.getCategory()) {
+      switch (readerType.getCategory()) {
+        case BOOLEAN:
+        case BYTE:
+        case SHORT:
+        case INT:
+        case LONG:
+        case DOUBLE:
+        case FLOAT:
+        case STRING:
+        case TIMESTAMP:
+        case BINARY:
+        case DATE:
+          // these are always a match
+          break;
+        case CHAR:
+        case VARCHAR:
+          break;
+        case DECIMAL:
+          break;
+        case UNION:
+        case MAP:
+        case LIST: {
+          // these must have the same number of children, and each child pair must itself be a valid evolution
+          List<TypeDescription> fileChildren = fileType.getChildren();
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          if (fileChildren.size() == readerChildren.size()) {
+            for (int i = 0; i < fileChildren.size(); ++i) {
+              isOk &= isEvolutionValid(fileChildren.get(i), readerChildren.get(i));
+            }
+            return isOk;
+          } else {
+            return false;
+          }
+        }
+        case STRUCT: {
+          List<TypeDescription> readerChildren = readerType.getChildren();
+          List<TypeDescription> fileChildren = fileType.getChildren();
+
+          List<String> readerFieldNames = readerType.getFieldNames();
+          List<String> fileFieldNames = fileType.getFieldNames();
+
+          final Map<String, TypeDescription> fileTypesIdx = new HashMap();
+          for (int i = 0; i < fileFieldNames.size(); i++) {
+            final String fileFieldName = fileFieldNames.get(i);
+            fileTypesIdx.put(fileFieldName, fileChildren.get(i));
+          }
+
+          for (int i = 0; i < readerFieldNames.size(); i++) {
+            final String readerFieldName = readerFieldNames.get(i);
+            TypeDescription readerField = readerChildren.get(i);
+            TypeDescription fileField = fileTypesIdx.get(readerFieldName);
+            if (fileField == null) {
+              continue;
+            }
+
+            isOk &= isEvolutionValid(fileField, readerField);
+          }
+          return isOk;
+        }
+        default:
+          throw new IllegalArgumentException("Unknown type " + readerType);
+      }
+      return isOk;
+    } else {
+      /*
+       * The categories differ: check for the few cases where ORC will not convert between them.
+       */
+      return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+    }
   }
 }
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
index c23ed9a..ab9b065 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapper.java
@@ -18,64 +18,83 @@
 package org.apache.gobblin.compaction.mapreduce.orc;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.gobblin.compaction.mapreduce.RecordKeyMapperBase;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.orc.OrcConf;
 import org.apache.orc.TypeDescription;
-import org.apache.orc.impl.ConvertTreeReaderFactory;
 import org.apache.orc.impl.SchemaEvolution;
 import org.apache.orc.mapred.OrcKey;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
 import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcUnion;
 import org.apache.orc.mapred.OrcValue;
 import org.apache.orc.mapreduce.OrcMapreduceRecordReader;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.extern.slf4j.Slf4j;
+
 
 /**
  * To keep consistent with {@link OrcMapreduceRecordReader}'s decision on implementing
  * {@link RecordReader} with {@link NullWritable} as the key and generic type of value, the ORC Mapper will
  * read in the record as the input value.
  */
+@Slf4j
 public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct, Object, OrcValue> {
 
   private OrcValue outValue;
   private TypeDescription mapperSchema;
 
+  // This is added mostly for debuggability.
+  private static int writeCount = 0;
+
   @Override
   protected void setup(Context context)
       throws IOException, InterruptedException {
     super.setup(context);
     this.outValue = new OrcValue();
-    this.mapperSchema = TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
+    this.mapperSchema =
+        TypeDescription.fromString(context.getConfiguration().get(OrcConf.MAPRED_INPUT_SCHEMA.getAttribute()));
   }
 
   @Override
   protected void map(NullWritable key, OrcStruct orcStruct, Context context)
       throws IOException, InterruptedException {
-    OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, context);
-    if (context.getNumReduceTasks() == 0) {
-      this.outValue.value = upConvertedStruct;
-      context.write(NullWritable.get(), this.outValue);
-    } else {
-      this.outValue.value = upConvertedStruct;
-      context.write(getDedupKey(upConvertedStruct), this.outValue);
+    OrcStruct upConvertedStruct = upConvertOrcStruct(orcStruct, mapperSchema);
+    this.outValue.value = upConvertedStruct;
+    try {
+      if (context.getNumReduceTasks() == 0) {
+        context.write(NullWritable.get(), this.outValue);
+      } else {
+        context.write(getDedupKey(upConvertedStruct), this.outValue);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failure in write record no." + writeCount, e);
     }
+    writeCount += 1;
 
     context.getCounter(EVENT_COUNTER.RECORD_COUNT).increment(1);
   }
 
   /**
-   * If a {@link OrcStruct}'s schema differs from newest schema obtained when creating MR jobs (which is the
-   * newest schema seen by the MR job), all the other ORC object will need to be up-converted.
+   * Recursively up-convert the {@link OrcStruct} into the given {@code mapperSchema}.
+   * Limitations:
+   * 1. Does not support up-conversion of key types in Maps.
+   * 2. A field is only converted if {@link OrcUtils#isEvolutionValid} returns true; otherwise an IllegalEvolutionException is thrown.
    */
-  OrcStruct upConvertOrcStruct(OrcStruct orcStruct, Context context) {
+  @VisibleForTesting
+  OrcStruct upConvertOrcStruct(OrcStruct orcStruct, TypeDescription mapperSchema) {
     // For ORC schema, if schema object differs that means schema itself is different while for Avro,
     // there are chances that documentation or attributes' difference lead to the schema object difference.
     if (!orcStruct.getSchema().equals(mapperSchema)) {
+      log.info("There's schema mismatch identified from reader's schema and writer's schema");
       OrcStruct newStruct = new OrcStruct(mapperSchema);
 
       int indexInNewSchema = 0;
@@ -90,8 +109,10 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
           TypeDescription fileType = oldSchemaTypes.get(fieldIndex);
           TypeDescription readerType = newSchemaTypes.get(indexInNewSchema);
 
-          if (isEvolutionValid(fileType, readerType)) {
-            newStruct.setFieldValue(field, orcStruct.getFieldValue(field));
+          if (OrcUtils.isEvolutionValid(fileType, readerType)) {
+            WritableComparable oldField = orcStruct.getFieldValue(field);
+            oldField = structConversionHelper(oldField, mapperSchema.getChildren().get(fieldIndex));
+            newStruct.setFieldValue(field, oldField);
           } else {
             throw new SchemaEvolution.IllegalEvolutionException(String
                 .format("ORC does not support type conversion from file" + " type %s to reader type %s ",
@@ -111,81 +132,37 @@ public class OrcValueMapper extends RecordKeyMapperBase<NullWritable, OrcStruct,
   }
 
   /**
-   * Determine if two types are following valid evolution.
-   * Implementation stolen and manipulated from {@link SchemaEvolution} as that was package-private.
+   * Recursively up-converts the structs nested in {@code w}, including those inside lists, map values and unions.
+   * Unchecked warnings are suppressed as all casts are to ORC (sub)element types; a wrong cast fails with ClassCastException.
    */
-  static boolean isEvolutionValid(TypeDescription fileType, TypeDescription readerType) {
-    boolean isOk = true;
-    if (fileType.getCategory() == readerType.getCategory()) {
-      switch (readerType.getCategory()) {
-        case BOOLEAN:
-        case BYTE:
-        case SHORT:
-        case INT:
-        case LONG:
-        case DOUBLE:
-        case FLOAT:
-        case STRING:
-        case TIMESTAMP:
-        case BINARY:
-        case DATE:
-          // these are always a match
-          break;
-        case CHAR:
-        case VARCHAR:
-          break;
-        case DECIMAL:
-          break;
-        case UNION:
-        case MAP:
-        case LIST: {
-          // these must be an exact match
-          List<TypeDescription> fileChildren = fileType.getChildren();
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          if (fileChildren.size() == readerChildren.size()) {
-            for (int i = 0; i < fileChildren.size(); ++i) {
-              isOk &= isEvolutionValid(fileChildren.get(i), readerChildren.get(i));
-            }
-            return isOk;
-          } else {
-            return false;
-          }
-        }
-        case STRUCT: {
-          List<TypeDescription> readerChildren = readerType.getChildren();
-          List<TypeDescription> fileChildren = fileType.getChildren();
-
-          List<String> readerFieldNames = readerType.getFieldNames();
-          List<String> fileFieldNames = fileType.getFieldNames();
-
-          final Map<String, TypeDescription> fileTypesIdx = new HashMap();
-          for (int i = 0; i < fileFieldNames.size(); i++) {
-            final String fileFieldName = fileFieldNames.get(i);
-            fileTypesIdx.put(fileFieldName, fileChildren.get(i));
-          }
-
-          for (int i = 0; i < readerFieldNames.size(); i++) {
-            final String readerFieldName = readerFieldNames.get(i);
-            TypeDescription readerField = readerChildren.get(i);
-            TypeDescription fileField = fileTypesIdx.get(readerFieldName);
-            if (fileField == null) {
-              continue;
-            }
-
-            isOk &= isEvolutionValid(fileField, readerField);
-          }
-          return isOk;
-        }
-        default:
-          throw new IllegalArgumentException("Unknown type " + readerType);
+  @SuppressWarnings("unchecked")
+  private WritableComparable structConversionHelper(WritableComparable w, TypeDescription mapperSchema) {
+    if (w instanceof OrcStruct) {
+      return upConvertOrcStruct((OrcStruct) w, mapperSchema);
+    } else if (w instanceof OrcList) {
+      OrcList castedList = (OrcList) w;
+      TypeDescription elementType = mapperSchema.getChildren().get(0);
+      for (int i = 0; i < castedList.size(); i++) {
+        castedList.set(i, structConversionHelper((WritableComparable) castedList.get(i), elementType));
       }
-      return isOk;
-    } else {
-      /*
-       * Check for the few cases where will not convert....
-       */
-      return ConvertTreeReaderFactory.canConvert(fileType, readerType);
+    } else if (w instanceof OrcMap) {
+      OrcMap castedMap = (OrcMap) w;
+      for (Object entry : castedMap.entrySet()) {
+        Map.Entry<WritableComparable, WritableComparable> castedEntry =
+            (Map.Entry<WritableComparable, WritableComparable>) entry;
+        castedMap.put(castedEntry.getKey(),
+            structConversionHelper(castedEntry.getValue(), mapperSchema.getChildren().get(1)));
+      }
+      return castedMap;
+    } else if (w instanceof OrcUnion) {
+      OrcUnion castedUnion = (OrcUnion) w;
+      byte tag = castedUnion.getTag();
+      castedUnion.set(tag,
+          structConversionHelper((WritableComparable) castedUnion.getObject(), mapperSchema.getChildren().get(tag)));
     }
+
+    // Primitives are returned untouched; lists and unions (converted in place above) also return here.
+    return w;
   }
 
   /**
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
index 2304bdc..651c512 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/orc/OrcValueMapperTest.java
@@ -18,6 +18,8 @@
 package org.apache.gobblin.compaction.mapreduce.orc;
 
 import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.SchemaEvolution;
+import org.apache.orc.mapred.OrcStruct;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
@@ -29,9 +31,63 @@ public class OrcValueMapperTest {
     TypeDescription schema_2 = TypeDescription.fromString("struct<i:int,j:int,k:bigint>");
     TypeDescription schema_3 = TypeDescription.fromString("struct<i:int,j:int,k:tinyint>");
     TypeDescription schema_4 = TypeDescription.fromString("struct<i:int,j:int>");
-    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_2));
-    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_3));
-    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_1, schema_4));
-    Assert.assertTrue(OrcValueMapper.isEvolutionValid(schema_4, schema_1));
+    Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_2));
+    Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_3));
+    Assert.assertTrue(OrcUtils.isEvolutionValid(schema_1, schema_4));
+    Assert.assertTrue(OrcUtils.isEvolutionValid(schema_4, schema_1));
+  }
+
+  @Test
+  public void testUpConvertOrcStruct(){
+    OrcValueMapper mapper = new OrcValueMapper();
+
+    // Basic case.
+    TypeDescription baseStructSchema = TypeDescription.fromString("struct<a:int,b:string>");
+    OrcStruct baseStruct = (OrcStruct) OrcStruct.createValue(baseStructSchema);
+    TypeDescription evolved_baseStructSchema = TypeDescription.fromString("struct<a:int,b:string,c:int>");
+    OrcStruct evolvedStruct = (OrcStruct) OrcStruct.createValue(evolved_baseStructSchema);
+    OrcStruct resultStruct = mapper.upConvertOrcStruct(baseStruct, evolved_baseStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), evolved_baseStructSchema);
+
+    // Basic case: reverse direction.
+    resultStruct = mapper.upConvertOrcStruct(evolvedStruct, baseStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), baseStructSchema);
+
+    // Simple Nested: List/Map/Union/Struct within Struct.
+    TypeDescription listInStructSchema = TypeDescription.fromString("struct<a:array<struct<a:int,b:string>>>");
+    OrcStruct listInStruct = (OrcStruct) OrcStruct.createValue(listInStructSchema);
+    TypeDescription evolved_listInStructSchema = TypeDescription.fromString("struct<a:array<struct<a:int,b:string,c:string>>>");
+    OrcStruct evolved_listInStruct = (OrcStruct) OrcStruct.createValue(evolved_listInStructSchema);
+    resultStruct = mapper.upConvertOrcStruct(listInStruct, evolved_listInStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), evolved_listInStructSchema);
+    resultStruct = mapper.upConvertOrcStruct(evolved_listInStruct, listInStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), listInStructSchema);
+
+    TypeDescription mapInStructSchema = TypeDescription.fromString("struct<a:map<string,int>>");
+    OrcStruct mapInStruct = (OrcStruct) OrcStruct.createValue(mapInStructSchema);
+    TypeDescription evolved_mapInStructSchema = TypeDescription.fromString("struct<a:map<string,bigint>>");
+    OrcStruct evolved_mapInStruct = (OrcStruct) OrcStruct.createValue(evolved_mapInStructSchema);
+    resultStruct = mapper.upConvertOrcStruct(mapInStruct, evolved_mapInStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), evolved_mapInStructSchema);
+    resultStruct = mapper.upConvertOrcStruct(evolved_mapInStruct, mapInStructSchema);
+    // Evolution not valid, no up-conversion happened.
+    try {
+      resultStruct.getSchema().equals(evolved_mapInStructSchema);
+    } catch (SchemaEvolution.IllegalEvolutionException ie) {
+      Assert.assertTrue(true);
+    }
+
+    TypeDescription unionInStructSchema = TypeDescription.fromString("struct<a:uniontype<int,string>>");
+    OrcStruct unionInStruct = (OrcStruct) OrcStruct.createValue(unionInStructSchema);
+    TypeDescription evolved_unionInStructSchema = TypeDescription.fromString("struct<a:uniontype<bigint,string>>");
+    resultStruct = mapper.upConvertOrcStruct(unionInStruct, evolved_unionInStructSchema);
+    Assert.assertEquals(resultStruct.getSchema(), evolved_unionInStructSchema);
+
+    // Complex: List<Struct> within struct among others and evolution happens on multiple places.
+    TypeDescription complex_1 = TypeDescription.fromString("struct<a:array<struct<a:string,b:int>>,b:struct<a:uniontype<int,string>>>");
+    OrcStruct complex_struct = (OrcStruct) OrcStruct.createValue(complex_1);
+    TypeDescription evolved_complex_1 = TypeDescription.fromString("struct<a:array<struct<a:string,b:int,c:string>>,b:struct<a:uniontype<bigint,string>,b:int>>");
+    resultStruct = mapper.upConvertOrcStruct(complex_struct, evolved_complex_1);
+    Assert.assertEquals(resultStruct.getSchema(), evolved_complex_1);
   }
 }
\ No newline at end of file