You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/21 07:18:37 UTC

[hudi] branch master updated: [HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new febff4afd2a [HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)
febff4afd2a is described below

commit febff4afd2a71085fdba05dc08e857f8267216d3
Author: komao <ma...@gmail.com>
AuthorDate: Sat Jan 21 15:18:26 2023 +0800

    [HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)
    
    ### Change Logs
    
    Support to read Avro from non-legacy map/list from Parquet log-blocks when using by `SparkRecordMerger`.
    
    Without such conversion when using `SparkRecordMerger` and write to a table with schema that contains list/map type (written in non-legacy format), reading such records back as Avro will fail.
---
 .../hudi/io/storage/HoodieAvroParquetReader.java   |   4 +-
 .../avro/HoodieAvroParquetReaderBuilder.java       |  79 ++++++++++++
 .../apache/parquet/avro/HoodieAvroReadSupport.java | 133 +++++++++++++++++++++
 .../apache/hudi/functional/TestMORDataSource.scala |  58 ++++++++-
 4 files changed, 271 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index aa57b73d0c6..3328707c9ed 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -32,9 +32,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.avro.HoodieAvroParquetReaderBuilder;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.ParquetReader;
 
@@ -165,7 +165,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
       AvroReadSupport.setAvroReadSchema(conf, requestedSchema.get());
       AvroReadSupport.setRequestedProjection(conf, requestedSchema.get());
     }
-    ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
+    ParquetReader<IndexedRecord> reader = new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(conf).build();
     ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
