You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/28 06:51:49 UTC

[GitHub] [hudi] codope commented on a diff in pull request #6741: [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table

codope commented on code in PR #6741:
URL: https://github.com/apache/hudi/pull/6741#discussion_r982009108


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java:
##########
@@ -81,7 +82,7 @@ private HoodieMergedLogRecordScanner getMergedLogRecordScanner() throws IOExcept
         .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
         .withBasePath(split.getBasePath())
         .withLogFilePaths(split.getDeltaLogPaths())
-        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
+        .withReaderSchema(getWriterSchema())

Review Comment:
   why this change? here, we're saying that let's set the reader schema to be the writer schema, but then let's say `usesCustomPayload` is false and `supportPayload` is also false (due to fallback) then `buildGenericRecordwithCustomPayload` will actually want to use reader schema while calling `getInsertValue`. In case of mismatch between reader and writer schema, won't there be an error?
   Should we revert this and set the schema based on conditional as it was before like below?
   `.withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())`



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDateObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.isMetadataField;
+
+/**
+ * Helper class to serialize hive writable type to avro record.
+ */
+public class HiveAvroSerializer {
+
+  private final List<String> columnNames;
+  private final List<TypeInfo> columnTypes;
+  private final ObjectInspector objectInspector;
+
+  public HiveAvroSerializer(ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes) {
+    this.columnNames = columnNames;
+    this.columnTypes = columnTypes;
+    this.objectInspector = objectInspector;
+  }
+
+  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
+
+  public GenericRecord serialize(Object o, Schema schema) {
+
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    GenericData.Record record = new GenericData.Record(schema);
+
+    List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
+    if (outputFieldRefs.size() != columnNames.size()) {
+      throw new HoodieException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size());
+    }
+
+    int size = schema.getFields().size();
+
+    List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
+
+    for (int i  = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      if (i >= columnTypes.size()) {
+        break;
+      }
+      TypeInfo typeInfo = columnTypes.get(i);
+      StructField structFieldRef = allStructFieldRefs.get(i);
+      Object structFieldData = structFieldsDataAsList.get(i);
+      ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+      Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
+      if (val == null) {
+        if (field.defaultVal() instanceof JsonProperties.Null) {
+          record.put(field.name(), null);
+        } else {
+          record.put(field.name(), field.defaultVal());
+        }
+      } else {
+        record.put(field.name(), val);
+      }
+    }
+    return record;
+  }
+
+  /**
+   * Determine if an Avro schema is of type Union[T, NULL].  Avro supports nullable
+   * types via a union of type T and null.  This is a very common use case.
+   * As such, we want to silently convert it to just T and allow the value to be null.
+   *
+   * When a Hive union type is used with AVRO, the schema type becomes
+   * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed
+   *
+   * @return true if type represents Union[T, Null], false otherwise
+   */
+  public static boolean isNullableType(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.UNION)) {
+      return false;
+    }
+
+    List<Schema> itemSchemas = schema.getTypes();
+    if (itemSchemas.size() < 2) {
+      return false;
+    }
+
+    for (Schema itemSchema : itemSchemas) {
+      if (Schema.Type.NULL.equals(itemSchema.getType())) {
+        return true;
+      }
+    }
+
+    // [null, null] not allowed, so this check is ok.
+    return false;
+  }
+
+  /**
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema
+   */
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List<Schema> types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List<Schema> itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
+      if (!Schema.Type.NULL.equals(itemSchema.getType())) {
+        itemSchemas.add(itemSchema);
+      }
+    }
+
+    if (itemSchemas.size() > 1) {
+      return Schema.createUnion(itemSchemas);
+    } else {
+      return itemSchemas.get(0);
+    }
+  }
+
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    if (null == structFieldData) {
+      return null;
+    }
+
+    if (isNullableType(schema)) {
+      schema = getOtherTypeFromNullableType(schema);
+    }
+    /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */
+    if (Schema.Type.ENUM.equals(schema.getType())) {
+      assert fieldOI instanceof PrimitiveObjectInspector;
+      return serializeEnum(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+    }
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        assert fieldOI instanceof PrimitiveObjectInspector;
+        return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+      case MAP:
+        assert fieldOI instanceof MapObjectInspector;
+        assert typeInfo instanceof MapTypeInfo;
+        return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+      case LIST:
+        assert fieldOI instanceof ListObjectInspector;
+        assert typeInfo instanceof ListTypeInfo;
+        return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+      case UNION:
+        assert fieldOI instanceof UnionObjectInspector;
+        assert typeInfo instanceof UnionTypeInfo;
+        return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+      case STRUCT:
+        assert fieldOI instanceof StructObjectInspector;
+        assert typeInfo instanceof StructTypeInfo;
+        return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+      default:
+        throw new HoodieException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+    }
+  }
+
+  /** private cache to avoid lots of EnumSymbol creation while serializing.
+   *  Two levels because the enum symbol is specific to a schema.
+   *  Object because we want to avoid the overhead of repeated toString calls while maintaining compatability.
+   *  Provided there are few enum types per record, and few symbols per enum, memory use should be moderate.
+   *  eg 20 types with 50 symbols each as length-10 Strings should be on the order of 100KB per AvroSerializer.
+   */
+  final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() {
+    @Override
+    protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema,
+                                                                    Set<Schema> seenSchemas) {
+      return new InstanceCache<Object, GenericEnumSymbol>() {
+        @Override
+        protected GenericEnumSymbol makeInstance(Object seed, Set<Object> seenSchemas) {
+          return new GenericData.EnumSymbol(schema, seed.toString());
+        }
+      };
+    }
+  };
+
+  private Object serializeEnum(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    try {
+      return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema));
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) {
+    int size = schema.getFields().size();
+    List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+    GenericData.Record record = new GenericData.Record(schema);
+    ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+
+    for (int i  = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      TypeInfo colTypeInfo = allStructFieldTypeInfos.get(i);
+      StructField structFieldRef = allStructFieldRefs.get(i);
+      Object structFieldData = structFieldsDataAsList.get(i);
+      ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+      Object val = serialize(colTypeInfo, fieldOI, structFieldData, field.schema());
+      if (val == null) {
+        if (field.defaultVal() instanceof JsonProperties.Null) {
+          record.put(field.name(), null);
+        } else {
+          record.put(field.name(), field.defaultVal());
+        }
+      } else {
+        record.put(field.name(), val);
+      }
+    }
+    return record;
+  }
+
+  private Object serializePrimitive(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {

Review Comment:
   Do we need `TypeInfo` argument for primitive type serialization? You're already getting that from the object inspector. If it is not needed, then please remove the argument.



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestHiveAvroSerializer {
+
+  private static final String SIMPLESCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+      + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+      + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+      + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+      + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
+      + "{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}";
+  private static final String NESTSCHEMA = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":\"string\"},"
+      + "{\"name\":\"lastname\",\"type\":\"string\"},"
+      + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+
+  @Test
+  public void testSerialize() {
+    Schema avroSchema = new Schema.Parser().parse(SIMPLESCHEMA);
+    // create a test record with avroSchema
+    GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+    avroRecord.put("id", 1);
+    avroRecord.put("col1", 1000L);
+    avroRecord.put("col2", -5.001f);
+    avroRecord.put("col3", 12.999d);
+    Schema currentDecimalType = avroSchema.getField("col4").schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) currentDecimalType.getLogicalType()).getScale());
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType()));
+    avroRecord.put("col5", "2011-01-01");
+    avroRecord.put("col6", 18987);
+    avroRecord.put("col7", 1640491505000000L);
+    avroRecord.put("col8", false);
+    ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
+    avroRecord.put("col9", bb);
+    assertTrue(GenericData.get().validate(avroSchema, avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema);
+
+    List<Writable> writableList = Arrays.stream(writable.get()).collect(Collectors.toList());
+    writableList.remove(writableList.size() - 1);
+    ArrayWritable clipWritable = new ArrayWritable(writable.getValueClass(), writableList.toArray(new Writable[0]));
+
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
+    List<String> columnNameList = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, avroSchema);
+    assertTrue(GenericData.get().validate(avroSchema, testRecord));
+    // test
+    List<TypeInfo> columnTypeListClip = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
+    List<String> columnNameListClip = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
+    StructTypeInfo rowTypeInfoClip = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
+    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, avroSchema);
+    assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+
+  }
+
+  @Test
+  public void testNestValueSerialize() {

Review Comment:
   ```suggestion
     public void testNestedValueSerialize() {
   ```



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestHiveAvroSerializer {
+
+  private static final String SIMPLESCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+      + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+      + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+      + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+      + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
+      + "{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}";
+  private static final String NESTSCHEMA = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["

Review Comment:
   ```suggestion
     private static final String NESTED_SCHEMA = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
   ```



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java:
##########
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDateObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.isMetadataField;
+
+/**
+ * Helper class to serialize hive writable type to avro record.
+ */
+public class HiveAvroSerializer {
+
+  private final List<String> columnNames;
+  private final List<TypeInfo> columnTypes;
+  private final ObjectInspector objectInspector;
+
+  public HiveAvroSerializer(ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes) {
+    this.columnNames = columnNames;
+    this.columnTypes = columnTypes;
+    this.objectInspector = objectInspector;
+  }
+
+  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
+
+  public GenericRecord serialize(Object o, Schema schema) {
+
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    GenericData.Record record = new GenericData.Record(schema);
+
+    List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
+    if (outputFieldRefs.size() != columnNames.size()) {
+      throw new HoodieException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size());
+    }
+
+    int size = schema.getFields().size();
+
+    List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
+
+    for (int i  = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      if (i >= columnTypes.size()) {
+        break;
+      }
+      TypeInfo typeInfo = columnTypes.get(i);
+      StructField structFieldRef = allStructFieldRefs.get(i);
+      Object structFieldData = structFieldsDataAsList.get(i);
+      ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+      Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
+      if (val == null) {
+        if (field.defaultVal() instanceof JsonProperties.Null) {
+          record.put(field.name(), null);
+        } else {
+          record.put(field.name(), field.defaultVal());
+        }
+      } else {
+        record.put(field.name(), val);
+      }
+    }
+    return record;
+  }
+
+  /**
+   * Determine if an Avro schema is of type Union[T, NULL].  Avro supports nullable
+   * types via a union of type T and null.  This is a very common use case.
+   * As such, we want to silently convert it to just T and allow the value to be null.
+   *
+   * When a Hive union type is used with AVRO, the schema type becomes
+   * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed
+   *
+   * @return true if type represents Union[T, Null], false otherwise
+   */
+  public static boolean isNullableType(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.UNION)) {
+      return false;
+    }
+
+    List<Schema> itemSchemas = schema.getTypes();
+    if (itemSchemas.size() < 2) {
+      return false;
+    }
+
+    for (Schema itemSchema : itemSchemas) {
+      if (Schema.Type.NULL.equals(itemSchema.getType())) {
+        return true;
+      }
+    }
+
+    // [null, null] not allowed, so this check is ok.
+    return false;
+  }
+
+  /**
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema
+   */
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List<Schema> types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List<Schema> itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
+      if (!Schema.Type.NULL.equals(itemSchema.getType())) {
+        itemSchemas.add(itemSchema);
+      }
+    }
+
+    if (itemSchemas.size() > 1) {
+      return Schema.createUnion(itemSchemas);
+    } else {
+      return itemSchemas.get(0);
+    }
+  }
+
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    if (null == structFieldData) {
+      return null;
+    }
+
+    if (isNullableType(schema)) {
+      schema = getOtherTypeFromNullableType(schema);
+    }
+    /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */
+    if (Schema.Type.ENUM.equals(schema.getType())) {
+      assert fieldOI instanceof PrimitiveObjectInspector;
+      return serializeEnum(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+    }
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        assert fieldOI instanceof PrimitiveObjectInspector;
+        return serializePrimitive(typeInfo, (PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+      case MAP:
+        assert fieldOI instanceof MapObjectInspector;
+        assert typeInfo instanceof MapTypeInfo;
+        return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+      case LIST:
+        assert fieldOI instanceof ListObjectInspector;
+        assert typeInfo instanceof ListTypeInfo;
+        return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+      case UNION:
+        assert fieldOI instanceof UnionObjectInspector;
+        assert typeInfo instanceof UnionTypeInfo;
+        return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+      case STRUCT:
+        assert fieldOI instanceof StructObjectInspector;
+        assert typeInfo instanceof StructTypeInfo;
+        return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+      default:
+        throw new HoodieException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+    }
+  }
+
+  /** private cache to avoid lots of EnumSymbol creation while serializing.
+   *  Two levels because the enum symbol is specific to a schema.
+   *  Object because we want to avoid the overhead of repeated toString calls while maintaining compatability.
+   *  Provided there are few enum types per record, and few symbols per enum, memory use should be moderate.
+   *  eg 20 types with 50 symbols each as length-10 Strings should be on the order of 100KB per AvroSerializer.
+   */
+  final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() {
+    @Override
+    protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema,
+                                                                    Set<Schema> seenSchemas) {
+      return new InstanceCache<Object, GenericEnumSymbol>() {
+        @Override
+        protected GenericEnumSymbol makeInstance(Object seed, Set<Object> seenSchemas) {
+          return new GenericData.EnumSymbol(schema, seed.toString());
+        }
+      };
+    }
+  };
+
+  private Object serializeEnum(TypeInfo typeInfo, PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    try {
+      return enums.retrieve(schema).retrieve(serializePrimitive(typeInfo, fieldOI, structFieldData, schema));
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) {
+    int size = schema.getFields().size();
+    List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+    GenericData.Record record = new GenericData.Record(schema);
+    ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+
+    for (int i  = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      TypeInfo colTypeInfo = allStructFieldTypeInfos.get(i);
+      StructField structFieldRef = allStructFieldRefs.get(i);
+      Object structFieldData = structFieldsDataAsList.get(i);
+      ObjectInspector fieldOI = structFieldRef.getFieldObjectInspector();
+
+      Object val = serialize(colTypeInfo, fieldOI, structFieldData, field.schema());
+      if (val == null) {
+        if (field.defaultVal() instanceof JsonProperties.Null) {
+          record.put(field.name(), null);
+        } else {
+          record.put(field.name(), field.defaultVal());
+        }
+      } else {
+        record.put(field.name(), val);
+      }

Review Comment:
   Looks like a duplicate code fragment.. same as in `serialize` method above. Let's extract to a separate method and reuse?



##########
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class TestHiveAvroSerializer {
+
+  private static final String SIMPLESCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""

Review Comment:
   ```suggestion
     private static final String SIMPLE_SCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org