You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2022/06/28 19:27:40 UTC

[hudi] branch master updated: [HUDI-4320] Make sure `HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED` could be specified by the writer (#5970)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ed823f1c6f [HUDI-4320] Make sure `HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED` could be specified by the writer (#5970)
ed823f1c6f is described below

commit ed823f1c6f99fe1626a07b347d4ce063f47c42f8
Author: Alexey Kudinkin <al...@infinilake.com>
AuthorDate: Tue Jun 28 12:27:32 2022 -0700

    [HUDI-4320] Make sure `HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED` could be specified by the writer (#5970)
    
    Fixed the check that decides whether Parquet's legacy-format writing property should be overridden, so that the override only kicks in when the property has not been explicitly specified by the caller
---
 .../java/org/apache/hudi/util/DataTypeUtils.java   | 15 ++---
 .../main/java/org/apache/hudi/DataSourceUtils.java | 46 ++++++++++----
 .../java/org/apache/hudi/TestDataSourceUtils.java  | 74 +++++++++++++---------
 .../org/apache/hudi/internal/DefaultSource.java    |  4 +-
 .../apache/hudi/spark3/internal/DefaultSource.java |  4 +-
 5 files changed, 89 insertions(+), 54 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index 1f4e8cc1dc..3ff7e1055c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -126,24 +126,23 @@ public class DataTypeUtils {
   }
 
   /**
-   * Try to find current sparktype whether contains that DecimalType which's scale < Decimal.MAX_LONG_DIGITS().
-   *
-   * @param sparkType spark schema.
-   * @return found result.
+   * Checks whether provided {@link DataType} contains a {@link DecimalType} whose precision is
+   * less than {@link Decimal#MAX_LONG_DIGITS()}
    */
-  public static boolean foundSmallPrecisionDecimalType(DataType sparkType) {
+  public static boolean hasSmallPrecisionDecimalType(DataType sparkType) {
     if (sparkType instanceof StructType) {
       StructField[] fields = ((StructType) sparkType).fields();
-      return Arrays.stream(fields).anyMatch(f -> foundSmallPrecisionDecimalType(f.dataType()));
+      return Arrays.stream(fields).anyMatch(f -> hasSmallPrecisionDecimalType(f.dataType()));
     } else if (sparkType instanceof MapType) {
       MapType map = (MapType) sparkType;
-      return foundSmallPrecisionDecimalType(map.keyType()) || foundSmallPrecisionDecimalType(map.valueType());
+      return hasSmallPrecisionDecimalType(map.keyType()) || hasSmallPrecisionDecimalType(map.valueType());
     } else if (sparkType instanceof ArrayType) {
-      return foundSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
+      return hasSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
     } else if (sparkType instanceof DecimalType) {
       DecimalType decimalType = (DecimalType) sparkType;
       return decimalType.precision() < Decimal.MAX_LONG_DIGITS();
     }
+
     return false;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7..c5b1857699 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -45,10 +48,6 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.util.DataTypeUtils;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -327,14 +326,39 @@ public class DataSourceUtils {
     return hiveSyncConfig;
   }
 
-  // Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
-  // but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
-  // try to find current schema whether contains that DecimalType, and auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-  public static void mayBeOverwriteParquetWriteLegacyFormatProp(Map<String, String> properties, StructType schema) {
-    if (DataTypeUtils.foundSmallPrecisionDecimalType(schema)
-        && !Boolean.parseBoolean(properties.getOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "false"))) {
+
+  /**
+   * Checks whether default value (false) of "hoodie.parquet.writelegacyformat.enabled" should be
+   * overridden in case:
+   *
+   * <ul>
+   *   <li>Property has not been explicitly set by the writer</li>
+   *   <li>Data schema contains {@code DecimalType} that would be affected by it</li>
+   * </ul>
+   *
+   * If both of the aforementioned conditions are true, will override the default value of the config
+   * (by essentially setting the value) to make sure that the produced Parquet data files could be
+   * read by {@code AvroParquetReader}
+   *
+   * @param properties properties specified by the writer
+   * @param schema schema of the dataset being written
+   */
+  public static void tryOverrideParquetWriteLegacyFormatProperty(Map<String, String> properties, StructType schema) {
+    if (DataTypeUtils.hasSmallPrecisionDecimalType(schema)
+        && properties.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()) == null) {
+      // ParquetWriteSupport writes DecimalType to parquet as INT32/INT64 when the decimal's precision
+      // is less than {@code Decimal.MAX_LONG_DIGITS}, but {@code AvroParquetReader}, which is used by
+      // {@code HoodieParquetReader}, does not support DecimalType encoded as INT32/INT64.
+      //
+      // To work around this problem we check whether
+      //    - Schema contains any decimals that could be encoded as INT32/INT64
+      //    - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} has not been explicitly
+      //    set by the writer
+      //
+      // If both of these conditions are true, then we override the default value of {@code
+      // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it to "true"
+      LOG.warn("Small Decimal Type found in the persisted schema, reverting default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
       properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "true");
-      LOG.warn("Small Decimal Type found in current schema, auto set the value of hoodie.parquet.writelegacyformat.enabled to true");
     }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index af5bbe7717..41c061d2bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.TypedProperties;
@@ -27,21 +33,16 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.DecimalType$;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -51,7 +52,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -61,11 +63,13 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.hive.ddl.HiveSyncMode.HMS;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -313,31 +317,39 @@ public class TestDataSourceUtils {
   }
 
   @ParameterizedTest
-  @CsvSource({"true, false", "true, true", "false, true", "false, false"})
-  public void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, boolean defaultWriteValue) {
-    // create test StructType
-    List<StructField> structFields = new ArrayList<>();
+  @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
+  public void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, Boolean propValue, Boolean expectedPropValue) {
+    DecimalType decimalType;
     if (smallDecimal) {
-      structFields.add(StructField.apply("d1", DecimalType$.MODULE$.apply(10, 2), false, Metadata.empty()));
+      decimalType = DecimalType$.MODULE$.apply(10, 2);
     } else {
-      structFields.add(StructField.apply("d1", DecimalType$.MODULE$.apply(38, 10), false, Metadata.empty()));
+      decimalType = DecimalType$.MODULE$.apply(38, 10);
     }
-    StructType structType = StructType$.MODULE$.apply(structFields);
-    // create write options
-    Map<String, String> options = new HashMap<>();
-    options.put("hoodie.parquet.writelegacyformat.enabled", String.valueOf(defaultWriteValue));
 
-    // start test
-    mayBeOverwriteParquetWriteLegacyFormatProp(options, structType);
+    StructType structType = StructType$.MODULE$.apply(
+        Arrays.asList(
+            StructField.apply("d1", decimalType, false, Metadata.empty())
+        )
+    );
 
-    // check result
-    boolean res = Boolean.parseBoolean(options.get("hoodie.parquet.writelegacyformat.enabled"));
-    if (smallDecimal) {
-      // should auto modify "hoodie.parquet.writelegacyformat.enabled" = "true".
-      assertEquals(true, res);
-    } else {
-      // should not modify the value of "hoodie.parquet.writelegacyformat.enabled".
-      assertEquals(defaultWriteValue, res);
-    }
+    Map<String, String> options = propValue != null
+        ? Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), String.valueOf(propValue))
+        : new HashMap<>();
+
+    tryOverrideParquetWriteLegacyFormatProperty(options, structType);
+
+    Boolean finalPropValue =
+        Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
+            .map(Boolean::parseBoolean)
+            .orElse(null);
+    assertEquals(expectedPropValue, finalPropValue);
+  }
+
+  private static Stream<Arguments> testAutoModifyParquetWriteLegacyFormatParameterParams() {
+    return Arrays.stream(new Object[][] {
+        {true, null, true},   {false, null, null},
+        {true, false, false}, {true, true, true},
+        {false, true, true},  {false, false, false}
+    }).map(Arguments::of);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 4866a5be5c..3b3b8eafb8 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType;
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 
 /**
  * DataSource V2 implementation for managing internal write logic. Only called internally.
@@ -69,7 +69,7 @@ public class DefaultSource extends BaseDefaultSource implements DataSourceV2,
         HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema);
+    tryOverrideParquetWriteLegacyFormatProperty(properties, schema);
     // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
     HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(), path, tblName, properties);
     boolean arePartitionRecordsSorted = HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index 4f7ff89a90..ab2f16703b 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -34,7 +34,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 
 /**
  * DataSource V2 implementation for managing internal write logic. Only called internally.
@@ -59,7 +59,7 @@ public class DefaultSource extends BaseDefaultSource implements TableProvider {
     // Create a new map as the properties is an unmodifiableMap on Spark 3.2.0
     Map<String, String> newProps = new HashMap<>(properties);
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    mayBeOverwriteParquetWriteLegacyFormatProp(newProps, schema);
+    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
     // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
     HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),