Posted to commits@hudi.apache.org by ak...@apache.org on 2023/01/21 07:18:37 UTC
[hudi] branch master updated: [HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)
This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new febff4afd2a [HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)
febff4afd2a is described below
commit febff4afd2a71085fdba05dc08e857f8267216d3
Author: komao <ma...@gmail.com>
AuthorDate: Sat Jan 21 15:18:26 2023 +0800
[HUDI-5417] support to read avro from non-legacy map/list in parquet log (#7512)
### Change Logs
Support reading Avro records from non-legacy map/list types in Parquet log blocks when using `SparkRecordMerger`.
Without this conversion, when `SparkRecordMerger` is used to write to a table whose schema contains a list/map type (written in the non-legacy format), reading those records back as Avro will fail.
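For context, here is a minimal sketch (not part of this commit; the file path and schema are hypothetical) showing how parquet-avro produces the non-legacy three-level list layout that the plain `AvroParquetReader` path could not read back as Avro:

```java
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;

public class NonLegacyListWriteSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical record schema ending with a list column, as in the failing case.
    Schema schema = SchemaBuilder.record("rec").fields()
        .name("obj_ids").type().array().items().stringType().noDefault()
        .endRecord();

    // Disabling the old list structure makes parquet-avro emit the non-legacy
    // three-level list layout (repeated group "list" / optional "element").
    Configuration conf = new Configuration();
    conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false);

    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
        .<GenericRecord>builder(new Path("/tmp/non_legacy_list.parquet"))
        .withSchema(schema)
        .withConf(conf)
        .build()) {
      GenericRecord rec = new GenericData.Record(schema);
      rec.put("obj_ids", Collections.singletonList("wk_tenant_id"));
      writer.write(rec);
    }
  }
}
```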
---
.../hudi/io/storage/HoodieAvroParquetReader.java | 4 +-
.../avro/HoodieAvroParquetReaderBuilder.java | 79 ++++++++++++
.../apache/parquet/avro/HoodieAvroReadSupport.java | 133 +++++++++++++++++++++
.../apache/hudi/functional/TestMORDataSource.scala | 58 ++++++++-
4 files changed, 271 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index aa57b73d0c6..3328707c9ed 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -32,9 +32,9 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.avro.HoodieAvroParquetReaderBuilder;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetReader;
@@ -165,7 +165,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReaderBase {
AvroReadSupport.setAvroReadSchema(conf, requestedSchema.get());
AvroReadSupport.setRequestedProjection(conf, requestedSchema.get());
}
- ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
+ ParquetReader<IndexedRecord> reader = new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(conf).build();
ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
diff --git a/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
new file mode 100644
index 00000000000..d6179ea1aac
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroParquetReaderBuilder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.InputFile;
+
+/**
+ * Copied from org.apache.parquet.avro.AvroParquetReader.Builder.
+ * We use HoodieAvroParquetReaderBuilder to build a HoodieAvroReadSupport
+ * that supports reading Avro records from non-legacy map/list types in Parquet files.
+ */
+public class HoodieAvroParquetReaderBuilder<T> extends ParquetReader.Builder<T> {
+
+ private GenericData model = null;
+ private boolean enableCompatibility = true;
+ private boolean isReflect = true;
+
+ @Deprecated
+ public HoodieAvroParquetReaderBuilder(Path path) {
+ super(path);
+ }
+
+ public HoodieAvroParquetReaderBuilder(InputFile file) {
+ super(file);
+ }
+
+ public HoodieAvroParquetReaderBuilder<T> withDataModel(GenericData model) {
+ this.model = model;
+
+ // only generic and specific are supported by AvroIndexedRecordConverter
+ if (model.getClass() != GenericData.class
+ && model.getClass() != SpecificData.class) {
+ isReflect = true;
+ }
+
+ return this;
+ }
+
+ public HoodieAvroParquetReaderBuilder<T> disableCompatibility() {
+ this.enableCompatibility = false;
+ return this;
+ }
+
+ public HoodieAvroParquetReaderBuilder<T> withCompatibility(boolean enableCompatibility) {
+ this.enableCompatibility = enableCompatibility;
+ return this;
+ }
+
+ @Override
+ protected ReadSupport<T> getReadSupport() {
+ if (isReflect) {
+ conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
+ } else {
+ conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, enableCompatibility);
+ }
+ return new HoodieAvroReadSupport<>(model);
+ }
+}
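A minimal usage sketch of the builder above, mirroring the updated call site in HoodieAvroParquetReader (the path is a hypothetical Parquet file):

```java
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.HoodieAvroParquetReaderBuilder;
import org.apache.parquet.hadoop.ParquetReader;

public class HoodieAvroParquetReaderBuilderSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/non_legacy_list.parquet"); // hypothetical file

    // The builder wires in HoodieAvroReadSupport, which handles both legacy
    // and non-legacy map/list layouts when reading back as Avro.
    try (ParquetReader<IndexedRecord> reader =
             new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(conf).build()) {
      IndexedRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}
```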
diff --git a/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
new file mode 100644
index 00000000000..326accb66b2
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/parquet/avro/HoodieAvroReadSupport.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Supports reading Parquet log blocks that were written with Spark records back as Avro records.
+ * See the examples in TestMORDataSource#testRecordTypeCompatibilityWithParquetLog.
+ * Without this support, an exception is thrown when the schema ends with a map or list.
+ */
+public class HoodieAvroReadSupport<T> extends AvroReadSupport<T> {
+
+ public HoodieAvroReadSupport(GenericData model) {
+ super(model);
+ }
+
+ public HoodieAvroReadSupport() {
+ }
+
+ @Override
+ public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
+ boolean legacyMode = checkLegacyMode(fileSchema.getFields());
+ // support non-legacy list
+ if (!legacyMode && configuration.get(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE) == null) {
+ configuration.set(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
+ "false", "support reading avro from non-legacy map/list in parquet file");
+ }
+ ReadContext readContext = super.init(configuration, keyValueMetaData, fileSchema);
+ MessageType requestedSchema = readContext.getRequestedSchema();
+ // support non-legacy map: rewrite the legacy map layout in the requested schema
+ // to the non-legacy key_value layout, since there is no
+ // AvroWriteSupport.WRITE_OLD_MAP_STRUCTURE counterpart to AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE
+ if (!legacyMode) {
+ requestedSchema = new MessageType(requestedSchema.getName(), convertLegacyMap(requestedSchema.getFields()));
+ }
+ return new ReadContext(requestedSchema, readContext.getReadSupportMetadata());
+ }
+
+ /**
+ * Check whether the file's map/list types were written in legacy mode.
+ * legacy:
+ * list:
+ * optional group obj_ids (LIST) {
+ * repeated binary array (UTF8);
+ * }
+ * map:
+ * optional group obj_ids (MAP) {
+ * repeated group map (MAP_KEY_VALUE) {
+ * required binary key (UTF8);
+ * required binary value (UTF8);
+ * }
+ * }
+ * non-legacy:
+ * list:
+ * optional group obj_ids (LIST) {
+ * repeated group list {
+ * optional binary element (UTF8);
+ * }
+ * }
+ * map:
+ * optional group obj_maps (MAP) {
+ * repeated group key_value {
+ * required binary key (UTF8);
+ * optional binary value (UTF8);
+ * }
+ * }
+ */
+ private boolean checkLegacyMode(List<Type> parquetFields) {
+ for (Type type : parquetFields) {
+ if (!type.isPrimitive()) {
+ GroupType groupType = type.asGroupType();
+ OriginalType originalType = groupType.getOriginalType();
+ if (originalType == OriginalType.MAP
+ && groupType.getFields().get(0).getOriginalType() != OriginalType.MAP_KEY_VALUE) {
+ return false;
+ }
+ if (originalType == OriginalType.LIST
+ && !groupType.getType(0).getName().equals("array")) {
+ return false;
+ }
+ if (!checkLegacyMode(groupType.getFields())) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Convert the legacy map layout in the requested schema to the non-legacy key_value layout.
+ */
+ private List<Type> convertLegacyMap(List<Type> oldTypes) {
+ List<Type> newTypes = new ArrayList<>(oldTypes.size());
+ for (Type type : oldTypes) {
+ if (!type.isPrimitive()) {
+ GroupType parent = type.asGroupType();
+ List<Type> types = convertLegacyMap(parent.getFields());
+ if (type.getOriginalType() == OriginalType.MAP_KEY_VALUE) {
+ newTypes.add(new GroupType(parent.getRepetition(), "key_value", types));
+ } else {
+ newTypes.add(new GroupType(parent.getRepetition(), parent.getName(), parent.getOriginalType(), types));
+ }
+ } else {
+ newTypes.add(type);
+ }
+ }
+ return newTypes;
+ }
+}
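To make the two layouts concrete, here is an illustrative sketch (checkLegacyMode and convertLegacyMap are private, so this only parses the shapes from the Javadoc above using Parquet's MessageTypeParser):

```java
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class MapListLayoutSketch {
  public static void main(String[] args) {
    // Legacy list: a repeated primitive named "array" directly under the LIST group.
    MessageType legacyList = MessageTypeParser.parseMessageType(
        "message rec {\n"
            + "  optional group obj_ids (LIST) {\n"
            + "    repeated binary array (UTF8);\n"
            + "  }\n"
            + "}");

    // Non-legacy list: a three-level layout with a repeated "list" group
    // wrapping an optional "element" field; checkLegacyMode returns false here.
    MessageType nonLegacyList = MessageTypeParser.parseMessageType(
        "message rec {\n"
            + "  optional group obj_ids (LIST) {\n"
            + "    repeated group list {\n"
            + "      optional binary element (UTF8);\n"
            + "    }\n"
            + "  }\n"
            + "}");

    // Non-legacy map: a repeated "key_value" group without the MAP_KEY_VALUE
    // annotation; convertLegacyMap rewrites the requested schema into this shape.
    MessageType nonLegacyMap = MessageTypeParser.parseMessageType(
        "message rec {\n"
            + "  optional group obj_maps (MAP) {\n"
            + "    repeated group key_value {\n"
            + "      required binary key (UTF8);\n"
            + "      optional binary value (UTF8);\n"
            + "    }\n"
            + "  }\n"
            + "}");

    System.out.println(legacyList);
    System.out.println(nonLegacyList);
    System.out.println(nonLegacyMap);
  }
}
```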
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 184cb97f2be..513ab45c16a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
@@ -1243,6 +1243,62 @@ class TestMORDataSource extends HoodieClientTestBase with SparkDatasetMixin {
roDf.where(col(recordKeyField) === 0).select(dataField).collect()(0).getLong(0))
}
+ @ParameterizedTest
+ @CsvSource(Array(
+ "AVRO, AVRO, END_MAP",
+ "AVRO, SPARK, END_MAP",
+ "SPARK, AVRO, END_MAP",
+ "AVRO, AVRO, END_ARRAY",
+ "AVRO, SPARK, END_ARRAY",
+ "SPARK, AVRO, END_ARRAY"))
+ def testRecordTypeCompatibilityWithParquetLog(readType: HoodieRecordType,
+ writeType: HoodieRecordType,
+ transformMode: String): Unit = {
+ def transform(sourceDF: DataFrame, transformed: String): DataFrame = {
+ transformed match {
+ case "END_MAP" => sourceDF
+ .withColumn("obj_ids", array(lit("wk_tenant_id")))
+ .withColumn("obj_maps", map(lit("wk_tenant_id"), col("obj_ids")))
+ case "END_ARRAY" => sourceDF
+ .withColumn("obj_maps", map(lit("wk_tenant_id"), lit("wk_tenant_id")))
+ .withColumn("obj_ids", array(col("obj_maps")))
+ }
+ }
+
+ var (_, readOpts) = getWriterReaderOpts(readType)
+ var (writeOpts, _) = getWriterReaderOpts(writeType)
+ readOpts = readOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet")
+ writeOpts = writeOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet")
+ val records = dataGen.generateInserts("001", 10)
+
+ // Build input whose schema ends with a map or an array, depending on transformMode
+ val inputDF1 = transform(spark.read.json(
+ spark.sparkContext.parallelize(recordsToStrings(records).asScala, 2))
+ .withColumn("wk_tenant_id", lit("wk_tenant_id"))
+ .withColumn("ref_id", lit("wk_tenant_id")), transformMode)
+ inputDF1.write.format("org.apache.hudi")
+ .options(writeOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ // the in-memory index reports canIndexLogFiles=true, so inserts are routed to log files
+ .option(HoodieIndexConfig.INDEX_TYPE_PROP, IndexType.INMEMORY.name())
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ val snapshotDF1 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+
+ def sort(df: DataFrame): DataFrame = df.sort("_row_key")
+
+ val inputRows = sort(inputDF1).collectAsList()
+ val readRows = sort(snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)).collectAsList()
+
+ assertEquals(inputRows, readRows)
+ }
+
def getWriterReaderOpts(recordType: HoodieRecordType,
opt: Map[String, String] = commonOpts,
enableFileIndex: Boolean = DataSourceReadOptions.ENABLE_HOODIE_FILE_INDEX.defaultValue()):