You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 04:56:52 UTC

[hudi] 19/19: [MINOR] Cleaning up recently introduced configs (#7772)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 5765e1b9b9663dfac08b874ca42f02245e0ce914
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Sun Jan 29 14:57:05 2023 -0800

    [MINOR] Cleaning up recently introduced configs (#7772)
    
    Cleaning up some of the recently introduced configs:
    
    Shortening file-listing mode override for Spark's FileIndex
    Fixing Disruptor's write buffer limit config
    Scoped CANONICALIZE_NULLABLE config to HoodieSparkSqlWriter
---
 .../org/apache/hudi/config/HoodieWriteConfig.java   | 12 ++++++------
 .../java/org/apache/hudi/util/ExecutorFactory.java  |  2 +-
 .../execution/TestDisruptorExecutionInSpark.java    |  8 +++++---
 .../hudi/execution/TestDisruptorMessageQueue.java   |  4 ++--
 .../scala/org/apache/hudi/DataSourceOptions.scala   | 13 +------------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala      | 21 +++++++++++++++++----
 .../hudi/command/MergeIntoHoodieTableCommand.scala  |  3 ++-
 .../TestHoodiePruneFileSourcePartitions.scala       |  4 ++--
 8 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 8890ddfdeee..e187fff4483 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -272,8 +272,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(String.valueOf(4 * 1024 * 1024))
       .withDocumentation("Size of in-memory buffer used for parallelizing network reads and lake storage writes.");
 
-  public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE = ConfigProperty
-      .key("hoodie.write.executor.disruptor.buffer.size")
+  public static final ConfigProperty<String> WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES = ConfigProperty
+      .key("hoodie.write.executor.disruptor.buffer.limit.bytes")
       .defaultValue(String.valueOf(1024))
       .sinceVersion("0.13.0")
       .withDocumentation("The size of the Disruptor Executor ring buffer, must be power of 2");
@@ -1180,8 +1180,8 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_WAIT_STRATEGY);
   }
 
-  public Integer getWriteExecutorDisruptorWriteBufferSize() {
-    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE));
+  public Integer getWriteExecutorDisruptorWriteBufferLimitBytes() {
+    return Integer.parseInt(getStringOrDefault(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES));
   }
 
   public boolean shouldCombineBeforeInsert() {
@@ -2542,8 +2542,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withWriteExecutorDisruptorWriteBufferSize(long size) {
-      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_SIZE, String.valueOf(size));
+    public Builder withWriteExecutorDisruptorWriteBufferLimitBytes(long size) {
+      writeConfig.setValue(WRITE_EXECUTOR_DISRUPTOR_BUFFER_LIMIT_BYTES, String.valueOf(size));
       return this;
     }
 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
index bd192f649db..7baada74089 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/ExecutorFactory.java
@@ -52,7 +52,7 @@ public class ExecutorFactory {
         return new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, preExecuteRunnable);
       case DISRUPTOR:
-        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferSize(), inputItr, consumer,
+        return new DisruptorExecutor<>(hoodieConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), inputItr, consumer,
             transformFunction, hoodieConfig.getWriteExecutorDisruptorWaitStrategy(), preExecuteRunnable);
       case SIMPLE:
         return new SimpleExecutor<>(inputItr, consumer, transformFunction);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
index 55c2325b137..19155f6b318 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorExecutionInSpark.java
@@ -50,7 +50,7 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(8)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(8)
       .build(false);
 
   @BeforeEach
