You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/06/30 17:23:28 UTC
[gobblin] branch master updated: [GOBBLIN-1485]Enable feature to
get schema from writer schema when do hive registration (#3324)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4b1d57f [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration (#3324)
4b1d57f is described below
commit 4b1d57ff82783eab04ab19fe183086f0154365e3
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Jun 30 10:23:22 2021 -0700
[GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration (#3324)
* [GOBBLIN-1485]Enable feature to get schema from writer schema when do hive registration
* remove unused import
* address comments
---
.../hive/HiveRegistrationUnitComparator.java | 21 ++++++++++++++++++++-
.../gobblin/hive/orc/HiveOrcSerDeManager.java | 17 ++++++++++++++++-
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
index 8b76056..4c4eb03 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegistrationUnitComparator.java
@@ -17,14 +17,18 @@
package org.apache.gobblin.hive;
+import com.google.common.base.Strings;
import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Optional;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
/**
@@ -66,6 +70,7 @@ import org.apache.gobblin.configuration.State;
*/
@Alpha
public class HiveRegistrationUnitComparator<T extends HiveRegistrationUnitComparator<?>> {
+ private static String SCHEMA_CREATION_TIME = "schema.creationTime";
protected final HiveRegistrationUnit existingUnit;
protected final HiveRegistrationUnit newUnit;
@@ -142,12 +147,26 @@ public class HiveRegistrationUnitComparator<T extends HiveRegistrationUnitCompar
return (T) this;
}
+ private State extractSchemaVersion(State state) {
+ //FIXME: Temporary workaround for special characters in the schema string: when the Avro schema literal yields a
+ //creation time, the returned State carries that value instead of the literal. Root cause of the differing encoding still needs a permanent fix.
+ State newState = new State(state);
+ String schemaFromState = state.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+ if (!Strings.isNullOrEmpty(schemaFromState)) {
+ String schemaVersion = AvroUtils.getSchemaCreationTime(new Schema.Parser().parse(schemaFromState));
+ if (!Strings.isNullOrEmpty(schemaVersion)) {
+ newState.removeProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+ newState.setProp(SCHEMA_CREATION_TIME, schemaVersion);
+ }
+ }
+ return newState;
+ }
@SuppressWarnings("unchecked")
public T compareParameters() {
if (!this.result) {
checkExistingIsSuperstate(this.existingUnit.getProps(), this.newUnit.getProps());
checkExistingIsSuperstate(this.existingUnit.getStorageProps(), this.newUnit.getStorageProps());
- checkExistingIsSuperstate(this.existingUnit.getSerDeProps(), this.newUnit.getSerDeProps());
+ checkExistingIsSuperstate(extractSchemaVersion(this.existingUnit.getSerDeProps()), extractSchemaVersion(this.newUnit.getSerDeProps()));
}
return (T) this;
}
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index f20f962..5cbb039 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.hive.orc;
+import com.google.common.base.Strings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -24,6 +25,7 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -33,6 +35,9 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -264,7 +269,17 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
*
*/
protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
- TypeInfo schema = getSchemaFromLatestFile(path, this.fs);
+ TypeInfo schema;
+ if(!Strings.isNullOrEmpty(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()))) {
+ try {
+ Schema avroSchema = new Schema.Parser().parse(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ schema = TypeInfoUtils.getTypeInfoFromObjectInspector(new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
+ } catch (SerDeException e) {
+ throw new IOException(e);
+ }
+ } else {
+ schema = getSchemaFromLatestFile(path, this.fs);
+ }
if (schema instanceof StructTypeInfo) {
StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,