Posted to commits@hudi.apache.org by yi...@apache.org on 2022/06/11 20:48:59 UTC

[hudi] branch release-0.11.1-rc2-prep created (now 8b8d82df5a)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git


      at 8b8d82df5a [HUDI-4205] Fix NullPointerException in HFile reader creation

This branch includes the following new commits:

     new 7b98ecfc2a [MINOR][DOCS] Update the README.md file in hudi-examples (#5803)
     new c68f22aeb8 [MINOR] FlinkStateBackendConverter add more  exception message (#5809)
     new e68e0ca374 [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
     new 0be352e0ec [HUDI-4139]improvement for flink write operator name to identify tables easily (#5744)
     new 807bc71dde [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
     new 58e25ba856 [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)
     new cd1514484f [HUDI-4223] Fix NullPointerException from getLogRecordScanner when reading metadata table (#5840)
     new 8b8d82df5a [HUDI-4205] Fix NullPointerException in HFile reader creation

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hudi] 03/08: [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e68e0ca374a0626502a5e36389921fe3605cf97b
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Thu Jun 9 20:37:58 2022 +0800

    [HUDI-4213] Infer keygen clazz for Spark SQL (#5815)
---
 .../scala/org/apache/hudi/DataSourceOptions.scala  | 34 +++++++++++++---------
 .../spark/sql/hudi/command/SqlKeyGenerator.scala   |  9 +++---
 2 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index bcc8ce3db0..a6c957b4c0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -19,7 +19,7 @@ package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, TypedProperties}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -323,22 +323,12 @@ object DataSourceWriteOptions {
   val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE
 
   /**
-    * Key generator class, that implements will extract the key out of incoming record
-    *
+    * Key generator class, that implements will extract the key out of incoming record.
     */
   val keyGeneraterInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
-    if (!p.contains(PARTITIONPATH_FIELD)) {
-      Option.of(classOf[NonpartitionedKeyGenerator].getName)
-    } else {
-      val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
-      val numOfRecordKeyFields = p.getString(RECORDKEY_FIELD).split(",").length
-      if (numOfPartFields == 1 && numOfRecordKeyFields == 1) {
-        Option.of(classOf[SimpleKeyGenerator].getName)
-      } else {
-        Option.of(classOf[ComplexKeyGenerator].getName)
-      }
-    }
+    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(p.getProps))
   })
+
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
@@ -804,6 +794,22 @@ object DataSourceOptionsHelper {
     ) ++ translateConfigurations(parameters)
   }
 
+  def inferKeyGenClazz(props: TypedProperties): String = {
+    val partitionFields = props.getString(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), null)
+    if (partitionFields != null) {
+      val numPartFields = partitionFields.split(",").length
+      val recordsKeyFields = props.getString(DataSourceWriteOptions.RECORDKEY_FIELD.key(), DataSourceWriteOptions.RECORDKEY_FIELD.defaultValue())
+      val numRecordKeyFields = recordsKeyFields.split(",").length
+      if (numPartFields == 1 && numRecordKeyFields == 1) {
+        classOf[SimpleKeyGenerator].getName
+      } else {
+        classOf[ComplexKeyGenerator].getName
+      }
+    } else {
+      classOf[NonpartitionedKeyGenerator].getName
+    }
+  }
+
   implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
     new JavaFunction[From, To] {
       override def apply (input: From): To = function (input)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
index 9d139389fd..798ed84b09 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.DataSourceOptionsHelper
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.HoodieWriteConfig
@@ -113,14 +114,14 @@ class SqlKeyGenerator(props: TypedProperties) extends ComplexKeyGenerator(props)
     } else partitionPath
   }
 
-  override def getPartitionPath(record: GenericRecord) = {
+  override def getPartitionPath(record: GenericRecord): String = {
     val partitionPath = super.getPartitionPath(record)
-    convertPartitionPathToSqlType(partitionPath, false)
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
   override def getPartitionPath(row: Row): String = {
     val partitionPath = super.getPartitionPath(row)
-    convertPartitionPathToSqlType(partitionPath, true)
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 }
 
@@ -135,7 +136,7 @@ object SqlKeyGenerator {
     if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
       HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(beforeKeyGenClassName)
     } else {
-      classOf[ComplexKeyGenerator].getCanonicalName
+      DataSourceOptionsHelper.inferKeyGenClazz(props)
     }
   }
 }
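
For reference, a minimal standalone sketch of the inference rule that DataSourceOptionsHelper.inferKeyGenClazz introduces: no partition-path field maps to the non-partitioned key generator, exactly one partition field plus one record-key field maps to the simple key generator, and everything else maps to the complex one. The fully qualified class names are written inline as plain strings (assumed to live under org.apache.hudi.keygen) so the snippet runs without Hudi on the classpath.

object KeyGenInferenceSketch {
  // Mirrors the decision rule in DataSourceOptionsHelper.inferKeyGenClazz.
  def inferKeyGenClazz(partitionFields: Option[String], recordKeyFields: String): String =
    partitionFields match {
      case None => "org.apache.hudi.keygen.NonpartitionedKeyGenerator"
      case Some(partitions) =>
        val numPartFields = partitions.split(",").length
        val numRecordKeyFields = recordKeyFields.split(",").length
        if (numPartFields == 1 && numRecordKeyFields == 1) {
          "org.apache.hudi.keygen.SimpleKeyGenerator"
        } else {
          "org.apache.hudi.keygen.ComplexKeyGenerator"
        }
    }

  def main(args: Array[String]): Unit = {
    println(inferKeyGenClazz(Some("dt"), "uuid"))     // SimpleKeyGenerator
    println(inferKeyGenClazz(Some("dt,hh"), "uuid"))  // ComplexKeyGenerator
    println(inferKeyGenClazz(None, "uuid"))           // NonpartitionedKeyGenerator
  }
}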


[hudi] 06/08: [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 58e25ba8563bb45b629fd534eb12423aeb948e6e
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Sat Jun 11 16:17:42 2022 -0400

    [HUDI-4221] Fixing getAllPartitionPaths perf hit w/ FileSystemBackedMetadata (#5829)
---
 .../metadata/FileSystemBackedTableMetadata.java    | 39 ++++++++++++----------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index b77bb12c49..f029995ba0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -68,13 +68,14 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
 
   @Override
   public List<String> getAllPartitionPaths() throws IOException {
-    FileSystem fs = new Path(datasetBasePath).getFileSystem(hadoopConf.get());
+    Path basePath = new Path(datasetBasePath);
+    FileSystem fs = basePath.getFileSystem(hadoopConf.get());
     if (assumeDatePartitioning) {
       return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath);
     }
 
     List<Path> pathsToList = new CopyOnWriteArrayList<>();
-    pathsToList.add(new Path(datasetBasePath));
+    pathsToList.add(basePath);
     List<String> partitionPaths = new CopyOnWriteArrayList<>();
 
     while (!pathsToList.isEmpty()) {
@@ -82,27 +83,31 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
       int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
 
       // List all directories in parallel
-      List<FileStatus[]> dirToFileListing = engineContext.map(pathsToList, path -> {
+      List<Pair<Path, FileStatus[]>> dirToFileListing = engineContext.map(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return fileSystem.listStatus(path);
+        return Pair.of(path, fileSystem.listStatus(path));
       }, listingParallelism);
       pathsToList.clear();
 
       // if current dictionary contains PartitionMetadata, add it to result
       // if current dictionary does not contain PartitionMetadata, add it to queue
-      dirToFileListing.stream().flatMap(Arrays::stream).parallel()
-          .forEach(fileStatus -> {
-            if (fileStatus.isDirectory()) {
-              if (HoodiePartitionMetadata.hasPartitionMetadata(fs, fileStatus.getPath())) {
-                partitionPaths.add(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath()));
-              } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
-                pathsToList.add(fileStatus.getPath());
-              }
-            } else if (fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
-              String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath().getParent());
-              partitionPaths.add(partitionName);
-            }
-          });
+      dirToFileListing.forEach(p -> {
+        Option<FileStatus> partitionMetaFile = Option.fromJavaOptional(Arrays.stream(p.getRight()).parallel()
+            .filter(fileStatus -> fileStatus.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
+            .findFirst());
+
+        if (partitionMetaFile.isPresent()) {
+          // Is a partition.
+          String partitionName = FSUtils.getRelativePartitionPath(basePath, p.getLeft());
+          partitionPaths.add(partitionName);
+        } else {
+          // Add sub-dirs to the queue
+          pathsToList.addAll(Arrays.stream(p.getRight())
+              .filter(fileStatus -> fileStatus.isDirectory() && !fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME))
+              .map(fileStatus -> fileStatus.getPath())
+              .collect(Collectors.toList()));
+        }
+      });
     }
     return partitionPaths;
   }
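
For reference, a minimal sketch of the listing pattern this patch switches to, written against plain java.io instead of the Hadoop FileSystem API so it runs standalone: each directory is listed exactly once, and whether it is a partition is decided from that single listing (presence of a file whose name starts with the partition metafile prefix) rather than by a separate hasPartitionMetadata() probe per directory. The marker names below are assumptions mirroring HOODIE_PARTITION_METAFILE_PREFIX and HoodieTableMetaClient.METAFOLDER_NAME, and the per-level fan-out that the real code runs through engineContext.map is done sequentially here.

import java.io.File

object PartitionDiscoverySketch {
  val PartitionMetafilePrefix = ".hoodie_partition_metadata" // assumed value of HOODIE_PARTITION_METAFILE_PREFIX
  val MetaFolderName = ".hoodie"                             // assumed value of METAFOLDER_NAME

  def allPartitionPaths(basePath: File): Seq[String] = {
    var pathsToList = List(basePath)
    var partitionPaths = List.empty[String]
    while (pathsToList.nonEmpty) {
      // List each pending directory once and keep the (dir, children) pairing,
      // like the Pair<Path, FileStatus[]> introduced in the patch.
      val listings = pathsToList.map(dir => dir -> Option(dir.listFiles()).getOrElse(Array.empty[File]))
      pathsToList = Nil
      listings.foreach { case (dir, children) =>
        if (children.exists(f => f.isFile && f.getName.startsWith(PartitionMetafilePrefix))) {
          // The directory itself is a partition; record it relative to the base path.
          partitionPaths ::= basePath.toPath.relativize(dir.toPath).toString
        } else {
          // Otherwise keep walking into sub-directories, skipping the .hoodie metafolder.
          pathsToList = children.filter(f => f.isDirectory && f.getName != MetaFolderName).toList ++ pathsToList
        }
      }
    }
    partitionPaths.reverse
  }

  def main(args: Array[String]): Unit =
    allPartitionPaths(new File("/tmp/hudi_demo_table")).foreach(println) // path is illustrative
}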


[hudi] 08/08: [HUDI-4205] Fix NullPointerException in HFile reader creation

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8b8d82df5a4c39b0ec2e8e2b3ef3e49fc76a29c0
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Sat Jun 11 00:58:07 2022 -0700

    [HUDI-4205] Fix NullPointerException in HFile reader creation
---
 .../src/main/scala/org/apache/hudi/HoodieBaseRelation.scala       | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 47e391a560..43a2d72733 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -34,9 +34,10 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.hudi.io.storage.HoodieHFileReader
+import org.apache.spark.SerializableWritable
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
@@ -535,11 +536,10 @@ object HoodieBaseRelation extends SparkAdapterSupport {
                                 filters: Seq[Filter],
                                 options: Map[String, String],
                                 hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
-    val hadoopConfBroadcast =
-      spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
 
     partitionedFile => {
-      val hadoopConf = hadoopConfBroadcast.value.get()
+      val hadoopConf = hadoopConfBroadcast.value.value
       val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
         new CacheConfig(hadoopConf))
 


[hudi] 04/08: [HUDI-4139]improvement for flink write operator name to identify tables easily (#5744)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 0be352e0ec40abe5cc6cddadeff9a12872e86ed2
Author: yanenze <34...@users.noreply.github.com>
AuthorDate: Fri Jun 10 05:48:20 2022 +0800

    [HUDI-4139]improvement for flink write operator name to identify tables easily (#5744)
    
    
    Co-authored-by: yanenze <ya...@keytop.com.cn>
---
 .../main/java/org/apache/hudi/sink/utils/Pipelines.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 91ac2beadc..c969c10ed1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -114,7 +114,7 @@ public class Pipelines {
             conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
       }
       return dataStream
-          .transform("bucket_bulk_insert", TypeInformation.of(Object.class), operatorFactory)
+          .transform(writeOpIdentifier("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_bulk_insert" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
           .addSink(DummySink.INSTANCE)
@@ -146,7 +146,7 @@ public class Pipelines {
       }
     }
     return dataStream
-        .transform("hoodie_bulk_insert_write",
+        .transform(writeOpIdentifier("hoodie_bulk_insert_write", conf),
             TypeInformation.of(Object.class),
             operatorFactory)
         // follow the parallelism of upstream operators to avoid shuffle
@@ -190,7 +190,7 @@ public class Pipelines {
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
     return dataStream
-        .transform("hoodie_append_write", TypeInformation.of(Object.class), operatorFactory)
+        .transform(writeOpIdentifier("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory)
         .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
         .addSink(DummySink.INSTANCE)
@@ -322,7 +322,7 @@ public class Pipelines {
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
       BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
+          .transform(writeOpIdentifier("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
@@ -338,7 +338,7 @@ public class Pipelines {
           .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
+          .transform(writeOpIdentifier("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
           .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
@@ -385,6 +385,10 @@ public class Pipelines {
         .name("clean_commits");
   }
 
+  public static String writeOpIdentifier(String operatorN, Configuration conf) {
+    return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+  }
+
   /**
    * Dummy sink that does nothing.
    */
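
For reference, a minimal sketch of the naming scheme writeOpIdentifier adds, with the table name passed in directly where the real helper reads conf.getString(FlinkOptions.TABLE_NAME); the suffix makes write operators for different Hudi tables distinguishable in the Flink UI.

object WriteOpIdentifierSketch {
  // Mirrors Pipelines.writeOpIdentifier: "<operator>: <table name>".
  def writeOpIdentifier(operatorName: String, tableName: String): String =
    s"$operatorName: $tableName"

  def main(args: Array[String]): Unit = {
    println(writeOpIdentifier("stream_write", "orders"))        // stream_write: orders
    println(writeOpIdentifier("bucket_bulk_insert", "orders"))  // bucket_bulk_insert: orders
  }
}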


[hudi] 05/08: [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 807bc71dde6c0a2bb8fa429fe754ed35d7fc6302
Author: xi chaomin <36...@users.noreply.github.com>
AuthorDate: Fri Jun 10 07:23:51 2022 +0800

    [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
    
    
    Co-authored-by: xicm <xi...@asiainfo.com>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 12 ++---
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  | 63 ++++++++++++----------
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 23 ++++++++
 3 files changed, 65 insertions(+), 33 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 84280559e9..fe4391c0a5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
@@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
   }
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-      tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+      tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
     val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
     }
-    if (null != tableConfig) {
+    if (null != tableConfig && mode != SaveMode.Overwrite) {
       tableConfig.getProps.foreach { case (key, value) =>
         mergedParams(key) = value
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 63f1a7afc2..4967212675 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -118,47 +118,56 @@ object HoodieWriterUtils {
     }
   }
 
+  def validateTableConfig(spark: SparkSession, params: Map[String, String],
+                          tableConfig: HoodieConfig): Unit = {
+    validateTableConfig(spark, params, tableConfig, false)
+  }
+
   /**
    * Detects conflicts between new parameters and existing table configurations
    */
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
-      tableConfig: HoodieConfig): Unit = {
-    val resolver = spark.sessionState.conf.resolver
-    val diffConfigs = StringBuilder.newBuilder
-    params.foreach { case (key, value) =>
-      val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
-      if (null != existingValue && !resolver(existingValue, value)) {
-        diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+      tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
+    // If Overwrite is set as save mode, we don't need to do table config validation.
+    if (!isOverWriteMode) {
+      val resolver = spark.sessionState.conf.resolver
+      val diffConfigs = StringBuilder.newBuilder
+      params.foreach { case (key, value) =>
+        val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
+        if (null != existingValue && !resolver(existingValue, value)) {
+          diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+        }
       }
-    }
 
-    if (null != tableConfig) {
-      val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
-      val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-      if (null != datasourceRecordKey && null != tableConfigRecordKey
+      if (null != tableConfig) {
+        val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+        val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+        if (null != datasourceRecordKey && null != tableConfigRecordKey
           && datasourceRecordKey != tableConfigRecordKey) {
-        diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
-      }
+          diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+        }
 
-      val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
-      val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
-      if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+        val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
+        val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
           && datasourcePreCombineKey != tableConfigPreCombineKey) {
-        diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
-      }
+          diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+        }
 
-      val datasourceKeyGen = getOriginKeyGenerator(params)
-      val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-      if (null != datasourceKeyGen && null != tableConfigKeyGen
+        val datasourceKeyGen = getOriginKeyGenerator(params)
+        val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        if (null != datasourceKeyGen && null != tableConfigKeyGen
           && datasourceKeyGen != tableConfigKeyGen) {
-        diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+          diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+        }
       }
-    }
 
-    if (diffConfigs.nonEmpty) {
-      diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
-      throw new HoodieException(diffConfigs.toString.trim)
+      if (diffConfigs.nonEmpty) {
+        diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+        throw new HoodieException(diffConfigs.toString.trim)
+      }
     }
+
     // Check schema evolution for bootstrap table.
     // now we do not support bootstrap table.
     if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 339dbb5c71..928b1b1a1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
     assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }
 
+  /**
+    * Test case for Do not validate table config if save mode is set to Overwrite
+    */
+  @Test
+  def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
+    //create a new table
+    val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid")
+    val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
+
+    //on same path try write with different RECORDKEY_FIELD_NAME and Append SaveMode should throw an exception
+    val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "ts")
+    val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
+    assert(hoodieException.getMessage.contains("Config conflict"))
+    assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))
+
+    //on same path try write with different RECORDKEY_FIELD_NAME and Overwrite SaveMode should be successful.
+    assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
+  }
+
   /**
    * Test case for each bulk insert sort mode
    *
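
For reference, a minimal sketch of the user-visible behavior through the Spark datasource API (base path, table name and column names are illustrative): rewriting the same base path with a different record key now succeeds under SaveMode.Overwrite, whereas SaveMode.Append still fails fast with the "Config conflict" HoodieException exercised in the test above.

import org.apache.spark.sql.{SaveMode, SparkSession}

object OverwriteSkipsValidationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("overwrite-skips-validation")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/hudi_overwrite_demo"
    val df = Seq(("id-1", "2022-06-11", 100L)).toDF("uuid", "dt", "ts")

    // First write: the table is created with record key "uuid".
    df.write.format("hudi")
      .option("hoodie.table.name", "overwrite_demo")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Second write with a different record key: Append would raise a config-conflict
    // HoodieException, but Overwrite replaces the table config instead of validating against it.
    df.write.format("hudi")
      .option("hoodie.table.name", "overwrite_demo")
      .option("hoodie.datasource.write.recordkey.field", "ts")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    spark.stop()
  }
}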


[hudi] 01/08: [MINOR][DOCS] Update the README.md file in hudi-examples (#5803)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 7b98ecfc2a92daf5b04f3823f71fab0ab96e3fac
Author: liuzhuang2017 <95...@users.noreply.github.com>
AuthorDate: Thu Jun 9 08:45:00 2022 +0800

    [MINOR][DOCS] Update the README.md file in hudi-examples (#5803)
---
 hudi-examples/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-examples/README.md b/hudi-examples/README.md
index ee077c3f78..5c228b6825 100644
--- a/hudi-examples/README.md
+++ b/hudi-examples/README.md
@@ -42,4 +42,4 @@ To run the demo:
 
       6.1 The configuration files we provided is just the simplest demo, you can change it according to your specific needs.
 
-      6.2 You could also use Intellij to run the example directly by configuring parameters as "Program arguments"
+      6.2 You could also use IntelliJ IDEA to run the example directly by configuring parameters as "Program arguments"


[hudi] 07/08: [HUDI-4223] Fix NullPointerException from getLogRecordScanner when reading metadata table (#5840)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit cd1514484fff1ae6e66b4e56ae11013bdf4ac6e9
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Sat Jun 11 13:19:24 2022 -0700

    [HUDI-4223] Fix NullPointerException from getLogRecordScanner when reading metadata table (#5840)
    
    When explicitly specifying the metadata table path for reading in spark, the "hoodie.metadata.enable" is overwritten to true for proper read behavior.
---
 .../src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala       | 6 +++---
 .../hudi/functional/TestMetadataTableWithSparkDataSource.scala      | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index a7ca60865f..2fdb9b882e 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.MergeOnReadSnapshotRelation.getFilePath
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
@@ -37,9 +36,9 @@ import org.apache.hudi.config.HoodiePayloadConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
-import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -324,7 +323,8 @@ private object HoodieMergeOnReadRDD {
     val fs = FSUtils.getFs(tablePath, hadoopConf)
 
     if (HoodieTableMetadata.isMetadataTable(tablePath)) {
-      val metadataConfig = tableState.metadataConfig
+      val metadataConfig = HoodieMetadataConfig.newBuilder()
+        .fromProperties(tableState.metadataConfig.getProps).enable(true).build()
       val dataTableBasePath = getDataTableBasePathFromMetadataTable(tablePath)
       val metadataTable = new HoodieBackedTableMetadata(
         new HoodieLocalEngineContext(hadoopConf), metadataConfig,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
index 11705f9eb1..02e0ee6dfd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataTableWithSparkDataSource.scala
@@ -78,7 +78,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
       .save(basePath)
 
     // Files partition of MT
-    val filesPartitionDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/files")
+    val filesPartitionDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/files")
 
     // Smoke test
     filesPartitionDF.show()
@@ -96,7 +96,7 @@ class TestMetadataTableWithSparkDataSource extends SparkClientFunctionalTestHarn
     assertEquals(expectedKeys, keys)
 
     // Column Stats Index partition of MT
-    val colStatsDF = spark.read.options(metadataOpts).format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
+    val colStatsDF = spark.read.format(hudi).load(s"$basePath/.hoodie/metadata/column_stats")
 
     // Smoke test
     colStatsDF.show()
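
For reference, a minimal sketch of the read path this fix targets (the base path is illustrative and must point at an existing Hudi table with the metadata table built): loading the metadata table directly from <basePath>/.hoodie/metadata/<partition> no longer requires passing "hoodie.metadata.enable" as a read option, since the reader now forces it to true internally.

import org.apache.spark.sql.SparkSession

object ReadMetadataTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-metadata-table")
      .master("local[2]")
      .getOrCreate()

    val basePath = "/tmp/hudi_demo_table" // assumed to be an existing Hudi table with metadata enabled

    // Files partition of the metadata table.
    val filesPartitionDF = spark.read.format("hudi").load(s"$basePath/.hoodie/metadata/files")
    filesPartitionDF.show(false)

    // Column stats partition, if it has been built for the table.
    val colStatsDF = spark.read.format("hudi").load(s"$basePath/.hoodie/metadata/column_stats")
    colStatsDF.show(false)

    spark.stop()
  }
}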


[hudi] 02/08: [MINOR] FlinkStateBackendConverter add more exception message (#5809)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.11.1-rc2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c68f22aeb818da44c93d0d899873a03b97ffc31b
Author: sandyfog <15...@qq.com>
AuthorDate: Thu Jun 9 15:13:27 2022 +0800

    [MINOR] FlinkStateBackendConverter add more  exception message (#5809)
    
    * [MINOR] FlinkStateBackendConverter add more  exception message
---
 .../src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
index a6b15ffb74..0e4797973e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkStateBackendConverter.java
@@ -36,7 +36,7 @@ public class FlinkStateBackendConverter implements IStringConverter<StateBackend
       case "hashmap" : return new HashMapStateBackend();
       case "rocksdb" : return new EmbeddedRocksDBStateBackend();
       default:
-        throw new HoodieException(String.format("Unknown flink state backend %s.", value));
+        throw new HoodieException(String.format("Unknown flink state backend %s. Supports only hashmap and rocksdb by now", value));
     }
   }
 }
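
For reference, a minimal sketch of the string-to-backend mapping this converter performs, shown with the expanded error message; the state backend classes are assumed to come from flink-runtime and flink-statebackend-rocksdb, and a plain IllegalArgumentException stands in for HoodieException.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend

object StateBackendSelectionSketch {
  // Mirrors FlinkStateBackendConverter: map the CLI string to a Flink StateBackend,
  // failing with the clarified message for anything other than "hashmap" or "rocksdb".
  def toStateBackend(value: String): StateBackend = value match {
    case "hashmap" => new HashMapStateBackend()
    case "rocksdb" => new EmbeddedRocksDBStateBackend()
    case other =>
      throw new IllegalArgumentException(
        s"Unknown flink state backend $other. Supports only hashmap and rocksdb by now")
  }

  def main(args: Array[String]): Unit = {
    println(toStateBackend("hashmap").getClass.getSimpleName)  // HashMapStateBackend
    println(toStateBackend("rocksdb").getClass.getSimpleName)  // EmbeddedRocksDBStateBackend
  }
}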