@@ -94,7 +94,7 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
     DisruptorExecutor<HoodieRecord, HoodieRecord, Integer> exec = null;
 
     try {
-      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor<>(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
           Function.identity(), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
@@ -127,7 +127,9 @@ public class TestDisruptorExecutionInSpark extends HoodieClientTestHarness {
           @Override
           public void consume(HoodieRecord record) {
             try {
-              Thread.currentThread().wait();
+              synchronized (this) {
+                wait();
+              }
             } catch (InterruptedException ie) {
               // ignore here
             }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index 7344ccd89fd..4c8e0dac27d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -68,7 +68,7 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
 
   private final HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
       .withExecutorType(ExecutorType.DISRUPTOR.name())
-      .withWriteExecutorDisruptorWriteBufferSize(16)
+      .withWriteExecutorDisruptorWriteBufferLimitBytes(16)
       .build(false);
 
   @BeforeEach
@@ -139,7 +139,7 @@ public class TestDisruptorMessageQueue extends HoodieClientTestHarness {
     DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = null;
 
     try {
-      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferSize(), hoodieRecords.iterator(), consumer,
+      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
           getTransformer(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index f3a169fc06e..d2c8629df98 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -156,7 +156,7 @@ object DataSourceReadOptions {
   val FILE_INDEX_LISTING_MODE_LAZY = "lazy"
 
   val FILE_INDEX_LISTING_MODE_OVERRIDE: ConfigProperty[String] =
-    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode.override")
+    ConfigProperty.key("hoodie.datasource.read.file.index.listing.mode")
       .defaultValue(FILE_INDEX_LISTING_MODE_LAZY)
       .withValidValues(FILE_INDEX_LISTING_MODE_LAZY, FILE_INDEX_LISTING_MODE_EAGER)
       .sinceVersion("0.13.0")
@@ -463,17 +463,6 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
 
-  // NOTE: This is an internal config that is not exposed to the public
-  private[hudi] val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.datasource.write.schema.canonicalize")
-      .defaultValue(true)
-      .sinceVersion("0.13.0")
-      .withDocumentation("Controls whether incoming batch's schema's nullability constraints should be canonicalized "
-        + "relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked "
-        + "as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing "
-        + "null records from the table (for updating, for ex). Note, that this config has only effect when "
-        + "'hoodie.datasource.write.reconcile.schema' is set to false.")
-
   // HIVE SYNC SPECIFIC CONFIGS
   // NOTE: DO NOT USE uppercase for the keys as they are internally lower-cased. Using upper-cases causes
   // unexpected issues with config getting reset
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 5dda7c9df78..7e234775faa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -29,7 +29,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaC
 import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.engine.HoodieEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -72,6 +72,19 @@ import scala.collection.mutable.ListBuffer
 
 object HoodieSparkSqlWriter {
 
+  /**
+   * Controls whether incoming batch's schema's nullability constraints should be canonicalized
+   * relative to the table's schema. For ex, in case field A is marked as null-able in table's schema, but is marked
+   * as non-null in the incoming batch, w/o canonicalization such write might fail as we won't be able to read existing
+   * null records from the table (for updating, for ex). Note, that this config has only effect when
+   * 'hoodie.datasource.write.reconcile.schema' is set to false
+   *
+   * NOTE: This is an internal config that is not exposed to the public
+   */
+  val CANONICALIZE_NULLABLE: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
+      .defaultValue(true)
+
   private val log = LogManager.getLogger(getClass)
   private var tableExists: Boolean = false
   private var asyncCompactionTriggerFnDefined: Boolean = false
@@ -397,9 +410,9 @@ object HoodieSparkSqlWriter {
         // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
         // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
         // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
-        val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
-          DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
-        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+        val shouldCanonicalizeNullable = opts.getOrDefault(CANONICALIZE_NULLABLE.key,
+          CANONICALIZE_NULLABLE.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeNullable) {
           AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
         } else {
           sourceSchema
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 9099c7225e0..418b2f8d6ec 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.CANONICALIZE_NULLABLE
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -532,7 +533,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
         //       target table, ie partially updating)
         AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
         RECONCILE_SCHEMA.key -> "false",
-        "hoodie.datasource.write.schema.canonicalize" -> "false"
+        CANONICALIZE_NULLABLE.key -> "false"
       )
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 36540b43a40..06239697db9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -87,7 +87,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
       // instead of serving already cached value
       spark.sessionState.catalog.invalidateAllCachedTables()
 
-      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")
 
       val df = spark.sql(s"SELECT * FROM $tableName WHERE partition = '2021-01-05'")
       val optimizedPlan = df.queryExecution.optimizedPlan
@@ -179,7 +179,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
       // instead of serving already cached value
       spark.sessionState.catalog.invalidateAllCachedTables()
 
-      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode.override=$listingModeOverride")
+      spark.sql(s"SET hoodie.datasource.read.file.index.listing.mode=$listingModeOverride")
 
       val df = spark.sql(s"SELECT * FROM $tableName")
       val optimizedPlan = df.queryExecution.optimizedPlan