Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/04 05:53:05 UTC

[hudi] branch release-0.13.0 updated (3bcbdd09629 -> 4286d798237)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 3bcbdd09629 [0.13.0 Only] Disable default Avro schema validation (#7802)
     new 3fc2c8ed970 [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
     new 4e5699b5ce9 [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
     new 46aa15bd564 [HUDI-5551] support seconds unit on event_time metrics (#7664)
     new e0ef594ed2e [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios (#7838)
     new 9af629c6309 [HUDI-5653] Reset TestDataSource for TestHoodieDeltaStreamerWithMultiWriter (#7831)
     new 78a2c647ceb [MINOR] Fixing CTAS configuration not propagated properly (#7832)
     new 4286d798237 [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   2 +
 .../java/org/apache/hudi/client/WriteStatus.java   |  13 +-
 ...urrentFileWritesConflictResolutionStrategy.java |  55 ++++++++
 .../org/apache/hudi/config/HoodieLockConfig.java   |   9 ++
 .../apache/hudi/index/bucket/BucketIdentifier.java |   6 +-
 .../metadata/HoodieBackedTableMetadataWriter.java  |  64 +++++----
 ...urrentFileWritesConflictResolutionStrategy.java | 150 +++++++++++++++++++++
 .../SparkHoodieBackedTableMetadataWriter.java      |   1 +
 .../org/apache/hudi/client/TestWriteStatus.java    |  92 +++++++++++++
 .../sink/bucket/BucketStreamWriteFunction.java     |   5 +-
 .../sink/partitioner/BucketIndexPartitioner.java   |   5 +-
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   2 +-
 .../analysis/HoodiePruneFileSourcePartitions.scala |   2 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    |  28 ++++
 .../TestHoodiePruneFileSourcePartitions.scala      |  40 ++++--
 .../internal/HoodieDataSourceInternalWriter.java   |   4 +-
 .../TestHoodieDeltaStreamerWithMultiWriter.java    |   7 +-
 17 files changed, 430 insertions(+), 55 deletions(-)
 create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
 create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java


[hudi] 01/07: [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3fc2c8ed97012fa698b302bb036b25ff3259cdad
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Feb 2 21:39:40 2023 -0800

    [HUDI-5691] Fixing `HoodiePruneFileSourcePartitions` to properly handle non-partitioned tables (#7833)
    
    This change addresses the issue of the `HoodiePruneFileSourcePartitions` rule not being applied to non-partitioned tables, resulting in their size being incorrectly estimated by Spark
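
For context, the KiB normalization used by the updated test assertions below can be summed up in a few lines of Java (illustrative only, not Hudi code): adding 512 before the integer division rounds to the nearest whole KiB, so small per-run differences in file sizes do not change the expected value of 425.

    // Illustrative sketch of the rounding used in the test assertions below.
    public class KibRounding {
      static long toNearestKib(long sizeInBytes) {
        return (sizeInBytes + 512) / 1024;
      }

      public static void main(String[] args) {
        System.out.println(toNearestKib(435_100)); // 425
        System.out.println(toNearestKib(434_700)); // 425
      }
    }
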
---
 .../analysis/HoodiePruneFileSourcePartitions.scala |  2 +-
 .../TestHoodiePruneFileSourcePartitions.scala      | 40 ++++++++++++++--------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
index 3b86777e16e..46cb931a59b 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodiePruneFileSourcePartitions.scala
@@ -41,7 +41,7 @@ case class HoodiePruneFileSourcePartitions(spark: SparkSession) extends Rule[Log
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case op @ PhysicalOperation(projects, filters, lr @ LogicalRelation(HoodieRelationMatcher(fileIndex), _, _, _))
-      if sparkAdapter.isHoodieTable(lr, spark) && fileIndex.partitionSchema.nonEmpty && !fileIndex.hasPredicatesPushedDown =>
+      if sparkAdapter.isHoodieTable(lr, spark) && !fileIndex.hasPredicatesPushedDown =>
 
       val deterministicFilters = filters.filter(f => f.deterministic && !SubqueryExpression.hasSubquery(f))
       val normalizedFilters = exprUtils.normalizeExprs(deterministicFilters, lr.output)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
index 06239697db9..aac2a4027a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/analysis/TestHoodiePruneFileSourcePartitions.scala
@@ -54,8 +54,11 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     )
 
   @ParameterizedTest
-  @CsvSource(value = Array("cow", "mor"))
-  def testPartitionFiltersPushDown(tableType: String): Unit = {
+  @CsvSource(value = Array(
+    "cow,true", "cow,false",
+    "mor,true", "mor,false"
+  ))
+  def testPartitionFiltersPushDown(tableType: String, partitioned: Boolean): Unit = {
     spark.sql(
       s"""
          |CREATE TABLE $tableName (
@@ -65,7 +68,7 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
          |  ts long,
          |  partition string
          |) USING hudi
-         |PARTITIONED BY (partition)
+         |${if (partitioned) "PARTITIONED BY (partition)" else ""}
          |TBLPROPERTIES (
          |  type = '$tableType',
          |  primaryKey = 'id',
@@ -103,27 +106,37 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
             //          support (for partition-pruning) will only occur during execution phase, while file-listing
             //          actually happens during analysis stage
             case "eager" =>
-              assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
-              assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+              // NOTE: In case of partitioned table 3 files will be created, while in case of non-partitioned just 1
+              if (partitioned) {
+                assertEquals(1275, f.stats.sizeInBytes.longValue() / 1024)
+                assertEquals(1275, lr.stats.sizeInBytes.longValue() / 1024)
+              } else {
+                // NOTE: We're adding 512 to make sure we always round to the next integer value
+                assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+                assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
+              }
 
             // Case #2: Lazy listing (default mode).
             //          In case of lazy listing mode, Hudi will only list partitions matching partition-predicates that are
             //          eagerly pushed down (w/ help of [[HoodiePruneFileSourcePartitions]]) avoiding the necessity to
             //          list the whole table
             case "lazy" =>
-              assertEquals(425, f.stats.sizeInBytes.longValue() / 1024)
-              assertEquals(425, lr.stats.sizeInBytes.longValue() / 1024)
+              // NOTE: We're adding 512 to make sure we always round to the next integer value
+              assertEquals(425, (f.stats.sizeInBytes.longValue() + 512) / 1024)
+              assertEquals(425, (lr.stats.sizeInBytes.longValue() + 512) / 1024)
 
             case _ => throw new UnsupportedOperationException()
           }
 
-          val executionPlan = df.queryExecution.executedPlan
-          val expectedPhysicalPlanPartitionFiltersClause = tableType match {
-            case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
-            case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
-          }
+          if (partitioned) {
+            val executionPlan = df.queryExecution.executedPlan
+            val expectedPhysicalPlanPartitionFiltersClause = tableType match {
+              case "cow" => s"PartitionFilters: [isnotnull($attr), ($attr = 2021-01-05)]"
+              case "mor" => s"PushedFilters: [IsNotNull(partition), EqualTo(partition,2021-01-05)]"
+            }
 
-          Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+            Assertions.assertTrue(executionPlan.toString().contains(expectedPhysicalPlanPartitionFiltersClause))
+          }
 
         case _ =>
           val failureHint =
@@ -224,5 +237,4 @@ class TestHoodiePruneFileSourcePartitions extends HoodieClientTestBase with Scal
     }
   }
 
-
 }


[hudi] 04/07: [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios (#7838)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e0ef594ed2e70dbc60f03b0d83794d1ecf7ecd69
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Fri Feb 3 19:03:16 2023 +0800

    [HUDI-5682] Bucket index does not work correctly for multi-writer scenarios (#7838)
    
    Resolve conflicts using the bucket IDs instead of the whole file IDs, because the last 24 characters of a file ID are randomly generated.

    We do not make the last 24 characters of the file group ID a constant, mainly because a replace commit now shadows all the replaced files based on the file group ID; if the same file ID were used for both the new file and the replaced file, then none of the data in the new files could be queried.
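
A hedged sketch of the idea in Java (the file-group IDs below are hypothetical; the real logic lives in BucketIdentifier and in the new strategy shown in the diff): for a bucket index, the first 8 characters of a file ID encode the bucket number, so two writers conflict whenever their bucket-ID sets intersect, regardless of the random suffix.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    // Illustrative only: conflict detection on bucket IDs, with hypothetical file-group IDs.
    public class BucketConflictSketch {
      static Set<String> extractBucketIds(Set<String> fileIds) {
        // First 8 characters of a bucket-index file ID are the zero-padded bucket number.
        return fileIds.stream().map(id -> id.substring(0, 8)).collect(Collectors.toSet());
      }

      static boolean hasConflict(Set<String> firstFileIds, Set<String> secondFileIds) {
        Set<String> intersection = new HashSet<>(extractBucketIds(firstFileIds));
        intersection.retainAll(extractBucketIds(secondFileIds));
        return !intersection.isEmpty();
      }

      public static void main(String[] args) {
        Set<String> writer1 = new HashSet<>(Arrays.asList("00000001-aaaa", "00000002-bbbb"));
        Set<String> writer2 = new HashSet<>(Arrays.asList("00000002-cccc"));
        System.out.println(hasConflict(writer1, writer2)); // true: both touch bucket 00000002
      }
    }
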
---
 ...urrentFileWritesConflictResolutionStrategy.java |  55 ++++++++
 .../org/apache/hudi/config/HoodieLockConfig.java   |   9 ++
 .../apache/hudi/index/bucket/BucketIdentifier.java |   6 +-
 ...urrentFileWritesConflictResolutionStrategy.java | 150 +++++++++++++++++++++
 4 files changed, 219 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..503f1c42185
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/BucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.index.bucket.BucketIdentifier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is a basic implementation of a conflict resolution strategy for concurrent writes {@link ConflictResolutionStrategy} using bucket index.
+ */
+public class BucketIndexConcurrentFileWritesConflictResolutionStrategy
+    extends SimpleConcurrentFileWritesConflictResolutionStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(BucketIndexConcurrentFileWritesConflictResolutionStrategy.class);
+
+  @Override
+  public boolean hasConflict(ConcurrentOperation thisOperation, ConcurrentOperation otherOperation) {
+    // TODO : UUID's can clash even for insert/insert, handle that case.
+    Set<String> bucketIdsSetForFirstInstant = extractBucketIds(thisOperation.getMutatedFileIds());
+    Set<String> bucketIdsSetForSecondInstant = extractBucketIds(otherOperation.getMutatedFileIds());
+    Set<String> intersection = new HashSet<>(bucketIdsSetForFirstInstant);
+    intersection.retainAll(bucketIdsSetForSecondInstant);
+    if (!intersection.isEmpty()) {
+      LOG.info("Found conflicting writes between first operation = " + thisOperation
+          + ", second operation = " + otherOperation + " , intersecting bucket ids " + intersection);
+      return true;
+    }
+    return false;
+  }
+
+  private static Set<String> extractBucketIds(Set<String> fileIds) {
+    return fileIds.stream().map(BucketIdentifier::bucketIdStrFromFileId).collect(Collectors.toSet());
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index f4aeef09b5c..3c932756685 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.config;
 
+import org.apache.hudi.client.transaction.BucketIndexConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictResolutionStrategy;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
@@ -26,6 +27,7 @@ import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.lock.LockProvider;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.index.HoodieIndex;
 
 import java.io.File;
 import java.io.FileReader;
@@ -185,6 +187,13 @@ public class HoodieLockConfig extends HoodieConfig {
   public static final ConfigProperty<String> WRITE_CONFLICT_RESOLUTION_STRATEGY_CLASS_NAME = ConfigProperty
       .key(LOCK_PREFIX + "conflict.resolution.strategy")
       .defaultValue(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName())
+      .withInferFunction(hoodieConfig -> {
+        if (HoodieIndex.IndexType.BUCKET.name().equalsIgnoreCase(hoodieConfig.getStringOrDefault(HoodieIndexConfig.INDEX_TYPE, null))) {
+          return Option.of(BucketIndexConcurrentFileWritesConflictResolutionStrategy.class.getName());
+        } else {
+          return Option.of(SimpleConcurrentFileWritesConflictResolutionStrategy.class.getName());
+        }
+      })
       .sinceVersion("0.8.0")
       .withDocumentation("Lock provider class name, this should be subclass of "
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 35f9205a8e5..5264c8b39f1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -76,7 +76,11 @@ public class BucketIdentifier implements Serializable {
   }
 
   public static int bucketIdFromFileId(String fileId) {
-    return Integer.parseInt(fileId.substring(0, 8));
+    return Integer.parseInt(bucketIdStrFromFileId(fileId));
+  }
+
+  public static String bucketIdStrFromFileId(String fileId) {
+    return fileId.substring(0, 8);
   }
 
   public static String bucketIdStr(int n) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
new file mode 100644
index 00000000000..24c578606d4
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestBucketIndexConcurrentFileWritesConflictResolutionStrategy.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.transaction;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieWriteConflictException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class TestBucketIndexConcurrentFileWritesConflictResolutionStrategy extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void init() throws IOException {
+    initMetaClient();
+  }
+
+  @Test
+  public void testNoConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWrites() throws Exception {
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime);
+    // consider commits before this are all successful
+    // writer 1
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    // writer 2
+    createInflightCommit(HoodieTestTable.makeNewCommitTime());
+    Option<HoodieInstant> lastSuccessfulInstant = metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    newInstantTime = HoodieTestTable.makeNewCommitTime();
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, newInstantTime));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    Stream<HoodieInstant> candidateInstants = strategy.getCandidateInstants(metaClient.getActiveTimeline(), currentInstant.get(), lastSuccessfulInstant);
+    Assertions.assertEquals(0, candidateInstants.count());
+  }
+
+  @Test
+  public void testConcurrentWritesWithInterleavingSuccessfulCommit() throws Exception {
+    createCommit(HoodieActiveTimeline.createNewInstantTime());
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // writer 1 starts
+    String currentWriterInstant = HoodieActiveTimeline.createNewInstantTime();
+    createInflightCommit(currentWriterInstant);
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    createCommit(newInstantTime);
+
+    Option<HoodieInstant> currentInstant = Option.of(new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, currentWriterInstant));
+    SimpleConcurrentFileWritesConflictResolutionStrategy strategy = new BucketIndexConcurrentFileWritesConflictResolutionStrategy();
+    HoodieCommitMetadata currentMetadata = createCommitMetadata(currentWriterInstant);
+    timeline = timeline.reload();
+    List<HoodieInstant> candidateInstants = strategy.getCandidateInstants(timeline, currentInstant.get(), lastSuccessfulInstant).collect(
+        Collectors.toList());
+    // writer 1 conflicts with writer 2
+    Assertions.assertEquals(1, candidateInstants.size());
+    ConcurrentOperation thatCommitOperation = new ConcurrentOperation(candidateInstants.get(0), metaClient);
+    ConcurrentOperation thisCommitOperation = new ConcurrentOperation(currentInstant.get(), currentMetadata);
+    Assertions.assertTrue(strategy.hasConflict(thisCommitOperation, thatCommitOperation));
+    try {
+      strategy.resolveConflict(null, thisCommitOperation, thatCommitOperation);
+      Assertions.fail("Cannot reach here, writer 1 and writer 2 should have thrown a conflict");
+    } catch (HoodieWriteConflictException e) {
+      // expected
+    }
+  }
+
+  private void createCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-"  + instantTime + "-2";
+
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(fileId1);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    HoodieTestTable.of(metaClient)
+        .addCommit(instantTime, Option.of(commitMetadata))
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime, String writeFileName) {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    commitMetadata.addMetadata("test", "test");
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setFileId(writeFileName);
+    commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, writeStat);
+    commitMetadata.setOperationType(WriteOperationType.INSERT);
+    return commitMetadata;
+  }
+
+  private HoodieCommitMetadata createCommitMetadata(String instantTime) {
+    return createCommitMetadata(instantTime, "00000001-file-" + instantTime + "-1");
+  }
+
+  private void createInflightCommit(String instantTime) throws Exception {
+    String fileId1 = "00000001-file-" + instantTime + "-1";
+    String fileId2 = "00000002-file-" + instantTime + "-2";
+    HoodieTestTable.of(metaClient)
+        .addInflightCommit(instantTime)
+        .withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
+  }
+}


[hudi] 05/07: [HUDI-5653] Reset TestDataSource for TestHoodieDeltaStreamerWithMultiWriter (#7831)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9af629c63096edc2533c1f63407443d419015a09
Author: Jon Vexler <jb...@gmail.com>
AuthorDate: Fri Feb 3 12:39:56 2023 -0500

    [HUDI-5653] Reset TestDataSource for TestHoodieDeltaStreamerWithMultiWriter (#7831)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 4c03e1c67f2..df17a88156a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -36,6 +36,7 @@ import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -72,8 +73,12 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
   String basePath;
   String propsFilePath;
   String tableBasePath;
+  
+  @AfterEach
+  public void teardown() throws Exception {
+    TestDataSource.resetDataGen();
+  }
 
-  @Disabled("HUDI-5653")
   @ParameterizedTest
   @EnumSource(HoodieTableType.class)
   void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {


[hudi] 02/07: [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4e5699b5ce9666cf59761c412cc6b106211025b7
Author: luokey <85...@qq.com>
AuthorDate: Fri Feb 3 00:56:07 2023 -0500

    [HUDI-5671] BucketIndexPartitioner partition algorithm skew (#7815)
---
 .../java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java  | 5 +++--
 .../org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java     | 5 +++--
 2 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index cf06dbc18d6..dcbb30fb8bc 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -135,8 +135,9 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * (partition + curBucket) % numPartitions == this taskID belongs to this task.
    */
   public boolean isBucketToLoad(int bucketNumber, String partition) {
-    int globalHash = ((partition + bucketNumber).hashCode()) & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, parallelism) == taskID;
+    final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
+    int globalIndex = partitionIndex + bucketNumber;
+    return BucketIdentifier.mod(globalIndex, parallelism)  == taskID;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index 5fa3d1ab9a0..4e0c08b1046 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -42,7 +42,8 @@ public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<
   @Override
   public int partition(HoodieKey key, int numPartitions) {
     int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
-    int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
-    return BucketIdentifier.mod(globalHash, numPartitions);
+    int partitionIndex = (key.getPartitionPath().hashCode() & Integer.MAX_VALUE) % numPartitions;
+    int globalIndex = partitionIndex + curBucket;
+    return BucketIdentifier.mod(globalIndex, numPartitions);
   }
 }
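
For intuition, a toy comparison of the two assignment schemes in the diff above (assumptions: a hypothetical partition path, 64 buckets, parallelism of 8, and Math.floorMod standing in for BucketIdentifier.mod). The new scheme adds the bucket number to a fixed per-partition offset, so buckets of one partition are spread round-robin across tasks; the old scheme hashes the concatenated string, and its spread depends entirely on String.hashCode.

    import java.util.HashMap;
    import java.util.Map;

    // Toy comparison only; not Hudi code.
    public class BucketAssignmentSketch {
      public static void main(String[] args) {
        String partition = "2021/01/05"; // hypothetical partition path
        int parallelism = 8;
        int numBuckets = 64;

        Map<Integer, Integer> oldCounts = new HashMap<>();
        Map<Integer, Integer> newCounts = new HashMap<>();
        int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;

        for (int bucket = 0; bucket < numBuckets; bucket++) {
          // Old: hash the concatenated string; the distribution depends on String.hashCode.
          int oldTask = Math.floorMod((partition + bucket).hashCode() & Integer.MAX_VALUE, parallelism);
          // New: fixed per-partition offset plus bucket number, i.e. round-robin within a partition.
          int newTask = Math.floorMod(partitionIndex + bucket, parallelism);
          oldCounts.merge(oldTask, 1, Integer::sum);
          newCounts.merge(newTask, 1, Integer::sum);
        }
        System.out.println("old per-task bucket counts: " + oldCounts);
        System.out.println("new per-task bucket counts: " + newCounts); // 8 buckets per task
      }
    }
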


[hudi] 03/07: [HUDI-5551] support seconds unit on event_time metrics (#7664)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 46aa15bd564e823ca6984992a110534de9611ece
Author: 苏承祥 <11...@qq.com>
AuthorDate: Fri Feb 3 16:02:41 2023 +0800

    [HUDI-5551] support seconds unit on event_time metrics (#7664)
    
    * event_time now supports the seconds unit
    
    Co-authored-by: 苏承祥 <su...@tuya.com>
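
A standalone sketch of the new unit handling in WriteStatus (illustrative only; the real code path in the diff below also goes through Hudi's DateTimeUtils): a 10-digit event_time value is treated as epoch seconds and scaled to milliseconds, a 13-digit value as epoch milliseconds, and anything else is rejected.

    // Illustrative sketch of the seconds/millis normalization added below.
    public class EventTimeSketch {
      static long toEpochMillis(String eventTimeVal) {
        int length = eventTimeVal.length();
        if (length == 10) {
          // 10 digits: epoch seconds, e.g. "1675400000"
          return Long.parseLong(eventTimeVal) * 1000L;
        } else if (length == 13) {
          // 13 digits: epoch milliseconds, e.g. "1675400000123"
          return Long.parseLong(eventTimeVal);
        }
        throw new IllegalArgumentException("Unsupported event_time format: " + eventTimeVal);
      }

      public static void main(String[] args) {
        System.out.println(toEpochMillis("1675400000"));    // 1675400000000
        System.out.println(toEpochMillis("1675400000123")); // 1675400000123
      }
    }
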
---
 .../java/org/apache/hudi/client/WriteStatus.java   | 13 ++-
 .../org/apache/hudi/client/TestWriteStatus.java    | 92 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index b306d6c5400..54e88fcca22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -101,7 +101,18 @@ public class WriteStatus implements Serializable {
       String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
       try {
         if (!StringUtils.isNullOrEmpty(eventTimeVal)) {
-          long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
+          int length = eventTimeVal.length();
+          long millisEventTime;
+          // eventTimeVal in seconds unit
+          if (length == 10) {
+            millisEventTime = Long.parseLong(eventTimeVal) * 1000;
+          } else if (length == 13) {
+            // eventTimeVal in millis unit
+            millisEventTime = Long.parseLong(eventTimeVal);
+          } else {
+            throw new IllegalArgumentException("not support event_time format:" + eventTimeVal);
+          }
+          long eventTime = DateTimeUtils.parseDateTime(Long.toString(millisEventTime)).toEpochMilli();
           stat.setMinEventTime(eventTime);
           stat.setMaxEventTime(eventTime);
         }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 78e711ed701..99fb76650f9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -18,12 +18,19 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.util.Option;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -53,4 +60,89 @@ public class TestWriteStatus {
     assertTrue(status.getWrittenRecords().isEmpty());
     assertEquals(2000, status.getTotalRecords());
   }
+
+  @Test
+  public void testSuccessWithEventTime() {
+    // test with empty eventTime
+    WriteStatus status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, "");
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // test with null eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, null);
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+    // test with seconds eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    long minSeconds = 0L;
+    long maxSeconds = 0L;
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis() / 1000;
+      if (i == 0) {
+        minSeconds = eventTime;
+      } else if (i == 999) {
+        maxSeconds = eventTime;
+      }
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds * 1000L, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds * 1000L, status.getStat().getMinEventTime());
+
+    // test with millis eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    minSeconds = 0L;
+    maxSeconds = 0L;
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      long eventTime = System.currentTimeMillis();
+      if (i == 0) {
+        minSeconds = eventTime;
+      } else if (i == 999) {
+        maxSeconds = eventTime;
+      }
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertEquals(maxSeconds, status.getStat().getMaxEventTime());
+    assertEquals(minSeconds, status.getStat().getMinEventTime());
+
+    // test with error format eventTime
+    status = new WriteStatus(false, 1.0);
+    status.setStat(new HoodieWriteStat());
+    for (int i = 0; i < 1000; i++) {
+      Map<String, String> metadata = new HashMap<>();
+      metadata.put(DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY, String.valueOf(i));
+      status.markSuccess(mock(HoodieRecord.class), Option.of(metadata));
+    }
+    assertEquals(1000, status.getTotalRecords());
+    assertFalse(status.hasErrors());
+    assertNull(status.getStat().getMaxEventTime());
+    assertNull(status.getStat().getMinEventTime());
+
+  }
 }


[hudi] 06/07: [MINOR] Fixing CTAS configuration not propagated properly (#7832)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 78a2c647ceb6122738f68898d4fa80f6e8b472c2
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Feb 3 12:57:00 2023 -0800

    [MINOR] Fixing CTAS configuration not propagated properly (#7832)
    
    This change addresses the issue of CTAS erroneously performing de-duplication, due to the fact that
    
     - It reuses the `InsertIntoHoodieTableCommand` infrastructure to insert the data
     - It provides extra options overriding the default configs (tuned for the Insert Into statement) specifically for CTAS
     - These extra options stopped being merged into the final config (at some point after 0.12.2), which resulted in de-duplication being performed by default
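
The one-line fix below is the Scala merge `extraOptions ++ Map(...)`; as a Java analogue of its precedence (keys here are illustrative, not necessarily the exact Hudi config keys): the CTAS-specific extra options form the base map and the mandatory write options are applied on top, so the extras are no longer dropped while the mandatory keys still win on overlap.

    import java.util.HashMap;
    import java.util.Map;

    // Java analogue of the Scala `extraOptions ++ Map(...)` merge in the diff below.
    public class OptionMergeSketch {
      public static void main(String[] args) {
        Map<String, String> extraOptions = new HashMap<>();
        extraOptions.put("combine.before.insert", "false"); // CTAS-specific override (illustrative key)

        // Start from the extra options, then apply the mandatory options on top:
        // later puts win, mirroring how `++` lets the right-hand map take precedence.
        Map<String, String> overridingOpts = new HashMap<>(extraOptions);
        overridingOpts.put("table.type", "cow"); // mandatory option (illustrative key)

        System.out.println(overridingOpts); // both entries present
      }
    }
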
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |  2 +-
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index c8f01a12623..0c766f5135b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -186,7 +186,7 @@ trait ProvidesHoodieConfig extends Logging {
       HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn)
     )
 
-    val overridingOpts = Map(
+    val overridingOpts = extraOptions ++ Map(
       "path" -> path,
       TABLE_TYPE.key -> tableType,
       TBL_NAME.key -> hoodieCatalogTable.tableName,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 120c6adb6cd..47a8054252a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertFalse
 
 import scala.collection.JavaConverters._
+import scala.collection.Seq
 
 class TestCreateTable extends HoodieSparkSqlTestBase {
 
@@ -1036,4 +1037,31 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     checkKeyGenerator("org.apache.hudi.keygen.ComplexKeyGenerator", tableName)
     spark.sql(s"drop table $tableName")
   }
+
+  test("Test CTAS not de-duplicating (by default)") {
+    withRecordType() {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |CREATE TABLE $tableName USING hudi
+             | LOCATION '${tmp.getCanonicalPath}/$tableName'
+             | TBLPROPERTIES (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | AS SELECT * FROM (
+             |  SELECT 1 as id, 'a1' as name, 10 as price, 1000 as ts
+             |  UNION ALL
+             |  SELECT 1 as id, 'a1' as name, 11 as price, 1001 as ts
+             | )
+       """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000),
+          Seq(1, "a1", 11.0, 1001)
+        )
+      }
+    }
+  }
 }


[hudi] 07/07: [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4286d7982374809e5517852adfcdcd5bba5619ec
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Feb 3 17:59:18 2023 -0800

    [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
    
    - Optimizing instantiation of the metadata table for a fresh data table by avoiding file-system listing
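
A simplified sketch of the guard introduced in HoodieBackedTableMetadataWriter below (the constant value and helpers here are stand-ins, not Hudi APIs): when the metadata table is created together with a brand-new data table there is nothing on the file system yet, so the expensive listing is skipped and an empty commit initializes the table; otherwise partitions and files are listed and turned into metadata records.

    import java.util.Collections;
    import java.util.List;

    // Simplified, illustrative control flow; not the actual Hudi implementation.
    public class MetadataBootstrapSketch {
      // Placeholder value: stands in for the real SOLO_COMMIT_TIMESTAMP constant.
      static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";

      static List<String> partitionsToCommit(String createInstantTime) {
        if (SOLO_COMMIT_TIMESTAMP.equals(createInstantTime)) {
          // Fresh data table: skip file-system listing, bootstrap with an empty commit.
          return Collections.emptyList();
        }
        // Existing data table: list partitions/files and build metadata records from them.
        return listAllPartitions();
      }

      static List<String> listAllPartitions() {
        return Collections.singletonList("2021/01/05"); // stand-in for the real listing
      }

      public static void main(String[] args) {
        System.out.println(partitionsToCommit(SOLO_COMMIT_TIMESTAMP)); // []
        System.out.println(partitionsToCommit("20230204055305"));      // [2021/01/05]
      }
    }
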
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +
 .../metadata/HoodieBackedTableMetadataWriter.java  | 64 ++++++++++++----------
 .../SparkHoodieBackedTableMetadataWriter.java      |  1 +
 .../internal/HoodieDataSourceInternalWriter.java   |  4 +-
 4 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd5..17956479762 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      context.setJobStatus(this.getClass().getSimpleName(),"Cleaning up marker directories for commit " + instantTime + " in table "
+          + config.getTableName());
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8356ff9c71..5e8367e2095 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
 
     Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
 
-    List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-          return Pair.of(partitionName, p.getFileNameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-    List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-    if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-      // Record which saves the list of all partitions
-      HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-      HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
-      ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
-      partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
-    }
+    // skip file system listing to populate metadata records if its a fresh table.
+    // this is applicable only if the table already has N commits and metadata is enabled at a later point in time.
+    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in MDT for a fresh table.
+      // If not, last completed commit in data table will be chosen as the initial commit time.
+      LOG.info("Triggering empty Commit to metadata to initialize");
+    } else {
+      List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
+      Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+          .map(p -> {
+            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+            return Pair.of(partitionName, p.getFileNameToSizeMap());
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
+      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
+
+      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
+        // Record which saves the list of all partitions
+        HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+        HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
+        ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
+        partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
-    }
+      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
+      }
 
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      }
+      LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
     }
 
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
-
     commit(createInstantTime, partitionToRecordsMap, false);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 81526c25bcc..23537f6f798 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
 
+    engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
       if (writeClient.rollbackFailedWrites()) {
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index c4b21483e8f..11f5d5030b4 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
   private final boolean populateMetaFields;
   private final Boolean arePartitionRecordsSorted;
-  private Map<String, String> extraMetadataMap = new HashMap<>();
 
   public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
                                         SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
@@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
     this.structType = structType;
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+    Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
     this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
         sparkSession, configuration, extraMetadataMap);
   }