You are viewing a plain-text version of this message; the link to its canonical (HTML) archive page was not preserved in this copy.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/18 16:50:30 UTC

[gobblin] branch master updated: [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP (#3364)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new daffe0c  [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP (#3364)
daffe0c is described below

commit daffe0c2d2c01ef9ca74344593d5c09db50d537a
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Aug 18 09:50:25 2021 -0700

    [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP (#3364)
    
    * [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP
    
    * address comments
---
 gobblin-hive-registration/build.gradle             |   1 +
 .../hive/metastore/HiveMetaStoreBasedRegister.java |   6 +-
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java |  19 +-
 .../gobblin/hive/orc/HiveOrcSerDeManager.java      |  37 +-
 .../gobblin/hive/writer/HiveMetadataWriter.java    |  15 +-
 .../orc/TypeDescriptionToObjectInspectorUtil.java  | 443 +++++++++++++++++++++
 .../apache/gobblin/iceberg/GobblinMCEProducer.java |   5 +
 .../writer/GenericRecordToOrcValueWriter.java      |   1 +
 .../apache/gobblin/writer/GobblinOrcWriter.java    |   1 +
 .../writer/GenericRecordToOrcValueWriterTest.java  |   1 +
 gobblin-utility/build.gradle                       |   1 +
 .../gobblin/util/orc}/AvroOrcSchemaConverter.java  |   3 +-
 .../util/orc}/AvroOrcSchemaConverterTest.java      |   8 +-
 13 files changed, 508 insertions(+), 33 deletions(-)

diff --git a/gobblin-hive-registration/build.gradle b/gobblin-hive-registration/build.gradle
index feec628..c136ea1 100644
--- a/gobblin-hive-registration/build.gradle
+++ b/gobblin-hive-registration/build.gradle
@@ -24,6 +24,7 @@ dependencies {
   compile project(":gobblin-utility")
 
   compile externalDependency.avro
+  compile externalDependency.orcCore
   compile externalDependency.datanucleusCore
   compile externalDependency.datanucleusRdbms
   compile externalDependency.guava
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index d5b2393..3d61634 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -238,7 +238,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
           spec.getTable()
               .getSerDeProps()
               .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema);
-          table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+          HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+          table.setSd(HiveMetaStoreUtils.getStorageDescriptor(spec.getTable()));
           return;
         }
         Schema writerSchema = new Schema.Parser().parse((
@@ -254,7 +255,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
             spec.getTable()
                 .getSerDeProps()
                 .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema);
-            table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+            HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+            table.setSd(HiveMetaStoreUtils.getStorageDescriptor(spec.getTable()));
           }
         }
       } catch ( IOException e) {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index d381d51..967cc2e 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.hive.metastore;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -26,7 +27,9 @@ import java.util.Properties;
 
 import org.apache.avro.SchemaParseException;
 import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -207,7 +211,7 @@ public class HiveMetaStoreUtils {
     return parameters;
   }
 
-  private static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit unit) {
+  public static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit unit) {
     State props = unit.getStorageProps();
     StorageDescriptor sd = new StorageDescriptor();
     sd.setParameters(getParameters(props));
@@ -421,6 +425,34 @@ public class HiveMetaStoreUtils {
     return deserializer;
   }
 
+  /**
+   * Asks the table's serde manager to re-add the serde properties of {@code spec}'s table. Callers invoke this
+   * right after changing the table's schema literal, so that column information is refreshed from it.
+   *
+   * <p>Does nothing unless the table already defines {@code serdeConstants.LIST_COLUMNS} and has a serde manager.
+   * The partition location, when present, is preferred over the table location as the path passed to the manager.
+   *
+   * @param spec the spec whose table serde properties are updated in place
+   * @throws IOException if the serde manager fails, or if neither the partition nor the table has a location
+   */
+  public static void updateColumnsInfoIfNeeded(HiveSpec spec) throws IOException {
+    HiveTable table = spec.getTable();
+    if (!table.getSerDeProps().contains(serdeConstants.LIST_COLUMNS) || !table.getSerDeManager().isPresent()) {
+      return;
+    }
+    String path;
+    if (spec.getPartition().isPresent() && spec.getPartition().get().getLocation().isPresent()) {
+      path = spec.getPartition().get().getLocation().get();
+    } else if (table.getLocation().isPresent()) {
+      path = table.getLocation().get();
+    } else {
+      // Report a descriptive checked error instead of letting Optional.get() fail on an absent location.
+      throw new IOException(String.format("Cannot update columns info for table %s: no location is set",
+          table.getTableName()));
+    }
+    table.getSerDeManager().get().addSerDeProperties(new Path(path), table);
+  }
+
   @VisibleForTesting
   protected static void inVokeDetermineSchemaOrThrowExceptionMethod(Properties props, Configuration conf)
       throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index 5cbb039..eeed3ef 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,9 +35,8 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.TypeDescriptionToObjectInspectorUtil;
 import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -59,6 +59,7 @@ import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.HadoopUtils;
+import org.apache.orc.TypeDescription;
 
 /**
  * A derived class of {@link org.apache.gobblin.hive.HiveSerDeManager} that is mainly responsible for adding schema
@@ -203,7 +204,7 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
       }
       return getSchemaFromLatestFile(files.get(0).getPath(), fs);
     } else {
-       return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, path).getObjectInspector());
+      return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, path).getObjectInspector());
     }
   }
 
@@ -270,24 +271,27 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
    */
   protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
     TypeInfo schema;
-    if(!Strings.isNullOrEmpty(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()))) {
-      try {
-        Schema avroSchema = new Schema.Parser().parse(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
-        schema = TypeInfoUtils.getTypeInfoFromObjectInspector(new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
-      } catch (SerDeException e) {
-        throw new IOException(e);
-      }
-    }  else {
+    // Prefer the schema literal on this unit's serde props over the job-level property, so a schema that callers
+    // set on the unit (e.g. an evolved table schema) is the one the column lists are derived from.
+    String schemaString = hiveUnit.getSerDeProps()
+        .getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+            props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+    if (!Strings.isNullOrEmpty(schemaString)) {
+      Schema avroSchema = new Schema.Parser().parse(schemaString);
+      TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(avroSchema);
+      schema = TypeInfoUtils.getTypeInfoFromObjectInspector(
+          TypeDescriptionToObjectInspectorUtil.getObjectInspector(orcSchema));
+    } else {
       schema = getSchemaFromLatestFile(path, this.fs);
     }
     if (schema instanceof StructTypeInfo) {
       StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
-      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,
-          Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
-      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES,
-          Joiner.on(",").join(
-              structTypeInfo.getAllStructFieldTypeInfos().stream().map(x -> x.getTypeName())
-                  .collect(Collectors.toList())));
+      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
+      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(",")
+          .join(structTypeInfo.getAllStructFieldTypeInfos()
+              .stream()
+              .map(x -> x.getTypeName())
+              .collect(Collectors.toList())));
     } else {
       // Hive always uses a struct with a field for each of the top-level columns as the root object type.
       // So for here we assume to-be-registered ORC files follow this pattern.
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 88adce0..9ca6bb9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -42,11 +42,11 @@ import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.SchemaSource;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.AvroUtils;
 
@@ -197,7 +197,7 @@ public class HiveMetadataWriter implements MetadataWriter {
   }
 
   public void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, String dbName,
-      String tableName, String topicName) throws SchemaRegistryException {
+      String tableName, String topicName) throws IOException {
     String tableKey = tableNameJoiner.join(dbName, tableName);
     for (Collection<HiveSpec> specs : newSpecsMap.values()) {
       for (HiveSpec spec : specs) {
@@ -243,7 +243,7 @@ public class HiveMetadataWriter implements MetadataWriter {
   }
 
   private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec, String topicName, String tableKey)
-      throws SchemaRegistryException {
+      throws IOException {
     if (gmce.getSchemaSource() != SchemaSource.NONE) {
       String newSchemaString =
           spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
@@ -285,6 +285,7 @@ public class HiveMetadataWriter implements MetadataWriter {
           spec.getTable()
               .getSerDeProps()
               .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), sourceSchema);
+          HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
         }
       } catch (IOException e) {
         log.warn(String.format("Cannot get schema from table %s.%s", schemaSourceDb, spec.getTable().getTableName()), e);
@@ -292,9 +293,11 @@ public class HiveMetadataWriter implements MetadataWriter {
       return;
     }
     //Force to set the schema even there is no schema literal defined in the spec
-    spec.getTable()
-        .getSerDeProps()
-        .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+    if (latestSchemaMap.containsKey(tableKey)) {
+      spec.getTable().getSerDeProps()
+          .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+      HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+    }
   }
 
   private String fetchSchemaFromTable(String dbName, String tableName) throws IOException {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java b/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java
new file mode 100644
index 0000000..0b591f1
--- /dev/null
+++ b/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * Builds Hive {@link ObjectInspector}s directly from an ORC {@link TypeDescription}.
+ *
+ * <p>The inspectors produced here operate on Hive's {@code OrcStruct} and {@code OrcUnion} objects and on plain
+ * {@link List} / {@link Map} instances. The class is declared in Hive's {@code org.apache.hadoop.hive.ql.io.orc}
+ * package, presumably so it can reach package-private members of {@code OrcStruct} and {@code OrcUnion}.
+ */
+public class TypeDescriptionToObjectInspectorUtil {
+
+  private TypeDescriptionToObjectInspectorUtil() {
+    // Static utility; not meant to be instantiated.
+  }
+
+  /**
+   * Returns an object inspector for {@code orcSchema}.
+   *
+   * @param orcSchema the ORC schema; it is flattened into ORC protobuf types and inspected from the root (column 0)
+   * @return an inspector for the root type
+   * @throws UnsupportedOperationException if the schema contains a char/varchar without a length or an unknown kind
+   */
+  public static ObjectInspector getObjectInspector(TypeDescription orcSchema) {
+    return createObjectInspector(0, OrcUtils.getOrcTypes(orcSchema));
+  }
+
+  /** A named struct field whose {@code offset} is its position within the parent {@code OrcStruct}. */
+  static class Field implements StructField {
+    private final String name;
+    private final ObjectInspector inspector;
+    private final int offset;
+
+    Field(String name, ObjectInspector inspector, int offset) {
+      this.name = name;
+      this.inspector = inspector;
+      this.offset = offset;
+    }
+
+    @Override
+    public String getFieldName() {
+      return name;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return inspector;
+    }
+
+    @Override
+    public int getFieldID() {
+      return offset;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return null;
+    }
+  }
+
+  /** Inspector for ORC struct types backed by {@code OrcStruct}; field lookup by name is case-insensitive. */
+  static class OrcStructInspector extends SettableStructObjectInspector {
+    private List<StructField> fields;
+
+    OrcStructInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      int fieldCount = type.getSubtypesCount();
+      fields = new ArrayList<>(fieldCount);
+      for(int i=0; i < fieldCount; ++i) {
+        int fieldType = type.getSubtypes(i);
+        fields.add(new Field(type.getFieldNames(i),
+            createObjectInspector(fieldType, types), i));
+      }
+    }
+
+    @Override
+    public List<StructField> getAllStructFieldRefs() {
+      return fields;
+    }
+
+    @Override
+    public StructField getStructFieldRef(String s) {
+      for(StructField field: fields) {
+        if (field.getFieldName().equalsIgnoreCase(s)) {
+          return field;
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public Object getStructFieldData(Object object, StructField field) {
+      if (object == null) {
+        return null;
+      }
+      int offset = ((Field) field).offset;
+      OrcStruct struct = (OrcStruct) object;
+      if (offset >= struct.getNumFields()) {
+        return null;
+      }
+
+      return struct.getFieldValue(offset);
+    }
+
+    @Override
+    public List<Object> getStructFieldsDataAsList(Object object) {
+      if (object == null) {
+        return null;
+      }
+      OrcStruct struct = (OrcStruct) object;
+      List<Object> result = new ArrayList<Object>(struct.getNumFields());
+      for (int i=0; i<struct.getNumFields(); i++) {
+        result.add(struct.getFieldValue(i));
+      }
+      return result;
+    }
+
+    @Override
+    public String getTypeName() {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("struct<");
+      for(int i=0; i < fields.size(); ++i) {
+        StructField field = fields.get(i);
+        if (i != 0) {
+          buffer.append(",");
+        }
+        buffer.append(field.getFieldName());
+        buffer.append(":");
+        buffer.append(field.getFieldObjectInspector().getTypeName());
+      }
+      buffer.append(">");
+      return buffer.toString();
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.STRUCT;
+    }
+
+    @Override
+    public Object create() {
+      return new OrcStruct(0);
+    }
+
+    @Override
+    public Object setStructFieldData(Object struct, StructField field,
+        Object fieldValue) {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      int offset = ((Field) field).offset;
+      // if the offset is bigger than our current number of fields, grow it
+      if (orcStruct.getNumFields() <= offset) {
+        orcStruct.setNumFields(offset+1);
+      }
+      orcStruct.setFieldValue(offset, fieldValue);
+      return struct;
+    }
+
+  }
+
+  /** Inspector for ORC map types backed by {@link Map}; maps created here preserve insertion order. */
+  static class OrcMapObjectInspector
+      implements MapObjectInspector, SettableMapObjectInspector {
+    private ObjectInspector key;
+    private ObjectInspector value;
+
+    OrcMapObjectInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      key = createObjectInspector(type.getSubtypes(0), types);
+      value = createObjectInspector(type.getSubtypes(1), types);
+    }
+
+    @Override
+    public ObjectInspector getMapKeyObjectInspector() {
+      return key;
+    }
+
+    @Override
+    public ObjectInspector getMapValueObjectInspector() {
+      return value;
+    }
+
+    @Override
+    public Object getMapValueElement(Object map, Object key) {
+      return ((map == null || key == null)? null : ((Map) map).get(key));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<Object, Object> getMap(Object map) {
+      if (map == null) {
+        return null;
+      }
+      return (Map) map;
+    }
+
+    @Override
+    public int getMapSize(Object map) {
+      if (map == null) {
+        return -1;
+      }
+      return ((Map) map).size();
+    }
+
+    @Override
+    public String getTypeName() {
+      return "map<" + key.getTypeName() + "," + value.getTypeName() + ">";
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.MAP;
+    }
+
+    @Override
+    public Object create() {
+      return new LinkedHashMap<>();
+    }
+
+    @Override
+    public Object put(Object map, Object key, Object value) {
+      ((Map) map).put(key, value);
+      return map;
+    }
+
+    @Override
+    public Object remove(Object map, Object key) {
+      ((Map) map).remove(key);
+      return map;
+    }
+
+    @Override
+    public Object clear(Object map) {
+      ((Map) map).clear();
+      return map;
+    }
+
+  }
+
+  /** Read-only inspector for ORC union types backed by {@code OrcUnion}. */
+  static class OrcUnionObjectInspector implements UnionObjectInspector {
+    private List<ObjectInspector> children;
+
+    // Leaves the child inspectors unset; presumably kept for reflective instantiation (e.g. serialization) - TODO confirm.
+    protected OrcUnionObjectInspector() {
+      super();
+    }
+    OrcUnionObjectInspector(int columnId,
+        List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      children = new ArrayList<ObjectInspector>(type.getSubtypesCount());
+      for(int i=0; i < type.getSubtypesCount(); ++i) {
+        children.add(createObjectInspector(type.getSubtypes(i),
+            types));
+      }
+    }
+
+    @Override
+    public List<ObjectInspector> getObjectInspectors() {
+      return children;
+    }
+
+    @Override
+    public byte getTag(Object obj) {
+      return ((OrcUnion) obj).getTag();
+    }
+
+    @Override
+    public Object getField(Object obj) {
+      return ((OrcUnion) obj).getObject();
+    }
+
+    @Override
+    public String getTypeName() {
+      StringBuilder builder = new StringBuilder("uniontype<");
+      boolean first = true;
+      for(ObjectInspector child: children) {
+        if (first) {
+          first = false;
+        } else {
+          builder.append(",");
+        }
+        builder.append(child.getTypeName());
+      }
+      builder.append(">");
+      return builder.toString();
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.UNION;
+    }
+
+  }
+
+  /** Inspector for ORC list types backed by {@link List}. */
+  static class OrcListObjectInspector
+      implements ListObjectInspector, SettableListObjectInspector {
+    private ObjectInspector child;
+
+
+    OrcListObjectInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      child = createObjectInspector(type.getSubtypes(0), types);
+    }
+
+    @Override
+    public ObjectInspector getListElementObjectInspector() {
+      return child;
+    }
+
+    @Override
+    public Object getListElement(Object list, int i) {
+      if (list == null || i < 0 || i >= getListLength(list)) {
+        return null;
+      }
+      return ((List) list).get(i);
+    }
+
+    @Override
+    public int getListLength(Object list) {
+      if (list == null) {
+        return -1;
+      }
+      return ((List) list).size();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<?> getList(Object list) {
+      if (list == null) {
+        return null;
+      }
+      return (List) list;
+    }
+
+    @Override
+    public String getTypeName() {
+      return "array<" + child.getTypeName() + ">";
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.LIST;
+    }
+
+    @Override
+    public Object create(int size) {
+      ArrayList<Object> result = new ArrayList<Object>(size);
+      for(int i = 0; i < size; ++i) {
+        result.add(null);
+      }
+      return result;
+    }
+
+    @Override
+    public Object set(Object list, int index, Object element) {
+      List l = (List) list;
+      for(int i=l.size(); i < index+1; ++i) {
+        l.add(null);
+      }
+      l.set(index, element);
+      return list;
+    }
+
+    @Override
+    public Object resize(Object list, int newSize) {
+      // Grow with nulls or truncate from the tail so the list ends up with exactly newSize elements;
+      // ensureCapacity alone would leave the list's size unchanged.
+      List l = (List) list;
+      while (l.size() < newSize) {
+        l.add(null);
+      }
+      while (l.size() > newSize) {
+        l.remove(l.size() - 1);
+      }
+      return list;
+    }
+
+  }
+
+
+  /**
+   * Recursively builds the inspector for the ORC type at {@code columnId} in the flattened type list.
+   *
+   * @param columnId index into {@code types} of the type to inspect
+   * @param types the flattened ORC types, e.g. as produced by {@link OrcUtils#getOrcTypes(TypeDescription)}
+   * @return the inspector for that type; primitive kinds map to Hive's writable primitive inspectors
+   * @throws UnsupportedOperationException for a char/varchar without a maximum length, or an unknown type kind
+   */
+  static ObjectInspector createObjectInspector(int columnId,
+      List<OrcProto.Type> types){
+    OrcProto.Type type = types.get(columnId);
+    switch (type.getKind()) {
+      case FLOAT:
+        return PrimitiveObjectInspectorFactory.writableFloatObjectInspector;
+      case DOUBLE:
+        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+      case BOOLEAN:
+        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
+      case BYTE:
+        return PrimitiveObjectInspectorFactory.writableByteObjectInspector;
+      case SHORT:
+        return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+      case INT:
+        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+      case LONG:
+        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+      case BINARY:
+        return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+      case STRING:
+        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+      case CHAR:
+        if (!type.hasMaximumLength()) {
+          throw new UnsupportedOperationException(
+              "Illegal use of char type without length in ORC type definition.");
+        }
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getCharTypeInfo(type.getMaximumLength()));
+      case VARCHAR:
+        if (!type.hasMaximumLength()) {
+          throw new UnsupportedOperationException(
+              "Illegal use of varchar type without length in ORC type definition.");
+        }
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getVarcharTypeInfo(type.getMaximumLength()));
+      case TIMESTAMP:
+        return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
+      case DATE:
+        return PrimitiveObjectInspectorFactory.writableDateObjectInspector;
+      case DECIMAL:
+        // Fall back to Hive's system default precision/scale when the ORC type does not specify them.
+        int precision = type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
+        int scale =  type.hasScale()? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getDecimalTypeInfo(precision, scale));
+      case STRUCT:
+        return new OrcStructInspector(columnId, types);
+      case UNION:
+        return new OrcUnionObjectInspector(columnId, types);
+      case MAP:
+        return new OrcMapObjectInspector(columnId, types);
+      case LIST:
+        return new OrcListObjectInspector(columnId, types);
+      default:
+        throw new UnsupportedOperationException("Unknown type " +
+            type.getKind());
+    }
+  }
+}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 4a7f566..ec81f2d 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -30,6 +30,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveSerDeManager;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -154,6 +155,10 @@ public abstract class GobblinMCEProducer implements Closeable {
       regProperties.put(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB,
           state.getProp(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB));
     }
+    if (state.contains(HiveSerDeManager.HIVE_ROW_FORMAT)) {
+      regProperties.put(HiveSerDeManager.HIVE_ROW_FORMAT,
+          state.getProp(HiveSerDeManager.HIVE_ROW_FORMAT));
+    }
     if (!regProperties.isEmpty()) {
       gmceBuilder.setRegistrationProperties(regProperties);
     }
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index a6777a3..32b63b4 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index 6c9a190..dba1f2f 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -23,6 +23,7 @@ import java.util.Properties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index 5d1422e..0f10316 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 78f87c5..7c85ccb 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,7 @@ dependencies {
   compile externalDependency.guava
   compile externalDependency.slf4j
   compile externalDependency.avro
+  compile externalDependency.orcCore
   compile externalDependency.hiveMetastore
   compile externalDependency.jodaTime
   compile externalDependency.jacksonCore
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
similarity index 99%
rename from gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
rename to gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
index 2e1b113..1e9a6a0 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.writer;
+package org.apache.gobblin.util.orc;
 
 import java.util.List;
 import java.util.stream.Collectors;
-
 import org.apache.avro.Schema;
 import org.apache.orc.TypeDescription;
 
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
similarity index 98%
rename from gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
rename to gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
index a531047..cb05509 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.writer;
+package org.apache.gobblin.util.orc;
 
+import com.google.common.base.Preconditions;
 import java.util.List;
-
 import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -26,9 +26,7 @@ import org.apache.orc.TypeDescription;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Preconditions;
-
-import static org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+import static org.apache.gobblin.util.orc.AvroOrcSchemaConverter.*;
 
 
 public class AvroOrcSchemaConverterTest {