You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/06/30 17:23:28 UTC

[gobblin] branch master updated: [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration (#3324)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b1d57f  [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration (#3324)
4b1d57f is described below

commit 4b1d57ff82783eab04ab19fe183086f0154365e3
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Jun 30 10:23:22 2021 -0700

    [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration (#3324)
    
    * [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration
    
    * remove unused import
    
    * address comments
---
 .../hive/HiveRegistrationUnitComparator.java        | 21 ++++++++++++++++++++-
 .../gobblin/hive/orc/HiveOrcSerDeManager.java       | 17 ++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
index 8b76056..4c4eb03 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
@@ -17,14 +17,18 @@
 
 package org.apache.gobblin.hive;
 
+import com.google.common.base.Strings;
 import java.util.Set;
 
+import org.apache.avro.Schema;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Optional;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 
 
 /**
@@ -66,6 +70,7 @@ import org.apache.gobblin.configuration.State;
  */
 @Alpha
 public class HiveRegistrationUnitComparator<T extends HiveRegistrationUnitComparator<?>> {
+  private static String SCHEMA_CREATION_TIME = "schema.creationTime";
 
   protected final HiveRegistrationUnit existingUnit;
   protected final HiveRegistrationUnit newUnit;
@@ -142,12 +147,26 @@ public class HiveRegistrationUnitComparator<T extends HiveRegistrationUnitCompar
     return (T) this;
   }
 
+  private State extractSchemaVersion(State state) {
+    //FIXME: This is a temporary fix for special characters in the schema string; we need to investigate the root
+    //cause of the differing encodings we see here and replace this with a permanent fix
+    State newState = new State(state);
+    String schemaFromState = state.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+    if (!Strings.isNullOrEmpty(schemaFromState)) {
+      String schemaVersion = AvroUtils.getSchemaCreationTime(new Schema.Parser().parse(schemaFromState));
+      if (!Strings.isNullOrEmpty(schemaVersion)) {
+         newState.removeProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+         newState.setProp(SCHEMA_CREATION_TIME, schemaVersion);
+      }
+    }
+    return newState;
+  }
   @SuppressWarnings("unchecked")
   public T compareParameters() {
     if (!this.result) {
       checkExistingIsSuperstate(this.existingUnit.getProps(), this.newUnit.getProps());
       checkExistingIsSuperstate(this.existingUnit.getStorageProps(), this.newUnit.getStorageProps());
-      checkExistingIsSuperstate(this.existingUnit.getSerDeProps(), this.newUnit.getSerDeProps());
+      checkExistingIsSuperstate(extractSchemaVersion(this.existingUnit.getSerDeProps()), extractSchemaVersion(this.newUnit.getSerDeProps()));
     }
     return (T) this;
   }
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index f20f962..5cbb039 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.hive.orc;
 
+import com.google.common.base.Strings;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -24,6 +25,7 @@ import java.util.Collections;
 import java.util.List;
 
 import java.util.stream.Collectors;
+import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +35,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -264,7 +269,17 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
    *
    */
   protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
-    TypeInfo schema = getSchemaFromLatestFile(path, this.fs);
+    TypeInfo schema;
+    if(!Strings.isNullOrEmpty(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()))) {
+      try {
+        Schema avroSchema = new Schema.Parser().parse(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        schema = TypeInfoUtils.getTypeInfoFromObjectInspector(new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
+      } catch (SerDeException e) {
+        throw new IOException(e);
+      }
+    }  else {
+      schema = getSchemaFromLatestFile(path, this.fs);
+    }
     if (schema instanceof StructTypeInfo) {
       StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
       hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,