diff --git a/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
new file mode 100644
index 00000000000..d6179ea1aac
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.InputFile;
+
+/**
+ * Copy from org.apache.parquet.avro.AvroParquetReader.Builder.
+ * We use HoodieAvroParquetReaderBuilder to build HoodieAvroReadSupport
+ * that can support reading avro from non-legacy map/list in parquet file.
+ */
+public class HoodieAvroParquetReaderBuilder<T> extends ParquetReader.Builder<T> {
+
+  private GenericData model = null;
+  private boolean enableCompatibility = true;
+  private boolean isReflect = true;
+
+  @Deprecated
+  public HoodieAvroParquetReaderBuilder(Path path) {
+    super(path);
+  }
+
+  public HoodieAvroParquetReaderBuilder(InputFile file) {
+    super(file);
+  }
+
+  public HoodieAvroParquetReaderBuilder<T> withDataModel(GenericData model) {
+    this.model = model;
+
+    // only generic and specific are supported by AvroIndexedRecordConverter
+    if (model.getClass() != GenericData.class
+        && model.getClass() != SpecificData.class) {
+      isReflect = true;
+    }
+
+    return this;
+  }
+
+  public HoodieAvroParquetReaderBuilder<T> disableCompatibility() {
+    this.enableCompatibility = false;
+    return this;
+  }
+
+  public HoodieAvroParquetReaderBuilder<T> withCompatibility(boolean enableCompatibility) {
+    this.enableCompatibility = enableCompatibility;
+    return this;
+  }
+
+  @Override
+  protected ReadSupport<T> getReadSupport() {
+    if (isReflect) {
+      conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+    } else {
+      conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
+    }
+    return new HoodieAvroReadSupport<>(model);
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
new file mode 100644
index 00000000000..326accb66b2
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Support to avro-record read parquet-log which written by spark-record.
+ * See the examples in TestMORDataSource#testRecordTypeCompatibilityWithParquetLog.
+ * Exception throw when schema end with map or list.
+ */
+public class HoodieAvroReadSupport<T> extends AvroReadSupport<T> {
+
+  public HoodieAvroReadSupport(GenericData model) {
+    super(model);
+  }
+
+  public HoodieAvroReadSupport() {
+  }
+
+  @Override
+  public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+    boolean legacyMode = checkLegacyMode(fileSchema.getFields());
+    // support non-legacy list
+    if (!legacyMode && configuration.get(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE) == null) {
+      configuration.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
+          "false", "support reading avro from non-legacy map/list in parquet file");
+    }
+    ReadContext readContext = super.init(configuration, keyValueMetaData, fileSchema);
+    MessageType requestedSchema = readContext.getRequestedSchema();
+    // support non-legacy map. Convert non-legacy map to legacy map
+    // Because there is no AvroWriteSupport.WRITE_OLD_MAP_STRUCTURE
+    // according to AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE
+    if (!legacyMode) {
+      requestedSchema = new MessageType(requestedSchema.getName(), convertLegacyMap(requestedSchema.getFields()));
+    }
+    return new ReadContext(requestedSchema, readContext.getReadSupportMetadata());
+  }
+
+  /**
+   * Check whether write map/list with legacy mode.
+   * legacy:
+   *  list:
+   *    optional group obj_ids (LIST) {
+   *      repeated binary array (UTF8);
+   *    }
+   *  map:
+   *    optional group obj_ids (MAP) {
+   *      repeated group map (MAP_KEY_VALUE) {
+   *          required binary key (UTF8);
+   *          required binary value (UTF8);
+   *      }
+   *    }
+   * non-legacy:
+   *    optional group obj_ids (LIST) {
+   *      repeated group list {
+   *        optional binary element (UTF8);
+   *      }
+   *    }
+   *    optional group obj_maps (MAP) {
+   *      repeated group key_value {
+   *        required binary key (UTF8);
+   *        optional binary value (UTF8);
+   *      }
+   *    }
+   */
+  private boolean checkLegacyMode(List<Type> parquetFields) {
+    for (Type type : parquetFields) {
+      if (!type.isPrimitive()) {
+        GroupType groupType = type.asGroupType();
+        OriginalType originalType = groupType.getOriginalType();
+        if (originalType == OriginalType.MAP
+            && groupType.getFields().get(0).getOriginalType() != OriginalType.MAP_KEY_VALUE) {
+          return false;
+        }
+        if (originalType == OriginalType.LIST
+            && !groupType.getType(0).getName().equals("array")) {
+          return false;
+        }
+        if (!checkLegacyMode(groupType.getFields())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Convert non-legacy map to legacy map.
+   */
+  private List<Type> convertLegacyMap(List<Type> oldTypes) {
+    List<Type> newTypes = new ArrayList<>(oldTypes.size());
+    for (Type type : oldTypes) {
+      if (!type.isPrimitive()) {
+        GroupType parent = type.asGroupType();
+        List<Type> types = convertLegacyMap(parent.getFields());
+        if (type.getOriginalType() == OriginalType.MAP_KEY_VALUE) {
+          newTypes.add(new GroupType(parent.getRepetition(), "key_value", types));
+        } else {
+          newTypes.add(new GroupType(parent.getRepetition(), parent.getName(), parent.getOriginalType(), types));
+        }
+      } else {
+        newTypes.add(type);
+      }
+    }
+    return newTypes;
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 184cb97f2be..513ab45c16a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -1243,6 +1243,62 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
       roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
   }
 
+  @ParameterizedTest
+  @CsvSource(Array(
+    "AVRO, AVRO, END_MAP",
+    "AVRO, SPARK, END_MAP",
+    "SPARK, AVRO, END_MAP",
+    "AVRO, AVRO, END_ARRAY",
+    "AVRO, SPARK, END_ARRAY",
+    "SPARK, AVRO, END_ARRAY"))
+  def testRecordTypeCompatibilityWithParquetLog(readType: HoodieRecordType,
+                                                writeType: HoodieRecordType,
+                                                transformMode: String): Unit = {
+    def transform(sourceDF: DataFrame, transformed: String): DataFrame = {
+      transformed match {
+        case "END_MAP" => sourceDF
+          .withColumn("obj_ids", array(lit("wk_tenant_id")))
+          .withColumn("obj_maps", map(lit("wk_tenant_id"), col("obj_ids")))
+        case "END_ARRAY" => sourceDF
+          .withColumn("obj_maps", map(lit("wk_tenant_id"), lit("wk_tenant_id")))
+          .withColumn("obj_ids", array(col("obj_maps")))
+      }
+    }
+
+    var (_, readOpts) = getWriterReaderOpts(readType)
+    var (writeOpts, _) = getWriterReaderOpts(writeType)
+    readOpts = readOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet")
+    writeOpts = writeOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet")
+    val records = dataGen.generateInserts("001", 10)
+
+    // End with array
+    val inputDF1 = transform(spark.read.json(
+      spark.sparkContext.parallelize(recordsToStrings(records).asScala, 2))
+      .withColumn("wk_tenant_id", lit("wk_tenant_id"))
+      .withColumn("ref_id", lit("wk_tenant_id")), transformMode)
+    inputDF1.write.format("org.apache.hudi")
+      .options(writeOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      // CanIndexLogFile=true
+      .option(HoodieIndexConfig.INDEX_TYPE_PROP, IndexType.INMEMORY.name())
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    val snapshotDF1 = spark.read.format("org.apache.hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/*/*/*/*")
+
+    def sort(df: DataFrame): DataFrame = df.sort("_row_key")
+
+    val inputRows = sort(inputDF1).collectAsList()
+    val readRows = sort(snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)).collectAsList()
+
+    assertEquals(inputRows, readRows)
+  }
+
   def getWriterReaderOpts(recordType: HoodieRecordType,
                           opt: Map[String, String] = commonOpts,
                           enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):