You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/08/18 16:50:30 UTC
[gobblin] branch master updated: [GOBBLIN-1515]Fix hive schema not
updated issue in streaming and GMIP (#3364)
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new daffe0c [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP (#3364)
daffe0c is described below
commit daffe0c2d2c01ef9ca74344593d5c09db50d537a
Author: Zihan Li <zi...@linkedin.com>
AuthorDate: Wed Aug 18 09:50:25 2021 -0700
[GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP (#3364)
* [GOBBLIN-1515]Fix hive schema not updated issue in streaming and GMIP
* address comments
---
gobblin-hive-registration/build.gradle | 1 +
.../hive/metastore/HiveMetaStoreBasedRegister.java | 6 +-
.../gobblin/hive/metastore/HiveMetaStoreUtils.java | 19 +-
.../gobblin/hive/orc/HiveOrcSerDeManager.java | 37 +-
.../gobblin/hive/writer/HiveMetadataWriter.java | 15 +-
.../orc/TypeDescriptionToObjectInspectorUtil.java | 443 +++++++++++++++++++++
.../apache/gobblin/iceberg/GobblinMCEProducer.java | 5 +
.../writer/GenericRecordToOrcValueWriter.java | 1 +
.../apache/gobblin/writer/GobblinOrcWriter.java | 1 +
.../writer/GenericRecordToOrcValueWriterTest.java | 1 +
gobblin-utility/build.gradle | 1 +
.../gobblin/util/orc}/AvroOrcSchemaConverter.java | 3 +-
.../util/orc}/AvroOrcSchemaConverterTest.java | 8 +-
13 files changed, 508 insertions(+), 33 deletions(-)
diff --git a/gobblin-hive-registration/build.gradle b/gobblin-hive-registration/build.gradle
index feec628..c136ea1 100644
--- a/gobblin-hive-registration/build.gradle
+++ b/gobblin-hive-registration/build.gradle
@@ -24,6 +24,7 @@ dependencies {
compile project(":gobblin-utility")
compile externalDependency.avro
+ compile externalDependency.orcCore
compile externalDependency.datanucleusCore
compile externalDependency.datanucleusRdbms
compile externalDependency.guava
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index d5b2393..3d61634 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -238,7 +238,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
spec.getTable()
.getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema);
- table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+ HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+ table.setSd(HiveMetaStoreUtils.getStorageDescriptor(spec.getTable()));
return;
}
Schema writerSchema = new Schema.Parser().parse((
@@ -254,7 +255,8 @@ public class HiveMetaStoreBasedRegister extends HiveRegister {
spec.getTable()
.getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema);
- table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+ HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+ table.setSd(HiveMetaStoreUtils.getStorageDescriptor(spec.getTable()));
}
}
} catch ( IOException e) {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index d381d51..967cc2e 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.hive.metastore;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -26,7 +27,9 @@ import java.util.Properties;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang.reflect.MethodUtils;
+import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -207,7 +211,7 @@ public class HiveMetaStoreUtils {
return parameters;
}
- private static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit unit) {
+ public static StorageDescriptor getStorageDescriptor(HiveRegistrationUnit unit) {
State props = unit.getStorageProps();
StorageDescriptor sd = new StorageDescriptor();
sd.setParameters(getParameters(props));
@@ -421,6 +425,19 @@ public class HiveMetaStoreUtils {
return deserializer;
}
+  /**
+   * Re-derives the serde column properties of the spec's table after its schema literal changed, so
+   * the registered column metadata follows the new schema.
+   *
+   * <p>Nothing happens unless the table's serde properties already contain
+   * {@link serdeConstants#LIST_COLUMNS} (as written, for example, by HiveOrcSerDeManager) and the
+   * table has a serde manager. The serde manager is handed the partition location when one is
+   * present, otherwise the table location.
+   *
+   * @param spec the Hive spec whose table serde properties are refreshed in place
+   * @throws IOException if neither the partition nor the table has a location, or if the serde
+   *     manager fails while computing the properties
+   */
+  public static void updateColumnsInfoIfNeeded(HiveSpec spec) throws IOException {
+    HiveTable table = spec.getTable();
+    // Only serdes that track an explicit column list need their columns recomputed.
+    if (!table.getSerDeProps().contains(serdeConstants.LIST_COLUMNS) || !table.getSerDeManager().isPresent()) {
+      return;
+    }
+    String location;
+    if (spec.getPartition().isPresent() && spec.getPartition().get().getLocation().isPresent()) {
+      location = spec.getPartition().get().getLocation().get();
+    } else if (table.getLocation().isPresent()) {
+      location = table.getLocation().get();
+    } else {
+      // Report a checked IOException (which callers already handle) instead of letting an absent
+      // Optional escape as an unchecked IllegalStateException.
+      throw new IOException("Cannot update columns info for table " + table.getTableName()
+          + ": neither the partition nor the table has a location");
+    }
+    table.getSerDeManager().get().addSerDeProperties(new Path(location), table);
+  }
+
@VisibleForTesting
protected static void inVokeDetermineSchemaOrThrowExceptionMethod(Properties props, Configuration conf)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index 5cbb039..eeed3ef 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -34,9 +35,8 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.TypeDescriptionToObjectInspectorUtil;
import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -59,6 +59,7 @@ import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.orc.TypeDescription;
/**
* A derived class of {@link org.apache.gobblin.hive.HiveSerDeManager} that is mainly responsible for adding schema
@@ -203,7 +204,7 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
}
return getSchemaFromLatestFile(files.get(0).getPath(), fs);
} else {
- return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, path).getObjectInspector());
+ return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, path).getObjectInspector());
}
}
@@ -270,24 +271,26 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
*/
protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
TypeInfo schema;
- if(!Strings.isNullOrEmpty(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()))) {
- try {
- Schema avroSchema = new Schema.Parser().parse(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
- schema = TypeInfoUtils.getTypeInfoFromObjectInspector(new AvroObjectInspectorGenerator(avroSchema).getObjectInspector());
- } catch (SerDeException e) {
- throw new IOException(e);
- }
- } else {
+ String schemaString = hiveUnit.getSerDeProps()
+ .getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+ props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ if (!Strings.isNullOrEmpty(schemaString)) {
+ Schema avroSchema =
+ new Schema.Parser().parse(props.getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(avroSchema);
+ schema = TypeInfoUtils.getTypeInfoFromObjectInspector(
+ TypeDescriptionToObjectInspectorUtil.getObjectInspector(orcSchema));
+ } else {
schema = getSchemaFromLatestFile(path, this.fs);
}
if (schema instanceof StructTypeInfo) {
StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
- hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,
- Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
- hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES,
- Joiner.on(",").join(
- structTypeInfo.getAllStructFieldTypeInfos().stream().map(x -> x.getTypeName())
- .collect(Collectors.toList())));
+ hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS, Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
+ hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES, Joiner.on(",")
+ .join(structTypeInfo.getAllStructFieldTypeInfos()
+ .stream()
+ .map(x -> x.getTypeName())
+ .collect(Collectors.toList())));
} else {
// Hive always uses a struct with a field for each of the top-level columns as the root object type.
// So for here we assume to-be-registered ORC files follow this pattern.
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 88adce0..9ca6bb9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -42,11 +42,11 @@ import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
@@ -197,7 +197,7 @@ public class HiveMetadataWriter implements MetadataWriter {
}
public void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSpec>> newSpecsMap, String dbName,
- String tableName, String topicName) throws SchemaRegistryException {
+ String tableName, String topicName) throws IOException {
String tableKey = tableNameJoiner.join(dbName, tableName);
for (Collection<HiveSpec> specs : newSpecsMap.values()) {
for (HiveSpec spec : specs) {
@@ -243,7 +243,7 @@ public class HiveMetadataWriter implements MetadataWriter {
}
private void schemaUpdateHelper(GobblinMetadataChangeEvent gmce, HiveSpec spec, String topicName, String tableKey)
- throws SchemaRegistryException {
+ throws IOException {
if (gmce.getSchemaSource() != SchemaSource.NONE) {
String newSchemaString =
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
@@ -285,6 +285,7 @@ public class HiveMetadataWriter implements MetadataWriter {
spec.getTable()
.getSerDeProps()
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), sourceSchema);
+ HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
}
} catch (IOException e) {
log.warn(String.format("Cannot get schema from table %s.%s", schemaSourceDb, spec.getTable().getTableName()), e);
@@ -292,9 +293,11 @@ public class HiveMetadataWriter implements MetadataWriter {
return;
}
//Force to set the schema even there is no schema literal defined in the spec
- spec.getTable()
- .getSerDeProps()
- .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+ if (latestSchemaMap.containsKey(tableKey)) {
+ spec.getTable().getSerDeProps()
+ .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+ HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+ }
}
private String fetchSchemaFromTable(String dbName, String tableName) throws IOException {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java b/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java
new file mode 100644
index 0000000..0b591f1
--- /dev/null
+++ b/gobblin-hive-registration/src/main/java/org/apache/hadoop/hive/ql/io/orc/TypeDescriptionToObjectInspectorUtil.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.orc.OrcProto;
+import org.apache.orc.OrcUtils;
+import org.apache.orc.TypeDescription;
+
+
+/**
+ * Builds a Hive {@link ObjectInspector} tree directly from an ORC {@link TypeDescription}.
+ *
+ * <p>Struct, map, list and union inspectors are implemented as nested classes here; primitive
+ * types are served by the writable inspectors of {@link PrimitiveObjectInspectorFactory}. The
+ * nested inspectors appear to mirror the ones inside Hive's {@code OrcStruct}
+ * (NOTE(review): confirm against the Hive version in use). The class lives in Hive's
+ * {@code org.apache.hadoop.hive.ql.io.orc} package, presumably so the struct inspector can call
+ * package-private {@link OrcStruct} accessors — TODO confirm.
+ */
+public class TypeDescriptionToObjectInspectorUtil {
+
+  /**
+   * Returns an inspector for the given ORC schema.
+   *
+   * @param orcSchema the ORC type to inspect; it is flattened with
+   *     {@link OrcUtils#getOrcTypes(TypeDescription)} and inspected from column id 0
+   * @return an inspector whose category matches the root ORC type
+   * @throws UnsupportedOperationException for ORC kinds not mapped here, or for char/varchar
+   *     types without a maximum length
+   */
+  public static ObjectInspector getObjectInspector(TypeDescription orcSchema) {
+    return createObjectInspector(0, OrcUtils.getOrcTypes(orcSchema));
+  }
+
+  /** A struct field described by its name, its inspector and its positional offset. */
+  static class Field implements StructField {
+    private final String name;
+    private final ObjectInspector inspector;
+    private final int offset;
+
+    Field(String name, ObjectInspector inspector, int offset) {
+      this.name = name;
+      this.inspector = inspector;
+      this.offset = offset;
+    }
+
+    @Override
+    public String getFieldName() {
+      return name;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return inspector;
+    }
+
+    @Override
+    public int getFieldID() {
+      return offset;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return null;
+    }
+  }
+
+  /** Settable struct inspector over {@link OrcStruct} values. */
+  static class OrcStructInspector extends SettableStructObjectInspector {
+    private final List<StructField> fields;
+
+    /**
+     * @param columnId index of the struct type within {@code types}
+     * @param types flattened ORC type list; child types are resolved by subtype id
+     */
+    OrcStructInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      int fieldCount = type.getSubtypesCount();
+      fields = new ArrayList<>(fieldCount);
+      for (int i = 0; i < fieldCount; ++i) {
+        int fieldType = type.getSubtypes(i);
+        fields.add(new Field(type.getFieldNames(i),
+            createObjectInspector(fieldType, types), i));
+      }
+    }
+
+    @Override
+    public List<StructField> getAllStructFieldRefs() {
+      return fields;
+    }
+
+    /** Looks up a field by name, ignoring case; returns {@code null} when no field matches. */
+    @Override
+    public StructField getStructFieldRef(String s) {
+      for (StructField field : fields) {
+        if (field.getFieldName().equalsIgnoreCase(s)) {
+          return field;
+        }
+      }
+      return null;
+    }
+
+    @Override
+    public Object getStructFieldData(Object object, StructField field) {
+      if (object == null) {
+        return null;
+      }
+      int offset = ((Field) field).offset;
+      OrcStruct struct = (OrcStruct) object;
+      // A struct holding fewer fields than the schema reports the missing ones as null.
+      if (offset >= struct.getNumFields()) {
+        return null;
+      }
+
+      return struct.getFieldValue(offset);
+    }
+
+    @Override
+    public List<Object> getStructFieldsDataAsList(Object object) {
+      if (object == null) {
+        return null;
+      }
+      OrcStruct struct = (OrcStruct) object;
+      List<Object> result = new ArrayList<Object>(struct.getNumFields());
+      for (int i = 0; i < struct.getNumFields(); i++) {
+        result.add(struct.getFieldValue(i));
+      }
+      return result;
+    }
+
+    /** Renders {@code struct<name:type,...>} from the field names and child type names. */
+    @Override
+    public String getTypeName() {
+      StringBuilder buffer = new StringBuilder();
+      buffer.append("struct<");
+      for (int i = 0; i < fields.size(); ++i) {
+        StructField field = fields.get(i);
+        if (i != 0) {
+          buffer.append(",");
+        }
+        buffer.append(field.getFieldName());
+        buffer.append(":");
+        buffer.append(field.getFieldObjectInspector().getTypeName());
+      }
+      buffer.append(">");
+      return buffer.toString();
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.STRUCT;
+    }
+
+    /** Creates an empty struct; {@link #setStructFieldData} grows it on demand. */
+    @Override
+    public Object create() {
+      return new OrcStruct(0);
+    }
+
+    @Override
+    public Object setStructFieldData(Object struct, StructField field,
+        Object fieldValue) {
+      OrcStruct orcStruct = (OrcStruct) struct;
+      int offset = ((Field) field).offset;
+      // if the offset is bigger than our current number of fields, grow it
+      if (orcStruct.getNumFields() <= offset) {
+        orcStruct.setNumFields(offset + 1);
+      }
+      orcStruct.setFieldValue(offset, fieldValue);
+      return struct;
+    }
+
+  }
+
+  /** Settable map inspector over {@link Map} values; {@link #create()} yields a LinkedHashMap. */
+  static class OrcMapObjectInspector
+      implements MapObjectInspector, SettableMapObjectInspector {
+    private final ObjectInspector key;
+    private final ObjectInspector value;
+
+    /** Builds key and value inspectors from subtypes 0 and 1 of the map type. */
+    OrcMapObjectInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      key = createObjectInspector(type.getSubtypes(0), types);
+      value = createObjectInspector(type.getSubtypes(1), types);
+    }
+
+    @Override
+    public ObjectInspector getMapKeyObjectInspector() {
+      return key;
+    }
+
+    @Override
+    public ObjectInspector getMapValueObjectInspector() {
+      return value;
+    }
+
+    @Override
+    public Object getMapValueElement(Object map, Object key) {
+      return ((map == null || key == null) ? null : ((Map) map).get(key));
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<Object, Object> getMap(Object map) {
+      if (map == null) {
+        return null;
+      }
+      return (Map) map;
+    }
+
+    /** Returns the map's size, or -1 for a null map. */
+    @Override
+    public int getMapSize(Object map) {
+      if (map == null) {
+        return -1;
+      }
+      return ((Map) map).size();
+    }
+
+    @Override
+    public String getTypeName() {
+      return "map<" + key.getTypeName() + "," + value.getTypeName() + ">";
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.MAP;
+    }
+
+    @Override
+    public Object create() {
+      return new LinkedHashMap<>();
+    }
+
+    @Override
+    public Object put(Object map, Object key, Object value) {
+      ((Map) map).put(key, value);
+      return map;
+    }
+
+    @Override
+    public Object remove(Object map, Object key) {
+      ((Map) map).remove(key);
+      return map;
+    }
+
+    @Override
+    public Object clear(Object map) {
+      ((Map) map).clear();
+      return map;
+    }
+
+  }
+
+  /** Union inspector over {@link OrcUnion} values. */
+  static class OrcUnionObjectInspector implements UnionObjectInspector {
+    // Not final: the protected no-arg constructor leaves it unset (null).
+    private List<ObjectInspector> children;
+
+    protected OrcUnionObjectInspector() {
+      super();
+    }
+
+    /** Builds one child inspector per subtype of the union type, in subtype order. */
+    OrcUnionObjectInspector(int columnId,
+        List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      children = new ArrayList<ObjectInspector>(type.getSubtypesCount());
+      for (int i = 0; i < type.getSubtypesCount(); ++i) {
+        children.add(createObjectInspector(type.getSubtypes(i),
+            types));
+      }
+    }
+
+    @Override
+    public List<ObjectInspector> getObjectInspectors() {
+      return children;
+    }
+
+    @Override
+    public byte getTag(Object obj) {
+      return ((OrcUnion) obj).getTag();
+    }
+
+    @Override
+    public Object getField(Object obj) {
+      return ((OrcUnion) obj).getObject();
+    }
+
+    @Override
+    public String getTypeName() {
+      StringBuilder builder = new StringBuilder("uniontype<");
+      boolean first = true;
+      for (ObjectInspector child : children) {
+        if (first) {
+          first = false;
+        } else {
+          builder.append(",");
+        }
+        builder.append(child.getTypeName());
+      }
+      builder.append(">");
+      return builder.toString();
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.UNION;
+    }
+
+  }
+
+  /** Settable list inspector over {@link List} values; {@link #create(int)} yields an ArrayList. */
+  static class OrcListObjectInspector
+      implements ListObjectInspector, SettableListObjectInspector {
+    private final ObjectInspector child;
+
+    /** Builds the element inspector from subtype 0 of the list type. */
+    OrcListObjectInspector(int columnId, List<OrcProto.Type> types) {
+      OrcProto.Type type = types.get(columnId);
+      child = createObjectInspector(type.getSubtypes(0), types);
+    }
+
+    @Override
+    public ObjectInspector getListElementObjectInspector() {
+      return child;
+    }
+
+    /** Returns element {@code i}, or {@code null} for a null list or an out-of-range index. */
+    @Override
+    public Object getListElement(Object list, int i) {
+      if (list == null || i < 0 || i >= getListLength(list)) {
+        return null;
+      }
+      return ((List) list).get(i);
+    }
+
+    /** Returns the list's size, or -1 for a null list. */
+    @Override
+    public int getListLength(Object list) {
+      if (list == null) {
+        return -1;
+      }
+      return ((List) list).size();
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public List<?> getList(Object list) {
+      if (list == null) {
+        return null;
+      }
+      return (List) list;
+    }
+
+    @Override
+    public String getTypeName() {
+      return "array<" + child.getTypeName() + ">";
+    }
+
+    @Override
+    public Category getCategory() {
+      return Category.LIST;
+    }
+
+    /** Creates a mutable list of {@code size} nulls. */
+    @Override
+    public Object create(int size) {
+      ArrayList<Object> result = new ArrayList<Object>(size);
+      for (int i = 0; i < size; ++i) {
+        result.add(null);
+      }
+      return result;
+    }
+
+    /** Sets element {@code index}, padding the list with nulls first if it is too short. */
+    @Override
+    public Object set(Object list, int index, Object element) {
+      List l = (List) list;
+      for (int i = l.size(); i < index + 1; ++i) {
+        l.add(null);
+      }
+      l.set(index, element);
+      return list;
+    }
+
+    /**
+     * Resizes {@code list} in place so that its size becomes exactly {@code newSize}. Growing pads
+     * with {@code null}s; shrinking drops trailing elements.
+     *
+     * @throws IllegalArgumentException if {@code newSize} is negative
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public Object resize(Object list, int newSize) {
+      if (newSize < 0) {
+        throw new IllegalArgumentException("newSize must be non-negative: " + newSize);
+      }
+      // ArrayList.ensureCapacity alone leaves size() unchanged, so a reused, longer list would keep
+      // stale trailing elements; adjust the logical size instead.
+      List<Object> elements = (List<Object>) list;
+      if (elements.size() > newSize) {
+        elements.subList(newSize, elements.size()).clear();
+      }
+      while (elements.size() < newSize) {
+        elements.add(null);
+      }
+      return list;
+    }
+
+  }
+
+  /**
+   * Creates the inspector for the ORC type at {@code columnId}, recursing into child types.
+   *
+   * <p>Decimal types without an explicit precision or scale fall back to
+   * {@link HiveDecimal#SYSTEM_DEFAULT_PRECISION} / {@link HiveDecimal#SYSTEM_DEFAULT_SCALE}.
+   *
+   * @param columnId index into {@code types} of the type to inspect
+   * @param types flattened ORC type list
+   * @throws UnsupportedOperationException for unmapped kinds, or char/varchar without a length
+   */
+  static ObjectInspector createObjectInspector(int columnId,
+      List<OrcProto.Type> types) {
+    OrcProto.Type type = types.get(columnId);
+    switch (type.getKind()) {
+      case FLOAT:
+        return PrimitiveObjectInspectorFactory.writableFloatObjectInspector;
+      case DOUBLE:
+        return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+      case BOOLEAN:
+        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
+      case BYTE:
+        return PrimitiveObjectInspectorFactory.writableByteObjectInspector;
+      case SHORT:
+        return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+      case INT:
+        return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+      case LONG:
+        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+      case BINARY:
+        return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
+      case STRING:
+        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+      case CHAR:
+        if (!type.hasMaximumLength()) {
+          throw new UnsupportedOperationException(
+              "Illegal use of char type without length in ORC type definition.");
+        }
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getCharTypeInfo(type.getMaximumLength()));
+      case VARCHAR:
+        if (!type.hasMaximumLength()) {
+          throw new UnsupportedOperationException(
+              "Illegal use of varchar type without length in ORC type definition.");
+        }
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getVarcharTypeInfo(type.getMaximumLength()));
+      case TIMESTAMP:
+        return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
+      case DATE:
+        return PrimitiveObjectInspectorFactory.writableDateObjectInspector;
+      case DECIMAL:
+        int precision = type.hasPrecision() ? type.getPrecision() : HiveDecimal.SYSTEM_DEFAULT_PRECISION;
+        int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
+        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+            TypeInfoFactory.getDecimalTypeInfo(precision, scale));
+      case STRUCT:
+        return new OrcStructInspector(columnId, types);
+      case UNION:
+        return new OrcUnionObjectInspector(columnId, types);
+      case MAP:
+        return new OrcMapObjectInspector(columnId, types);
+      case LIST:
+        return new OrcListObjectInspector(columnId, types);
+      default:
+        throw new UnsupportedOperationException("Unknown type " +
+            type.getKind());
+    }
+  }
+}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 4a7f566..ec81f2d 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -30,6 +30,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveSerDeManager;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicyBase;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
@@ -154,6 +155,10 @@ public abstract class GobblinMCEProducer implements Closeable {
regProperties.put(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB,
state.getProp(HiveMetaStoreBasedRegister.SCHEMA_SOURCE_DB));
}
+ if (state.contains(HiveSerDeManager.HIVE_ROW_FORMAT)) {
+ regProperties.put(HiveSerDeManager.HIVE_ROW_FORMAT,
+ state.getProp(HiveSerDeManager.HIVE_ROW_FORMAT));
+ }
if (!regProperties.isEmpty()) {
gmceBuilder.setRegistrationProperties(regProperties);
}
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
index a6777a3..32b63b4 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java
@@ -31,6 +31,7 @@ import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
index 6c9a190..dba1f2f 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
+++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriter.java
@@ -23,6 +23,7 @@ import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.avro.AvroObjectInspectorGenerator;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
index 5d1422e..0f10316 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
+++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java
@@ -27,6 +27,7 @@ import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/gobblin-utility/build.gradle b/gobblin-utility/build.gradle
index 78f87c5..7c85ccb 100644
--- a/gobblin-utility/build.gradle
+++ b/gobblin-utility/build.gradle
@@ -30,6 +30,7 @@ dependencies {
compile externalDependency.guava
compile externalDependency.slf4j
compile externalDependency.avro
+ compile externalDependency.orcCore
compile externalDependency.hiveMetastore
compile externalDependency.jodaTime
compile externalDependency.jacksonCore
diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
similarity index 99%
rename from gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
rename to gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
index 2e1b113..1e9a6a0 100644
--- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/AvroOrcSchemaConverter.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverter.java
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.writer;
+package org.apache.gobblin.util.orc;
import java.util.List;
import java.util.stream.Collectors;
-
import org.apache.avro.Schema;
import org.apache.orc.TypeDescription;
diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
similarity index 98%
rename from gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
rename to gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
index a531047..cb05509 100644
--- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/AvroOrcSchemaConverterTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/orc/AvroOrcSchemaConverterTest.java
@@ -15,10 +15,10 @@
* limitations under the License.
*/
-package org.apache.gobblin.writer;
+package org.apache.gobblin.util.orc;
+import com.google.common.base.Preconditions;
import java.util.List;
-
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -26,9 +26,7 @@ import org.apache.orc.TypeDescription;
import org.testng.Assert;
import org.testng.annotations.Test;
-import com.google.common.base.Preconditions;
-
-import static org.apache.gobblin.writer.AvroOrcSchemaConverter.sanitizeNullableSchema;
+import static org.apache.gobblin.util.orc.AvroOrcSchemaConverter.*;
public class AvroOrcSchemaConverterTest {