Posted to commits@hudi.apache.org by yu...@apache.org on 2022/09/29 01:42:41 UTC

[hudi] branch release-0.12.1-rc1 updated (48dd28fa86 -> d201c8420a)

This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a change to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 48dd28fa86 [HUDI-4760] Fixing repeated trigger of data file creations w/ clustering (#6561)
     add cd44adbee3 [HUDI-4584] Cleaning up Spark utilities (#6351)
     add a496c8dd15 fix test
     new 0538630811 [HUDI-4914] Managed memory weight should be set when sort clustering is enabled (#6792)
     new f2360894b5 [HUDI-4910] Fix unknown variable or type "Cast" (#6778)
     new 6685101262 [HUDI-4918] Fix NullPointerException when trying to show a non-existing key from env (#6794)
     new efedf5dd5f [HUDI-4718] Add Kerberos kinit command support. (#6719)
     new 070a395c5e [HUDI-4902] Set default partitioner for SIMPLE BUCKET index (#6759)
     new f1661187f5 [MINOR] Update PR template with documentation update (#6748)
     new 00d80215df [HUDI-4907] Prevent single commit multi instant issue (#6766)
     new 3e82f13f6e [HUDI-4923] Fix flaky TestHoodieReadClient.testReadFilterExistAfterBulkInsertPrepped (#6801)
     new 1abdb57877 [HUDI-4848] Fixing repair deprecated partition tool (#6731)
     new dcec30f14c [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
     new 739da48f1c [HUDI-4453] Fix schema to include partition columns in bootstrap operation (#6676)
     new e2a9d3b790 [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
     new b8b0256095 [HUDI-4924] Auto-tune dedup parallelism (#6802)
     new c9dd84088e [HUDI-4687] Avoid setAccessible which breaks strong encapsulation (#6657)
     new 1de1dfcd84 [MINOR] fixing validate async operations to poll completed clean instances (#6814)
     new 9a3fcc8456 [HUDI-4734] Deltastreamer table config change validation (#6753)
     new d201c8420a [HUDI-4934] Revert batch clean files (#6813)

The 17 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/PULL_REQUEST_TEMPLATE.md                   |   9 +
 .../commands/KerberosAuthenticationCommand.java    |  60 ++++
 .../apache/hudi/cli/commands/SparkEnvCommand.java  |   2 +-
 .../org/apache/hudi/cli/commands/SparkMain.java    |  23 +-
 .../org/apache/hudi/config/HoodieLayoutConfig.java |  11 +-
 .../action/clean/CleanPlanActionExecutor.java      |  11 +-
 .../hudi/table/action/clean/CleanPlanner.java      | 215 +++++++-------
 .../table/action/commit/HoodieWriteHelper.java     |   5 +-
 .../apache/hudi/config/TestHoodieWriteConfig.java  |  17 ++
 .../HoodieSparkBootstrapSchemaProvider.java        |  33 +--
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |   5 +
 .../apache/hudi/keygen/BuiltinKeyGenerator.java    |   2 +-
 .../table/upgrade/SparkUpgradeDowngradeHelper.java |   4 +-
 .../org/apache/hudi/AvroConversionUtils.scala      |   9 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   | 206 +------------
 .../scala/org/apache/hudi/util/PathUtils.scala     |  84 ++++++
 .../org/apache/hudi/util/SparkKeyGenUtils.scala    |   2 +-
 .../spark/sql/HoodieCatalystExpressionUtils.scala  | 152 +++++++---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |   5 -
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   2 +-
 .../apache/hudi/client/TestHoodieReadClient.java   |  15 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  14 +-
 .../org/apache/hudi/data/TestHoodieJavaRDD.java}   |  24 +-
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |   4 +-
 hudi-common/pom.xml                                |   5 +
 .../org/apache/hudi/common/data/HoodieData.java    |   9 +-
 .../apache/hudi/common/data/HoodieListData.java    |   5 +
 .../hudi/common/table/log/HoodieLogFileReader.java |  15 +-
 .../table/view/AbstractTableFileSystemView.java    |  16 +-
 .../table/view/PriorityBasedFileSystemView.java    |   5 -
 .../view/RemoteHoodieTableFileSystemView.java      |  12 -
 .../common/table/view/TableFileSystemView.java     |  14 +-
 .../hudi/common/util/ObjectSizeCalculator.java     | 321 +--------------------
 .../hudi/common/data/TestHoodieListData.java       |   8 +
 .../common/functional/TestHoodieLogFormat.java     |  79 +++--
 .../hudi/common/util/TestObjectSizeCalculator.java | 102 +++++++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   2 +-
 .../hudi/sink/clustering/ClusteringOperator.java   |  39 +--
 .../org/apache/hudi/sink/meta/CkpMetadata.java     |   6 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  10 +-
 .../apache/hudi/sink/ITTestDataStreamWrite.java    |  12 +
 .../org/apache/hudi/sink/meta/TestCkpMetadata.java |   3 +-
 .../dag/nodes/ValidateAsyncOperations.java         |   2 +-
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  |   4 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |   4 +-
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |   7 +-
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |   2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   3 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   3 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  55 ----
 .../sql/hudi/streaming/HoodieStreamSource.scala    |   2 +-
 .../hudi/command/payload/ExpressionCodeGen.scala   |   5 +-
 .../procedures/RunClusteringProcedure.scala        |  13 +-
 .../TestConvertFilterToCatalystExpression.scala    |  15 +-
 .../org/apache/hudi/TestDataSkippingUtils.scala    |  11 +-
 .../org/apache/hudi/TestHoodieSparkUtils.scala     |  57 ----
 .../scala/org/apache/hudi/util/TestPathUtils.scala |  85 ++++++
 .../spark/sql/hudi/TestMergeIntoTable2.scala       |  40 +++
 .../sql/HoodieSpark2CatalystExpressionUtils.scala  |   2 +-
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |   4 -
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |  11 +-
 .../spark/sql/HoodieSpark3CatalystPlanUtils.scala  |   4 -
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |   8 +-
 .../hudi/utilities/HoodieSnapshotExporter.java     |  64 ++--
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   4 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |   5 +
 .../functional/TestHoodieDeltaStreamer.java        |  58 +++-
 .../utilities/sources/TestJsonKafkaSource.java     |   2 +-
 packaging/hudi-flink-bundle/pom.xml                |   5 +
 packaging/hudi-hadoop-mr-bundle/pom.xml            |   5 +
 packaging/hudi-hive-sync-bundle/pom.xml            |   5 +
 packaging/hudi-integ-test-bundle/pom.xml           |   5 +
 packaging/hudi-kafka-connect-bundle/pom.xml        |   5 +
 packaging/hudi-presto-bundle/pom.xml               |   5 +
 packaging/hudi-spark-bundle/pom.xml                |   5 +
 packaging/hudi-trino-bundle/pom.xml                |   5 +
 packaging/hudi-utilities-bundle/pom.xml            |   5 +
 pom.xml                                            |   7 +
 78 files changed, 1053 insertions(+), 1046 deletions(-)
 create mode 100644 hudi-cli/src/main/java/org/apache/hudi/cli/commands/KerberosAuthenticationCommand.java
 create mode 100644 hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/PathUtils.scala
 copy hudi-client/{hudi-client-common/src/test/java/org/apache/hudi/optimize/TestHilbertCurveUtils.java => hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java} (62%)
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
 create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestPathUtils.scala


[hudi] 16/17: [HUDI-4734] Deltastreamer table config change validation (#6753)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9a3fcc8456796add306f6d3d2756afadf830f41a
Author: Jon Vexler <jo...@onehouse.ai>
AuthorDate: Wed Sep 28 17:12:27 2022 -0400

    [HUDI-4734] Deltastreamer table config change validation (#6753)
    
    
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../deltastreamer/HoodieDeltaStreamer.java         |  5 ++
 .../functional/TestHoodieDeltaStreamer.java        | 58 ++++++++++++++++------
 2 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 867aa05b30..74cb3e31df 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.HoodieAsyncService;
@@ -76,6 +77,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -651,6 +653,9 @@ public class HoodieDeltaStreamer implements Serializable {
                 + cfg.baseFileFormat);
         cfg.baseFileFormat = baseFileFormat;
         this.cfg.baseFileFormat = baseFileFormat;
+        Map<String,String> propsToValidate = new HashMap<>();
+        properties.get().forEach((k,v) -> propsToValidate.put(k.toString(),v.toString()));
+        HoodieWriterUtils.validateTableConfig(this.sparkSession, org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), meta.getTableConfig());
       } else {
         tableType = HoodieTableType.valueOf(cfg.tableType);
         if (cfg.baseFileFormat == null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index d94ff1477a..12c4c6fe0e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -611,25 +611,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
 
     // No new data => no commits.
     cfg.sourceLimit = 0;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, dfs, 1);
+    syncAndAssertRecordCount(cfg, 1000,  tableBasePath,  "00000",  1);
 
     // upsert() #1
     cfg.sourceLimit = 2000;
     cfg.operation = WriteOperationType.UPSERT;
-    new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
+    syncAndAssertRecordCount(cfg,1950, tableBasePath, "00001", 2);
     List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
@@ -663,6 +654,43 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     assertTrue(fieldNames.containsAll(expectedFieldNames));
   }
 
+  @Test
+  public void testModifiedTableConfigs() throws Exception {
+    String tableBasePath = dfsBasePath + "/test_table_modified_configs";
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // No new data => no commits.
+    cfg.sourceLimit = 0;
+    syncAndAssertRecordCount(cfg, 1000, tableBasePath, "00000", 1);
+
+    // add disallowed config update to recordkey field. An exception should be thrown
+    cfg.sourceLimit = 2000;
+    cfg.operation = WriteOperationType.UPSERT;
+    cfg.configs.add(HoodieTableConfig.RECORDKEY_FIELDS.key() + "=differentval");
+    assertThrows(HoodieException.class, () -> syncAndAssertRecordCount(cfg,1000,tableBasePath,"00000",1));
+    List<Row> counts = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
+
+
+    //perform the upsert and now with the original config, the commit should go through
+    HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
+    newCfg.sourceLimit = 2000;
+    newCfg.operation = WriteOperationType.UPSERT;
+    syncAndAssertRecordCount(newCfg, 1950, tableBasePath, "00001", 2);
+    List<Row> counts2 = TestHelpers.countsPerCommit(tableBasePath, sqlContext);
+    assertEquals(1950, counts2.stream().mapToLong(entry -> entry.getLong(1)).sum());
+  }
+
+  private void syncAndAssertRecordCount(HoodieDeltaStreamer.Config cfg, Integer expected, String tableBasePath, String metadata, Integer totalCommits) throws Exception {
+    new HoodieDeltaStreamer(cfg, jsc).sync();
+    TestHelpers.assertRecordCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(expected, tableBasePath, sqlContext);
+    TestHelpers.assertCommitMetadata(metadata, tableBasePath, dfs, totalCommits);
+  }
+
   @ParameterizedTest
   @MethodSource("schemaEvolArgs")
   public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, boolean useSchemaPostProcessor) throws Exception {
@@ -1418,7 +1446,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   @Test
   public void testPayloadClassUpdate() throws Exception {
-    String dataSetBasePath = dfsBasePath + "/test_dataset_mor";
+    String dataSetBasePath = dfsBasePath + "/test_dataset_mor_payload_class_update";
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dataSetBasePath, WriteOperationType.BULK_INSERT,
         Collections.singletonList(SqlQueryBasedTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false,
         true, false, null, "MERGE_ON_READ");
@@ -1572,6 +1600,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       populateCommonProps(parquetProps, dfsBasePath);
     }
 
+    parquetProps.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
@@ -2122,7 +2152,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
     TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
   }
-  
+
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
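
The change in HoodieDeltaStreamer above funnels the combined writer properties into HoodieWriterUtils.validateTableConfig, so settings that must stay stable for a table (such as the record key fields) are checked against the persisted table config before each sync, and a mismatch fails the sync instead of silently diverging, which is what the new testModifiedTableConfigs asserts. As a rough, standalone illustration of that pattern only (the method and the exact config key below are assumptions, not Hudi's actual API):

    import java.util.Map;
    import java.util.Objects;

    public class TableConfigValidationSketch {

      // Assumed key string for illustration; the real check goes through
      // HoodieTableConfig.RECORDKEY_FIELDS and HoodieWriterUtils.validateTableConfig.
      static final String RECORDKEY_FIELDS_KEY = "hoodie.table.recordkey.fields";

      // Fail fast when a writer-supplied property conflicts with the value
      // already persisted in the table config.
      static void validateImmutableConfig(Map<String, String> writerProps,
                                          Map<String, String> tableConfig) {
        String incoming = writerProps.get(RECORDKEY_FIELDS_KEY);
        String existing = tableConfig.get(RECORDKEY_FIELDS_KEY);
        if (incoming != null && existing != null && !Objects.equals(incoming, existing)) {
          throw new IllegalStateException("Config conflict for " + RECORDKEY_FIELDS_KEY
              + ": table has '" + existing + "' but '" + incoming + "' was supplied");
        }
      }
    }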


[hudi] 13/17: [HUDI-4924] Auto-tune dedup parallelism (#6802)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b8b0256095a649d4cdfe5592295df153f6746ea4
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Wed Sep 28 08:03:41 2022 -0700

    [HUDI-4924] Auto-tune dedup parallelism (#6802)
---
 .../table/action/commit/HoodieWriteHelper.java     |  5 ++-
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  5 +++
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 14 ++++++--
 .../org/apache/hudi/data/TestHoodieJavaRDD.java    | 40 ++++++++++++++++++++++
 .../org/apache/hudi/common/data/HoodieData.java    |  9 +++--
 .../apache/hudi/common/data/HoodieListData.java    |  5 +++
 .../hudi/common/data/TestHoodieListData.java       |  8 +++++
 7 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
index 80762b1de8..b359550e8a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieWriteHelper.java
@@ -54,6 +54,9 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
   public HoodieData<HoodieRecord<T>> deduplicateRecords(
       HoodieData<HoodieRecord<T>> records, HoodieIndex<?, ?> index, int parallelism) {
     boolean isIndexingGlobal = index.isGlobal();
+    // Auto-tunes the parallelism for reduce transformation based on the number of data partitions
+    // in engine-specific representation
+    int reduceParallelism = Math.max(1, Math.min(records.getNumPartitions(), parallelism));
     return records.mapToPair(record -> {
       HoodieKey hoodieKey = record.getKey();
       // If index used is global, then records are expected to differ in their partitionPath
@@ -65,7 +68,7 @@ public class HoodieWriteHelper<T extends HoodieRecordPayload, R> extends BaseWri
       HoodieKey reducedKey = rec1.getData().equals(reducedData) ? rec1.getKey() : rec2.getKey();
 
       return new HoodieAvroRecord<>(reducedKey, reducedData);
-    }, parallelism).map(Pair::getRight);
+    }, reduceParallelism).map(Pair::getRight);
   }
 
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 3964fa2d6b..ed9613bc15 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -102,6 +102,11 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
     return rddData.count();
   }
 
+  @Override
+  public int getNumPartitions() {
+    return rddData.getNumPartitions();
+  }
+
   @Override
   public <O> HoodieData<O> map(SerializableFunction<T, O> func) {
     return HoodieJavaRDD.of(rddData.map(func::apply));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 4b7a0139dd..8aafbcd9f6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -460,11 +460,17 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
 
     HoodieData<HoodieRecord<RawTripTestPayload>> records = HoodieJavaRDD.of(
         jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1));
+    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+        .combineInput(true, true);
+    addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
 
     // Global dedup should be done based on recordKey only
     HoodieIndex index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(true);
-    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+    int dedupParallelism = records.getNumPartitions() + 100;
+    HoodieData<HoodieRecord<RawTripTestPayload>> dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+    List<HoodieRecord<RawTripTestPayload>> dedupedRecs = dedupedRecsRdd.collectAsList();
+    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(1, dedupedRecs.size());
     assertEquals(dedupedRecs.get(0).getPartitionPath(), recordThree.getPartitionPath());
     assertNodupesWithinPartition(dedupedRecs);
@@ -472,13 +478,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     // non-Global dedup should be done based on both recordKey and partitionPath
     index = mock(HoodieIndex.class);
     when(index.isGlobal()).thenReturn(false);
-    dedupedRecs = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, 1).collectAsList();
+    dedupedRecsRdd = HoodieWriteHelper.newInstance().deduplicateRecords(records, index, dedupParallelism);
+    dedupedRecs = dedupedRecsRdd.collectAsList();
+    assertEquals(records.getNumPartitions(), dedupedRecsRdd.getNumPartitions());
     assertEquals(2, dedupedRecs.size());
     assertNodupesWithinPartition(dedupedRecs);
 
     // Perform write-action and check
     JavaRDD<HoodieRecord> recordList = jsc.parallelize(Arrays.asList(recordOne, recordTwo, recordThree), 1);
-    HoodieWriteConfig.Builder configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
+    configBuilder = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .combineInput(true, true);
     addConfigsForPopulateMetaFields(configBuilder, populateMetaFields);
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
new file mode 100644
index 0000000000..7595888304
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.data;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieJavaRDD extends HoodieClientTestBase {
+  @Test
+  public void testGetNumPartitions() {
+    int numPartitions = 6;
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+    assertEquals(numPartitions, rddData.getNumPartitions());
+  }
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 2d24e7dd12..1d56e63fad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -67,14 +67,19 @@ public interface HoodieData<T> extends Serializable {
 
   /**
    * Returns number of objects held in the collection
-   *
+   * <p>
    * NOTE: This is a terminal operation
    */
   long count();
 
+  /**
+   * @return the number of data partitions in the engine-specific representation.
+   */
+  int getNumPartitions();
+
   /**
    * Maps every element in the collection using provided mapping {@code func}.
-   *
+   * <p>
    * This is an intermediate operation
    *
    * @param func serializable map function
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 0be9ec9fa7..b2a503a85b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -175,6 +175,11 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa
     return super.count();
   }
 
+  @Override
+  public int getNumPartitions() {
+    return 1;
+  }
+
   @Override
   public List<T> collectAsList() {
     return super.collectAsList();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
index 8da8be1338..ea19f128d1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/data/TestHoodieListData.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -64,4 +65,11 @@ class TestHoodieListData {
     assertEquals(3, originalListData.count());
     assertEquals(sourceList, originalListData.collectAsList());
   }
+
+  @Test
+  public void testGetNumPartitions() {
+    HoodieData<Integer> listData = HoodieListData.eager(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()));
+    assertEquals(1, listData.getNumPartitions());
+  }
 }
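
For reference, the auto-tuning above boils down to a clamp in HoodieWriteHelper.deduplicateRecords: the parallelism used for the reduce step is bounded by the number of data partitions reported by the new getNumPartitions() API, with a floor of 1, so an oversized dedup parallelism no longer forces a wider shuffle than the input has partitions. A self-contained sketch of just that arithmetic (class and method names here are illustrative):

    public class DedupParallelismSketch {

      // Mirrors the clamp added above: min(numPartitions, requested), floored at 1.
      static int reduceParallelism(int numPartitions, int requestedParallelism) {
        return Math.max(1, Math.min(numPartitions, requestedParallelism));
      }

      public static void main(String[] args) {
        // With a single input partition and a requested parallelism of 101
        // (as in the updated test), the deduped data keeps 1 partition.
        System.out.println(reduceParallelism(1, 101)); // 1
        System.out.println(reduceParallelism(6, 2));   // 2
        System.out.println(reduceParallelism(0, 4));   // 1 (floor)
      }
    }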


[hudi] 08/17: [HUDI-4923] Fix flaky TestHoodieReadClient.testReadFilterExistAfterBulkInsertPrepped (#6801)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 3e82f13f6e47a24f5792e0c411d5992c5c1c43b4
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Sep 27 04:13:26 2022 -0700

    [HUDI-4923] Fix flaky TestHoodieReadClient.testReadFilterExistAfterBulkInsertPrepped (#6801)
    
    
    
    Co-authored-by: Raymond Xu <27...@users.noreply.github.com>
---
 .../java/org/apache/hudi/client/TestHoodieReadClient.java | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
index 5ff92fe197..bc1d6e03c0 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieReadClient.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.spark.api.java.JavaPairRDD;
@@ -32,6 +33,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -42,12 +44,23 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@SuppressWarnings("unchecked")
 /**
  * Test-cases for covering HoodieReadClient APIs
  */
+@SuppressWarnings("unchecked")
 public class TestHoodieReadClient extends HoodieClientTestBase {
 
+  @Override
+  protected void initPath() {
+    try {
+      java.nio.file.Path basePath = tempDir.resolve("dataset");
+      java.nio.file.Files.createDirectories(basePath);
+      this.basePath = basePath.toUri().toString();
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
   /**
    * Test ReadFilter API after writing new records using HoodieWriteClient.insert.
    */


[hudi] 05/17: [HUDI-4902] Set default partitioner for SIMPLE BUCKET index (#6759)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 070a395c5eb51ae2edacdbb932bebe6d86c59409
Author: Manu <36...@users.noreply.github.com>
AuthorDate: Tue Sep 27 07:45:12 2022 +0800

    [HUDI-4902] Set default partitioner for SIMPLE BUCKET index (#6759)
---
 .../java/org/apache/hudi/config/HoodieLayoutConfig.java | 11 ++++++++++-
 .../org/apache/hudi/config/TestHoodieWriteConfig.java   | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java
index 1d0810696d..0579ee3d62 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLayoutConfig.java
@@ -48,6 +48,9 @@ public class HoodieLayoutConfig extends HoodieConfig {
       .noDefaultValue()
       .withDocumentation("Partitioner class, it is used to distribute data in a specific way.");
 
+  public static final String SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME =
+      "org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner";
+
   private HoodieLayoutConfig() {
     super();
   }
@@ -80,8 +83,14 @@ public class HoodieLayoutConfig extends HoodieConfig {
     }
 
     private void setDefault() {
-      if (layoutConfig.contains(HoodieIndexConfig.INDEX_TYPE.key()) && layoutConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equals(HoodieIndex.IndexType.BUCKET.name())) {
+      if (layoutConfig.contains(HoodieIndexConfig.INDEX_TYPE.key())
+          && layoutConfig.getString(HoodieIndexConfig.INDEX_TYPE.key()).equals(HoodieIndex.IndexType.BUCKET.name())) {
         layoutConfig.setDefaultValue(LAYOUT_TYPE, HoodieStorageLayout.LayoutType.BUCKET.name());
+
+        // Currently, the partitioner of the SIMPLE bucket index is supported by SparkBucketIndexPartitioner only.
+        if ("SIMPLE".equals(layoutConfig.getString(HoodieIndexConfig.BUCKET_INDEX_ENGINE_TYPE))) {
+          layoutConfig.setDefaultValue(LAYOUT_PARTITIONER_CLASS_NAME, SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME);
+        }
       }
       layoutConfig.setDefaultValue(LAYOUT_TYPE, LAYOUT_TYPE.defaultValue());
     }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 0adbb998a0..e956668d0c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -371,6 +371,23 @@ public class TestHoodieWriteConfig {
         HoodieFailedWritesCleaningPolicy.LAZY, FileSystemBasedLockProviderTestClass.class.getName());
   }
 
+  @Test
+  public void testSimpleBucketIndexPartitionerConfig() {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE).build())
+        .build();
+    assertEquals(HoodieLayoutConfig.SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME, writeConfig.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
+
+    HoodieWriteConfig overwritePartitioner = HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+            .build())
+        .withLayoutConfig(HoodieLayoutConfig.newBuilder().withLayoutPartitioner("org.apache.hudi.table.action.commit.UpsertPartitioner").build())
+        .build();
+    assertEquals("org.apache.hudi.table.action.commit.UpsertPartitioner", overwritePartitioner.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
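
With this default in place, enabling the SIMPLE bucket index is enough: the layout partitioner falls back to org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner unless one is set explicitly via HoodieLayoutConfig.withLayoutPartitioner(...), as the second half of the new test shows. A minimal usage sketch mirroring that test (the import paths and the table path are assumptions):

    import org.apache.hudi.config.HoodieIndexConfig;
    import org.apache.hudi.config.HoodieLayoutConfig;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.index.HoodieIndex;

    public class SimpleBucketLayoutDefaultSketch {
      public static void main(String[] args) {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table") // illustrative path
            .withIndexConfig(HoodieIndexConfig.newBuilder()
                .withIndexType(HoodieIndex.IndexType.BUCKET)
                .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
                .build())
            .build();
        // Expected to print the SparkBucketIndexPartitioner class name by default.
        System.out.println(writeConfig.getString(HoodieLayoutConfig.LAYOUT_PARTITIONER_CLASS_NAME));
      }
    }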


[hudi] 09/17: [HUDI-4848] Fixing repair deprecated partition tool (#6731)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1abdb5787748aa2dde56f7c9d8d06f91ae2b5119
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Sep 27 12:02:35 2022 -0700

    [HUDI-4848] Fixing repair deprecated partition tool (#6731)
---
 .../org/apache/hudi/cli/commands/SparkMain.java    | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index e43a5d037e..6649eaf766 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -59,6 +59,7 @@ import org.apache.hudi.utilities.HoodieCompactor;
 import org.apache.hudi.utilities.deltastreamer.BootstrapExecutor;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,6 +68,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -456,8 +458,15 @@ public class SparkMain {
       HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build();
       Map<String, String> propsMap = getPropsForRewrite(metaClient);
       rewriteRecordsToNewPartition(basePath, newPartition, recordsToRewrite, metaClient, propsMap);
-      // after re-writing, we can safely delete older data.
+      // after re-writing, we can safely delete older partition.
       deleteOlderPartition(basePath, oldPartition, recordsToRewrite, propsMap);
+      // also, we can physically delete the old partition.
+      FileSystem fs = FSUtils.getFs(new Path(basePath), metaClient.getHadoopConf());
+      try {
+        fs.delete(new Path(basePath, oldPartition), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete older partition " + basePath);
+      }
     }
     return 0;
   }
@@ -473,10 +482,14 @@ public class SparkMain {
   }
 
   private static void rewriteRecordsToNewPartition(String basePath, String newPartition, Dataset<Row> recordsToRewrite, HoodieTableMetaClient metaClient, Map<String, String> propsMap) {
-    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(newPartition))
+    String partitionFieldProp = metaClient.getTableConfig().getPartitionFieldProp();
+    StructType structType = recordsToRewrite.schema();
+    int partitionIndex = structType.fieldIndex(partitionFieldProp);
+
+    recordsToRewrite.withColumn(metaClient.getTableConfig().getPartitionFieldProp(), functions.lit(null).cast(structType.apply(partitionIndex).dataType()))
         .write()
         .options(propsMap)
-        .option("hoodie.datasource.write.operation", "insert")
+        .option("hoodie.datasource.write.operation", WriteOperationType.BULK_INSERT.value())
         .format("hudi")
         .mode("Append")
         .save(basePath);
@@ -484,10 +497,8 @@ public class SparkMain {
 
   private static Dataset<Row> getRecordsToRewrite(String basePath, String oldPartition, SQLContext sqlContext) {
     return sqlContext.read()
-        .option("hoodie.datasource.read.extract.partition.values.from.path", "false")
         .format("hudi")
-        .load(basePath)
-        .filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " == '" + oldPartition + "'")
+        .load(basePath + "/" + oldPartition)
         .drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
         .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
         .drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)


[hudi] 04/17: [HUDI-4718] Add Kerberos kinit command support. (#6719)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit efedf5dd5fd0e93d84b7bffc8310713609f4a39e
Author: Paul Zhang <xz...@126.com>
AuthorDate: Mon Sep 26 22:11:10 2022 +0800

    [HUDI-4718] Add Kerberos kinit command support. (#6719)
---
 .../commands/KerberosAuthenticationCommand.java    | 60 ++++++++++++++++++++++
 1 file changed, 60 insertions(+)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/KerberosAuthenticationCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/KerberosAuthenticationCommand.java
new file mode 100644
index 0000000000..d79279a315
--- /dev/null
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/KerberosAuthenticationCommand.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cli.commands;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.springframework.shell.standard.ShellComponent;
+import org.springframework.shell.standard.ShellMethod;
+import org.springframework.shell.standard.ShellOption;
+
+import java.io.IOException;
+
+/**
+ * CLI command to perform Kerberos authentication.
+ */
+@ShellComponent
+public class KerberosAuthenticationCommand {
+
+  @ShellMethod(key = "kerberos kinit", value = "Perform Kerberos authentication")
+  public String performKerberosAuthentication(
+          @ShellOption(value = "--krb5conf", help = "Path to krb5.conf", defaultValue = "/etc/krb5.conf") String krb5ConfPath,
+          @ShellOption(value = "--principal", help = "Kerberos principal") String principal,
+          @ShellOption(value = "--keytab", help = "Path to keytab") String keytabPath) throws IOException {
+
+    System.out.println("Perform Kerberos authentication");
+    System.out.println("Parameters:");
+    System.out.println("--krb5conf: " + krb5ConfPath);
+    System.out.println("--principal: " + principal);
+    System.out.println("--keytab: " + keytabPath);
+
+    System.setProperty("java.security.krb5.conf", krb5ConfPath);
+    Configuration conf = new Configuration();
+    conf.set("hadoop.security.authentication", "kerberos");
+    conf.set("keytab.file", keytabPath);
+    conf.set("kerberos.principal", principal);
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+
+    System.out.println("Kerberos current user: " + UserGroupInformation.getCurrentUser());
+    System.out.println("Kerberos login user: " + UserGroupInformation.getLoginUser());
+
+    return "Kerberos authentication success";
+  }
+}
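
Within the hudi-cli shell, the new command can be run before any table operations against a Kerberized cluster; it sets java.security.krb5.conf, switches Hadoop security to kerberos, and logs in from the given keytab. An illustrative invocation (the principal, keytab path, and prompt shown are placeholders):

    hudi-> kerberos kinit --krb5conf /etc/krb5.conf --principal hudi@EXAMPLE.COM --keytab /etc/security/keytabs/hudi.keytab
    Kerberos authentication success

On success the command echoes the parameters plus the current and login users and returns "Kerberos authentication success"; the --krb5conf option defaults to /etc/krb5.conf, so it can usually be omitted.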


[hudi] 02/17: [HUDI-4910] Fix unknown variable or type "Cast" (#6778)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f2360894b5a48725957fb97972c5e9812d73c56b
Author: KnightChess <98...@qq.com>
AuthorDate: Mon Sep 26 22:03:40 2022 +0800

    [HUDI-4910] Fix unknown variable or type "Cast" (#6778)
---
 .../hudi/command/payload/ExpressionCodeGen.scala   |  5 +--
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 40 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala
index 947291d103..cd5b201f91 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionCodeGen.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.avro.AvroSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, GenericInternalRow, LeafExpression, UnsafeArrayData, UnsafeMapData, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast, Expression, GenericInternalRow, LeafExpression, UnsafeArrayData, UnsafeMapData, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.hudi.command.payload.ExpressionCodeGen.RECORD_NAME
 import org.apache.spark.sql.types.{DataType, Decimal}
@@ -122,7 +122,8 @@ object ExpressionCodeGen extends Logging {
       classOf[IndexedRecord].getName,
       classOf[AvroSerializer].getName,
       classOf[GenericRecord].getName,
-      classOf[GenericInternalRow].getName
+      classOf[GenericInternalRow].getName,
+      classOf[Cast].getName
     )
     evaluator.setImplementedInterfaces(Array(classOf[IExpressionEvaluator]))
     try {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index b77b5c3dbd..8e6acd1be5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -673,4 +673,44 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test ("Test Merge into with String cast to Double") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // Create a cow partitioned table.
+      spark.sql(
+        s"""
+           | create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           | ) using hudi
+           | tblproperties (
+           |  type = 'cow',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(dt)
+           | location '${tmp.getCanonicalPath}'
+         """.stripMargin)
+      // Insert data
+      spark.sql(s"insert into $tableName select 1, 'a1', cast(10.0 as double), 999, '2021-03-21'")
+      spark.sql(
+        s"""
+           | merge into $tableName as t0
+           | using (
+           |  select 'a1' as name, 1 as id, '10.1' as price, 1000 as ts, '2021-03-21' as dt
+           | ) as s0
+           | on t0.id = s0.id
+           | when matched then update set t0.price = s0.price, t0.ts = s0.ts
+           | when not matched then insert *
+         """.stripMargin
+      )
+      checkAnswer(s"select id,name,price,dt from $tableName")(
+        Seq(1, "a1", 10.1, "2021-03-21")
+      )
+    }
+  }
 }


[hudi] 17/17: [HUDI-4934] Revert batch clean files (#6813)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d201c8420a5e1d999565fca9af04cb52638cbb72
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Sep 28 15:51:45 2022 -0700

    [HUDI-4934] Revert batch clean files (#6813)
    
    * Revert "[HUDI-4792] Batch clean files to delete (#6580)"
    This reverts commit cbf9b83ca6d3dada14eea551a5bae25144ca0459.
---
 .../action/clean/CleanPlanActionExecutor.java      |  11 +-
 .../hudi/table/action/clean/CleanPlanner.java      | 215 ++++++++++-----------
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |   4 +-
 .../table/view/AbstractTableFileSystemView.java    |  16 +-
 .../table/view/PriorityBasedFileSystemView.java    |   5 -
 .../view/RemoteHoodieTableFileSystemView.java      |  12 --
 .../common/table/view/TableFileSystemView.java     |  14 +-
 7 files changed, 112 insertions(+), 165 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
index bd7ec798ed..7f3b437178 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java
@@ -42,7 +42,6 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -117,15 +116,9 @@ public class CleanPlanActionExecutor<T extends HoodieRecordPayload, I, K, O> ext
       context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
 
       Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
-          .parallelize(partitionsToClean, cleanerParallelism)
-          .mapPartitions(partitionIterator -> {
-            List<String> partitionList = new ArrayList<>();
-            partitionIterator.forEachRemaining(partitionList::add);
-            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanResult = planner.getDeletePaths(partitionList);
-            return cleanResult.entrySet().iterator();
-          }, false).collectAsList()
+          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
           .stream()
-          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
 
       Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
           .collect(Collectors.toMap(Map.Entry::getKey,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index e85793d711..64e69b1d2a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -58,7 +58,6 @@ import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -223,10 +222,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a
    * single file (i.e run it with versionsRetained = 1)
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestVersions(List<String> partitionPaths) {
-    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained()
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
         + " file versions. ");
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> map = new HashMap<>();
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
         .flatMap(this::getSavepointedDataFiles)
@@ -234,48 +233,43 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
 
     // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
     // In other words, the file versions only apply to the active file groups.
-    List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
-    for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
-      List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty()));
-      boolean toDeletePartition = false;
-      for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
-        int keepVersions = config.getCleanerFileVersionsRetained();
-        // do not cleanup slice required for pending compaction
-        Iterator<FileSlice> fileSliceIterator =
-            fileGroup.getAllFileSlices()
-                .filter(fs -> !isFileSliceNeededForPendingCompaction(fs))
-                .iterator();
-        if (isFileGroupInPendingCompaction(fileGroup)) {
-          // We have already saved the last version of file-groups for pending compaction Id
-          keepVersions--;
-        }
+    deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
+    boolean toDeletePartition = false;
+    List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+    for (HoodieFileGroup fileGroup : fileGroups) {
+      int keepVersions = config.getCleanerFileVersionsRetained();
+      // do not cleanup slice required for pending compaction
+      Iterator<FileSlice> fileSliceIterator =
+          fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator();
+      if (isFileGroupInPendingCompaction(fileGroup)) {
+        // We have already saved the last version of file-groups for pending compaction Id
+        keepVersions--;
+      }
 
-        while (fileSliceIterator.hasNext() && keepVersions > 0) {
-          // Skip this most recent version
-          fileSliceIterator.next();
-          keepVersions--;
-        }
-        // Delete the remaining files
-        while (fileSliceIterator.hasNext()) {
-          FileSlice nextSlice = fileSliceIterator.next();
-          Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
-          if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
-            // do not clean up a savepoint data file
-            continue;
-          }
-          deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
-        }
+      while (fileSliceIterator.hasNext() && keepVersions > 0) {
+        // Skip this most recent version
+        fileSliceIterator.next();
+        keepVersions--;
       }
-      // if there are no valid file groups for the partition, mark it to be deleted
-      if (partitionFileGroupList.getValue().isEmpty()) {
-        toDeletePartition = true;
+      // Delete the remaining files
+      while (fileSliceIterator.hasNext()) {
+        FileSlice nextSlice = fileSliceIterator.next();
+        Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
+        if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
+          // do not clean up a savepoint data file
+          continue;
+        }
+        deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
       }
-      map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
     }
-    return map;
+    // if there are no valid file groups for the partition, mark it to be deleted
+    if (fileGroups.isEmpty()) {
+      toDeletePartition = true;
+    }
+    return Pair.of(toDeletePartition, deletePaths);
   }
 
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPath) {
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
   }
 
@@ -296,9 +290,9 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted,
    *         and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted.
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestCommits(List<String> partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) {
-    LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. ");
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanFileInfoPerPartitionMap = new HashMap<>();
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
+    LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
+    List<CleanFileInfo> deletePaths = new ArrayList<>();
 
     // Collect all the datafiles savepointed by all the savepoints
     List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
@@ -310,79 +304,75 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
     if (commitTimeline.countInstants() > commitsRetained) {
       Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
       HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
+      // all replaced file groups before earliestCommitToRetain are eligible to clean
+      deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
       // add active files
-      List<Pair<String, List<HoodieFileGroup>>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList());
-      for (Pair<String, List<HoodieFileGroup>> partitionFileGroupList : fileGroupsPerPartition) {
-        List<CleanFileInfo> deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
-        // all replaced file groups before earliestCommitToRetain are eligible to clean
-        deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption));
-        for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) {
-          List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
-
-          if (fileSliceList.isEmpty()) {
+      List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
+      for (HoodieFileGroup fileGroup : fileGroups) {
+        List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
+
+        if (fileSliceList.isEmpty()) {
+          continue;
+        }
+
+        String lastVersion = fileSliceList.get(0).getBaseInstantTime();
+        String lastVersionBeforeEarliestCommitToRetain =
+            getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
+
+        // Ensure there are more than 1 version of the file (we only clean old files from updates)
+        // i.e always spare the last commit.
+        for (FileSlice aSlice : fileSliceList) {
+          Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
+          String fileCommitTime = aSlice.getBaseInstantTime();
+          if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
+            // do not clean up a savepoint data file
             continue;
           }
 
-          String lastVersion = fileSliceList.get(0).getBaseInstantTime();
-          String lastVersionBeforeEarliestCommitToRetain =
-              getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
-
-          // Ensure there are more than 1 version of the file (we only clean old files from updates)
-          // i.e always spare the last commit.
-          for (FileSlice aSlice : fileSliceList) {
-            Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
-            String fileCommitTime = aSlice.getBaseInstantTime();
-            if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
-              // do not clean up a savepoint data file
+          if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+            // Don't delete the latest commit and also the last commit before the earliest commit we
+            // are retaining
+            // The window of commit retain == max query run time. So a query could be running which
+            // still
+            // uses this file.
+            if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
+              // move on to the next file
               continue;
             }
-
-            if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-              // Dont delete the latest commit and also the last commit before the earliest commit we
-              // are retaining
-              // The window of commit retain == max query run time. So a query could be running which
-              // still
-              // uses this file.
-              if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
-                // move on to the next file
-                continue;
-              }
-            } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-              // This block corresponds to KEEP_LATEST_BY_HOURS policy
-              // Do not delete the latest commit.
-              if (fileCommitTime.equals(lastVersion)) {
-                // move on to the next file
-                continue;
-              }
+          } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
+            // This block corresponds to KEEP_LATEST_BY_HOURS policy
+            // Do not delete the latest commit.
+            if (fileCommitTime.equals(lastVersion)) {
+              // move on to the next file
+              continue;
             }
+          }
 
-            // Always keep the last commit
-            if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
-                .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
-              // this is a commit, that should be cleaned.
-              aFile.ifPresent(hoodieDataFile -> {
-                deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
-                if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
-                  deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
-                }
-              });
-              if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-                // If merge on read, then clean the log files for the commits as well
-                deletePaths.addAll(
-                    aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                        .collect(Collectors.toList()));
+          // Always keep the last commit
+          if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline
+              .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
+            // this is a commit, that should be cleaned.
+            aFile.ifPresent(hoodieDataFile -> {
+              deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
+              if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
+                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
               }
+            });
+            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
+              // 1. If merge on read, then clean the log files for the commits as well;
+              // 2. If change log capture is enabled, clean the log files regardless of whether the table type is MOR or COW.
+              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                  .collect(Collectors.toList()));
             }
           }
         }
-        // if there are no valid file groups for the partition, mark it to be deleted
-        if (partitionFileGroupList.getValue().isEmpty()) {
-          toDeletePartition = true;
-        }
-        cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths));
+      }
+      // if there are no valid file groups for the partition, mark it to be deleted
+      if (fileGroups.isEmpty()) {
+        toDeletePartition = true;
       }
     }
-    return cleanFileInfoPerPartitionMap;
+    return Pair.of(toDeletePartition, deletePaths);
   }
 
   /**
@@ -390,11 +380,10 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
    * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained.
    * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to
    * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5.
-   *
    * @param partitionPath partition path to check
    * @return list of files to clean
    */
-  private Map<String, Pair<Boolean, List<CleanFileInfo>>> getFilesToCleanKeepingLatestHours(List<String> partitionPath) {
+  private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
     return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
   }
 
@@ -448,23 +437,21 @@ public class CleanPlanner<T extends HoodieRecordPayload, I, K, O> implements Ser
   /**
    * Returns files to be cleaned for the given partitionPath based on cleaning policy.
    */
-  public Map<String, Pair<Boolean, List<CleanFileInfo>>> getDeletePaths(List<String> partitionPaths) {
+  public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
     HoodieCleaningPolicy policy = config.getCleanerPolicy();
-    Map<String, Pair<Boolean, List<CleanFileInfo>>> deletePaths;
+    Pair<Boolean, List<CleanFileInfo>> deletePaths;
     if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
-      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
-      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
     } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
-      deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths);
+      deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
     } else {
       throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
     }
-    for (String partitionPath : deletePaths.keySet()) {
-      LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath);
-      if (deletePaths.get(partitionPath).getLeft()) {
-        LOG.info("Partition " + partitionPath + " to be deleted");
-      }
+    LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
+    if (deletePaths.getKey()) {
+      LOG.info("Partition " + partitionPath + " to be deleted");
     }
     return deletePaths;
   }
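
For context on how this reverted single-partition API is consumed: after this change the clean planning step goes back to asking the planner for deletions one partition at a time. The loop below is only an illustrative sketch, not the actual executor code; the names planner and partitionsToClean are assumptions, imports for the Hudi types (CleanPlanner, CleanFileInfo, Pair) are omitted, and only the getDeletePaths contract shown above is relied upon.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical driver loop over the per-partition API restored by this revert.
    static void planClean(CleanPlanner<?, ?, ?, ?> planner, List<String> partitionsToClean) {
      Map<String, List<CleanFileInfo>> filesToDelete = new HashMap<>();
      List<String> partitionsToDrop = new ArrayList<>();
      for (String partitionPath : partitionsToClean) {
        Pair<Boolean, List<CleanFileInfo>> result = planner.getDeletePaths(partitionPath);
        filesToDelete.put(partitionPath, result.getValue());
        if (result.getKey()) {
          // no valid file groups are left, so the partition itself is eligible for deletion
          partitionsToDrop.add(partitionPath);
        }
      }
      // ... a cleaner plan would then be assembled from filesToDelete and partitionsToDrop ...
    }
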
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index 967e313f4e..baff4ebac8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -57,7 +57,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
     HoodieWriteConfig writeConfig = getConfigBuilder(true)
         .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build())
         .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build())
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig);
          HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) {
@@ -81,7 +81,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
       client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
       client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
 
-      // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit
+      // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit
       for (int i = 5; i < 9; i++) {
         String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000);
         client.startCommitWithTime(instantTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index 89a184bf49..8cfd92d01f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -116,7 +116,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
 
   /**
    * Refresh commits timeline.
-   *
+   * 
    * @param visibleActiveTimeline Visible Active Timeline
    */
   protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
@@ -750,20 +750,6 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
     return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg));
   }
 
-  @Override
-  public final Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    return getAllFileGroupsIncludingReplaced(partitionPaths)
-        .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList())));
-  }
-
-  private Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroupsIncludingReplaced(final List<String> partitionStrList) {
-    List<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
-    for (String partitionStr : partitionStrList) {
-      fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList())));
-    }
-    return fileGroupPerPartitionList.stream();
-  }
-
   private Stream<HoodieFileGroup> getAllFileGroupsIncludingReplaced(final String partitionStr) {
     try {
       readLock.lock();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index 9006bd45cb..ff44c7cef0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -204,11 +204,6 @@ public class PriorityBasedFileSystemView implements SyncableFileSystemView, Seri
     return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
   }
 
-  @Override
-  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups);
-  }
-
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index 5e52767fe2..bd18ba22a2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -51,11 +51,9 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -379,16 +377,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
-  @Override
-  public Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths) {
-    ArrayList<Pair<String, List<HoodieFileGroup>>> fileGroupPerPartitionList = new ArrayList<>();
-    for (String partitionPath : partitionPaths) {
-      Stream<HoodieFileGroup> fileGroup = getAllFileGroups(partitionPath);
-      fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList())));
-    }
-    return fileGroupPerPartitionList.stream();
-  }
-
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
index 9c83c8f19c..c32e2cabb1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java
@@ -109,18 +109,18 @@ public interface TableFileSystemView {
     /**
      * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime.
      *
-     * @param partitionPath                        Partition path
-     * @param maxCommitTime                        Max Instant Time
+     * @param partitionPath Partition path
+     * @param maxCommitTime Max Instant Time
      * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction
      */
     Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime,
-                                                    boolean includeFileSlicesInPendingCompaction);
+        boolean includeFileSlicesInPendingCompaction);
 
     /**
      * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the
      * file-slice before and after compaction request instant is merged and returned.
-     *
-     * @param partitionPath  Partition Path
+     * 
+     * @param partitionPath Partition Path
      * @param maxInstantTime Max Instant Time
      * @return
      */
@@ -149,12 +149,10 @@ public interface TableFileSystemView {
    */
   Stream<HoodieFileGroup> getAllFileGroups(String partitionPath);
 
-  Stream<Pair<String, List<HoodieFileGroup>>> getAllFileGroups(List<String> partitionPaths);
-
   /**
    * Return Pending Compaction Operations.
    *
-   * @return Pair<Pair < InstantTime, CompactionOperation>>
+   * @return Pair<Pair<InstantTime,CompactionOperation>>
    */
   Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations();
 


[hudi] 12/17: [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e2a9d3b790c9192bf03b29a579b6fc4c30d5e8da
Author: hj2016 <hj...@163.com>
AuthorDate: Wed Sep 28 23:02:59 2022 +0800

    [HUDI-2780] Fix the issue of Mor log skipping complete blocks when reading data (#4015)
    
    
    Co-authored-by: huangjing02 <hu...@bilibili.com>
    Co-authored-by: sivabalan <n....@gmail.com>
---
 .../hudi/common/table/log/HoodieLogFileReader.java | 15 ++--
 .../common/functional/TestHoodieLogFormat.java     | 79 ++++++++++++++--------
 2 files changed, 57 insertions(+), 37 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index cb16c8b141..11d9e75f4b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -149,13 +149,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   // for max of Integer size
   private HoodieLogBlock readBlock() throws IOException {
     int blockSize;
+    long blockStartPos = inputStream.getPos();
     try {
       // 1 Read the total size of the block
       blockSize = (int) inputStream.readLong();
     } catch (EOFException | CorruptedLogFileException e) {
       // An exception reading any of the above indicates a corrupt block
       // Create a corrupt block by finding the next MAGIC marker or EOF
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // We may have had a crash which could have written this block partially
@@ -163,7 +164,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     // block) or EOF. If we did not find either of it, then this block is a corrupted block.
     boolean isCorrupted = isBlockCorrupted(blockSize);
     if (isCorrupted) {
-      return createCorruptBlock();
+      return createCorruptBlock(blockStartPos);
     }
 
     // 2. Read the version for this log format
@@ -249,14 +250,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
     return HoodieLogBlockType.values()[type];
   }
 
-  private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
-    long currentPos = inputStream.getPos();
+  private HoodieLogBlock createCorruptBlock(long blockStartPos) throws IOException {
+    LOG.info("Log " + logFile + " has a corrupted block at " + blockStartPos);
+    inputStream.seek(blockStartPos);
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
-    inputStream.seek(currentPos);
+    inputStream.seek(blockStartPos);
     LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
-    int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
+    int corruptedBlockSize = (int) (nextBlockOffset - blockStartPos);
     long contentPosition = inputStream.getPos();
     Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
     HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
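
The essence of the reader fix above is to capture the block's starting offset before any bytes of the block are consumed, and to rewind to that recorded offset when the block turns out to be corrupt, so that complete blocks following the corrupt region are not skipped. A minimal sketch of that positioning pattern follows; the method and variable names are illustrative, not the actual HoodieLogFileReader fields or helpers.

    // Sketch only: remember the start offset up front so a failed parse can resynchronize
    // from the true block start instead of from wherever the partial read stopped.
    static void readOneBlock(org.apache.hadoop.fs.FSDataInputStream inputStream) throws java.io.IOException {
      long blockStartPos = inputStream.getPos();   // position right after the MAGIC marker
      try {
        long blockSize = inputStream.readLong();   // may throw EOFException on a partially written block
        // ... validate blockSize and read the block body ...
      } catch (java.io.EOFException e) {
        // Rewind to the recorded start, then scan forward for the next MAGIC marker so the
        // corrupt region is bounded correctly.
        inputStream.seek(blockStartPos);
        // ... scan for the next available block offset and wrap the bytes in a corrupt block ...
      }
    }
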
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index c08c2f7029..1d15822ff1 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -604,20 +604,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
   @Test
   public void testAppendAndReadOnCorruptedLog() throws IOException, URISyntaxException, InterruptedException {
-    Writer writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
-    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
-    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Append some arbit byte[] to the end of the log (mimics a partially written commit)
     fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
-    FSDataOutputStream outputStream = fs.append(writer.getLogFile().getPath());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -632,17 +623,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     outputStream.close();
 
     // Append a proper block that is of the missing length of the corrupted block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 10);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -655,7 +639,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
 
     // Simulate another failure back to back
-    outputStream = fs.append(writer.getLogFile().getPath());
+    outputStream = fs.append(logFile.getPath());
     // create a block with
     outputStream.write(HoodieLogFormat.MAGIC);
     // Write out a length that does not confirm with the content
@@ -670,17 +654,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     outputStream.close();
 
     // Should be able to append a new block
-    writer =
-        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-            .withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
-    records = SchemaTestUtil.generateTestRecords(0, 100);
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-    dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
-    writer.appendBlock(dataBlock);
-    writer.close();
+    logFile = addValidBlock("test-fileId1", "100", 100);
 
     // Second round of reads - we should be able to read the first and last block
-    reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
+    reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -696,6 +673,48 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     reader.close();
   }
 
+  @Test
+  public void testMissingBlockExceptMagicBytes() throws IOException, URISyntaxException, InterruptedException {
+    HoodieLogFile logFile = addValidBlock("test-fileId1", "100", 100);
+
+    // Append just magic bytes and move onto next block
+    fs = FSUtils.getFs(fs.getUri().toString(), fs.getConf());
+    FSDataOutputStream outputStream = fs.append(logFile.getPath());
+    outputStream.write(HoodieLogFormat.MAGIC);
+    outputStream.flush();
+    outputStream.close();
+
+    // Append a proper block
+    logFile = addValidBlock("test-fileId1", "100", 10);
+
+    // First round of reads - we should be able to read the first block and then EOF
+    Reader reader = HoodieLogFormat.newReader(fs, logFile, SchemaTestUtil.getSimpleSchema());
+    assertTrue(reader.hasNext(), "First block should be available");
+    reader.next();
+    assertTrue(reader.hasNext(), "We should have corrupted block next");
+    HoodieLogBlock block = reader.next();
+    assertEquals(HoodieLogBlockType.CORRUPT_BLOCK, block.getBlockType(), "The read block should be a corrupt block");
+    assertTrue(reader.hasNext(), "Third block should be available");
+    reader.next();
+    assertFalse(reader.hasNext(), "There should be no more block left");
+
+    reader.close();
+  }
+
+  private HoodieLogFile addValidBlock(String fileId, String commitTime, int numRecords) throws IOException, URISyntaxException, InterruptedException {
+    Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId(fileId).overBaseCommit(commitTime).withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, numRecords);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile();
+  }
+
   @Test
   public void testValidateCorruptBlockEndPosition() throws IOException, URISyntaxException, InterruptedException {
     Writer writer =


[hudi] 03/17: [HUDI-4918] Fix bug where showing a non-existing key from env throws NullPointerException. (#6794)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 66851012627d3266c66a003c37308b7c6861689e
Author: Forus <70...@users.noreply.github.com>
AuthorDate: Mon Sep 26 22:05:34 2022 +0800

    [HUDI-4918] Fix bug where showing a non-existing key from env throws NullPointerException. (#6794)
---
 .../src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
index 5c21fe43e1..02778ac2cf 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkEnvCommand.java
@@ -60,7 +60,7 @@ public class SparkEnvCommand {
     if (key == null || key.isEmpty()) {
       return showAllEnv();
     } else {
-      return HoodiePrintHelper.print(new String[] {"key", "value"}, new String[][] {new String[] {key, env.get(key)}});
+      return HoodiePrintHelper.print(new String[] {"key", "value"}, new String[][] {new String[] {key, env.getOrDefault(key, "")}});
     }
   }
 }
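
The one-line change above swaps Map#get for Map#getOrDefault so a missing key renders as an empty value instead of feeding null into the table printer. A toy illustration of the difference, using the plain JDK rather than Hudi code:

    import java.util.HashMap;
    import java.util.Map;

    public class GetOrDefaultDemo {
      public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        String before = env.get("SPARK_HOME");              // null -> risk of NullPointerException downstream
        String after = env.getOrDefault("SPARK_HOME", "");  // "" -> prints as an empty value
        System.out.println(before + " vs '" + after + "'");
      }
    }
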


[hudi] 10/17: [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit dcec30f14c9a696f4e7006fc8519f417ebbee71b
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Sep 27 12:21:19 2022 -0700

    [HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
---
 .../hudi/utilities/HoodieSnapshotExporter.java     | 64 +++++++++++++---------
 1 file changed, 38 insertions(+), 26 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
index 753765fb6a..187f66d073 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java
@@ -114,16 +114,18 @@ public class HoodieSnapshotExporter {
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    if (outputPathExists(fs, cfg)) {
+    if (outputPathExists(outputFs, cfg)) {
       throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
 
-    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
-      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
-    });
+    FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+        .<HoodieSnapshotExporterException>orElseThrow(() -> {
+          throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+        });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
@@ -134,11 +136,11 @@ public class HoodieSnapshotExporter {
     LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     } else {
-      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     }
-    createSuccessTag(fs, cfg);
+    createSuccessTag(outputFs, cfg);
   }
 
   private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
@@ -164,7 +166,8 @@ public class HoodieSnapshotExporter {
     }
   }
 
-  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+  private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                               Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ public class HoodieSnapshotExporter {
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
         .flatMap(partition -> fsView
@@ -193,8 +196,9 @@ public class HoodieSnapshotExporter {
         .save(cfg.targetOutputPath);
   }
 
-  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+  private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                            Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
 
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
@@ -219,20 +223,26 @@ public class HoodieSnapshotExporter {
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!fs.exists(toPartitionPath)) {
-        fs.mkdirs(toPartitionPath);
+      if (!executorOutputFs.exists(toPartitionPath)) {
+        executorOutputFs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          fs.getConf());
+      FileUtil.copy(
+          executorSourceFs,
+          sourceFilePath,
+          executorOutputFs,
+          new Path(toPartitionPath, sourceFilePath.getName()),
+          false,
+          executorOutputFs.getConf());
     }, files.size());
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
-    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
             return true;
           } else {
@@ -244,20 +254,22 @@ public class HoodieSnapshotExporter {
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fileSystem.exists(targetFilePath.getParent())) {
-        fileSystem.mkdirs(targetFilePath.getParent());
+      if (!outputFs.exists(targetFilePath.getParent())) {
+        outputFs.mkdirs(targetFilePath.getParent());
       }
-      if (fileSystem.exists(targetFilePath)) {
+      if (outputFs.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
       }
-      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+      FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
     }
   }
 
-  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+  private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+        .setConf(sourceFs.getConf())
+        .setBasePath(cfg.sourceBasePath)
+        .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
         .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
   }


[hudi] 06/17: [MINOR] Update PR template with documentation update (#6748)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f1661187f55f0f5448ca39a29f2c63da9610a67c
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Mon Sep 26 16:46:05 2022 -0700

    [MINOR] Update PR template with documentation update (#6748)
---
 .github/PULL_REQUEST_TEMPLATE.md | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index 2ec8b61d05..17ad995a97 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -10,6 +10,15 @@ _Describe any public API or user-facing feature change or any performance impact
 
 _Choose one. If medium or high, explain what verification was done to mitigate the risks._
 
+### Documentation Update
+
+_Describe any necessary documentation update if there is any new feature, config, or user-facing change_
+
+- _The config description must be updated if new configs are added or the default value of the configs are changed_
+- _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
+  ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
+  changes to the website._
+
 ### Contributor's checklist
 
 - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)


[hudi] 01/17: [HUDI-4914] Managed memory weight should be set when sort clustering is enabled (#6792)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 053863081140309f8db4b92b524d3f8ef0de5bc8
Author: Nicholas Jiang <pr...@163.com>
AuthorDate: Mon Sep 26 20:22:50 2022 +0800

    [HUDI-4914] Managed memory weight should be set when sort clustering is enabled (#6792)
---
 .../hudi/sink/clustering/ClusteringOperator.java   | 39 +++++++++-------------
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 10 ++++--
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 12 +++++++
 3 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 9b38f0ceea..e30a3577f0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -105,9 +105,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   private transient int[] requiredPos;
   private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter;
   private transient HoodieFlinkWriteClient writeClient;
-  private transient BulkInsertWriterHelper writerHelper;
-
-  private transient BinaryExternalSorter sorter;
   private transient StreamRecordCollector<ClusteringCommitEvent> collector;
   private transient BinaryRowDataSerializer binarySerializer;
 
@@ -153,10 +150,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType);
     this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount());
 
-    if (this.sortClusteringEnabled) {
-      initSorter();
-    }
-
     if (this.asyncClustering) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
@@ -186,6 +179,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
+      this.writeClient = null;
     }
   }
 
@@ -203,7 +197,9 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
   private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception {
     final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo();
 
-    initWriterHelper(instantTime);
+    BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+        instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
+        this.rowType);
 
     List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations();
     boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0);
@@ -220,33 +216,27 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
     RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
 
     if (this.sortClusteringEnabled) {
+      BinaryExternalSorter sorter = initSorter();
       while (iterator.hasNext()) {
         RowData rowData = iterator.next();
         BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
-        this.sorter.write(binaryRowData);
+        sorter.write(binaryRowData);
       }
 
       BinaryRowData row = binarySerializer.createInstance();
       while ((row = sorter.getIterator().next(row)) != null) {
-        this.writerHelper.write(row);
+        writerHelper.write(row);
       }
+      sorter.close();
     } else {
       while (iterator.hasNext()) {
-        this.writerHelper.write(iterator.next());
+        writerHelper.write(iterator.next());
       }
     }
 
-    List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID);
+    List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
     collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID));
-    this.writerHelper = null;
-  }
-
-  private void initWriterHelper(String clusteringInstantTime) {
-    if (this.writerHelper == null) {
-      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
-          clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
-          this.rowType);
-    }
+    writerHelper.close();
   }
 
   /**
@@ -338,13 +328,13 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
         .toArray();
   }
 
-  private void initSorter() {
+  private BinaryExternalSorter initSorter() {
     ClassLoader cl = getContainingTask().getUserCodeClassLoader();
     NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
     RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
 
     MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
-    this.sorter =
+    BinaryExternalSorter sorter =
         new BinaryExternalSorter(
             this.getContainingTask(),
             memManager,
@@ -355,12 +345,13 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
             computer,
             comparator,
             getContainingTask().getJobConfiguration());
-    this.sorter.startThreads();
+    sorter.startThreads();
 
     // register the metrics.
     getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
     getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
     getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
+    return sorter;
   }
 
   private SortCodeGenerator createSortCodeGenerator() {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1e4f82a957..82761adf73 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -401,7 +401,7 @@ public class Pipelines {
    * @return the clustering pipeline
    */
   public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
-    return dataStream.transform("cluster_plan_generate",
+    DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
@@ -413,8 +413,12 @@ public class Pipelines {
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
-        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
-        .addSink(new ClusteringCommitSink(conf))
+        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+    if (OptionsResolver.sortClusteringEnabled(conf)) {
+      ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+          conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+    }
+    return clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // compaction commit should be singleton
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ee23727d0..e6d2ddb7b5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -158,10 +158,22 @@ public class ITTestDataStreamWrite extends TestLogger {
 
   @Test
   public void testWriteCopyOnWriteWithClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(false);
+  }
+
+  @Test
+  public void testWriteCopyOnWriteWithSortClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(true);
+  }
+
+  private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
+    if (sortClusteringEnabled) {
+      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
+    }
 
     testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
   }
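
Putting the pieces of this commit together: sort clustering is driven purely by configuration, and the managed memory weight now set in Pipelines.cluster is sized from FlinkOptions.WRITE_SORT_MEMORY. Below is a small configuration sketch along the lines of the test above; it is not a complete job setup, and the import path for FlinkOptions is an assumption.

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;   // assumed package

    static Configuration sortClusteringConf() {
      Configuration conf = new Configuration();
      conf.setString(FlinkOptions.OPERATION, "insert");
      conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
      conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
      // Setting sort columns is how the test enables the sort path checked by
      // OptionsResolver.sortClusteringEnabled(conf), which now causes Pipelines.cluster
      // to assign managed memory (FlinkOptions.WRITE_SORT_MEMORY, in MB) to the clustering operator.
      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
      return conf;
    }
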


[hudi] 14/17: [HUDI-4687] Avoid setAccessible which breaks strong encapsulation (#6657)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c9dd84088e0def31655c794206bc89a9a1403e57
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Wed Sep 28 22:34:04 2022 +0530

    [HUDI-4687] Avoid setAccessible which breaks strong encapsulation (#6657)
    
    Use JOL GraphLayout for estimating deep size.
---
 hudi-common/pom.xml                                |   5 +
 .../hudi/common/util/ObjectSizeCalculator.java     | 321 +--------------------
 .../hudi/common/util/TestObjectSizeCalculator.java | 102 +++++++
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  |   4 +-
 packaging/hudi-flink-bundle/pom.xml                |   5 +
 packaging/hudi-hadoop-mr-bundle/pom.xml            |   5 +
 packaging/hudi-hive-sync-bundle/pom.xml            |   5 +
 packaging/hudi-integ-test-bundle/pom.xml           |   5 +
 packaging/hudi-kafka-connect-bundle/pom.xml        |   5 +
 packaging/hudi-presto-bundle/pom.xml               |   5 +
 packaging/hudi-spark-bundle/pom.xml                |   5 +
 packaging/hudi-trino-bundle/pom.xml                |   5 +
 packaging/hudi-utilities-bundle/pom.xml            |   5 +
 pom.xml                                            |   7 +
 14 files changed, 175 insertions(+), 309 deletions(-)
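
As the description says, deep-size estimation now leans on JOL instead of reflective field walking with setAccessible. Presumably the retained getObjectSize entry point reduces to a call along the lines of the sketch below; the null handling mirrors the old behavior shown in the removed code further down, and the exact body of the new implementation is not reproduced in this hunk.

    import org.openjdk.jol.info.GraphLayout;

    // Sketch of the JOL-based estimate: GraphLayout walks the reachable object graph
    // without requiring setAccessible on any field.
    public static long estimateDeepSize(Object obj) {
      return obj == null ? 0 : GraphLayout.parseInstance(obj).totalSize();
    }
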

diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index 7e7e2a81d4..8b4fa39a62 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -101,6 +101,11 @@
   </build>
 
   <dependencies>
+    <dependency>
+      <groupId>org.openjdk.jol</groupId>
+      <artifactId>jol-core</artifactId>
+    </dependency>
+
     <!-- Logging -->
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java
index 7e625e8eb4..86f1d9215e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ObjectSizeCalculator.java
@@ -18,33 +18,11 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.common.util.jvm.MemoryLayoutSpecification;
-import org.apache.hudi.common.util.jvm.HotSpotMemoryLayoutSpecification32bit;
-import org.apache.hudi.common.util.jvm.HotSpotMemoryLayoutSpecification64bit;
-import org.apache.hudi.common.util.jvm.HotSpotMemoryLayoutSpecification64bitCompressed;
-import org.apache.hudi.common.util.jvm.OpenJ9MemoryLayoutSpecification32bit;
-import org.apache.hudi.common.util.jvm.OpenJ9MemoryLayoutSpecification64bit;
-import org.apache.hudi.common.util.jvm.OpenJ9MemoryLayoutSpecification64bitCompressed;
-
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryPoolMXBean;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import org.openjdk.jol.info.GraphLayout;
 
 /**
  * Contains utility methods for calculating the memory usage of objects. It only works on the HotSpot and OpenJ9 JVMs, and infers
- * the actual memory layout (32 bit vs. 64 bit word size, compressed object pointers vs. uncompressed) from best
+ * the actual memory layout (32 bit vs. 64 bit word size, compressed object pointers vs. uncompressed) from the best
  * available indicators. It can reliably detect a 32 bit vs. 64 bit JVM. It can only make an educated guess at whether
  * compressed OOPs are used, though; specifically, it knows what the JVM's default choice of OOP compression would be
  * based on HotSpot version and maximum heap sizes, but if the choice is explicitly overridden with the
@@ -54,14 +32,9 @@ import java.util.Set;
  * @author Attila Szegedi
  */
 public class ObjectSizeCalculator {
-  private static class CurrentLayout {
-
-    private static final MemoryLayoutSpecification SPEC = getEffectiveMemoryLayoutSpecification();
-  }
-
   /**
    * Given an object, returns the total allocated size, in bytes, of the object and all other objects reachable from it.
-   * Attempts to to detect the current JVM memory layout, but may fail with {@link UnsupportedOperationException};
+   * Attempts to detect the current JVM memory layout, but may fail with {@link UnsupportedOperationException};
    *
    * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do anything special, it
    *        measures the size of all objects reachable through it (which will include its class loader, and by
@@ -71,282 +44,16 @@ public class ObjectSizeCalculator {
    * @throws UnsupportedOperationException if the current vm memory layout cannot be detected.
    */
   public static long getObjectSize(Object obj) throws UnsupportedOperationException {
-    return obj == null ? 0 : new ObjectSizeCalculator(CurrentLayout.SPEC).calculateObjectSize(obj);
-  }
-
-  // Fixed object header size for arrays.
-  private final int arrayHeaderSize;
-  // Fixed object header size for non-array objects.
-  private final int objectHeaderSize;
-  // Padding for the object size - if the object size is not an exact multiple
-  // of this, it is padded to the next multiple.
-  private final int objectPadding;
-  // Size of reference (pointer) fields.
-  private final int referenceSize;
-  // Padding for the fields of superclass before fields of subclasses are
-  // added.
-  private final int superclassFieldPadding;
-
-  private final Map<Class<?>, ClassSizeInfo> classSizeInfos = new IdentityHashMap<>();
-
-  private final Set<Object> alreadyVisited = Collections.newSetFromMap(new IdentityHashMap<>());
-  private final Deque<Object> pending = new ArrayDeque<>(64);
-  private long size;
-
-  /**
-   * Creates an object size calculator that can calculate object sizes for a given {@code memoryLayoutSpecification}.
-   *
-   * @param memoryLayoutSpecification a description of the JVM memory layout.
-   */
-  public ObjectSizeCalculator(MemoryLayoutSpecification memoryLayoutSpecification) {
-    Objects.requireNonNull(memoryLayoutSpecification);
-    arrayHeaderSize = memoryLayoutSpecification.getArrayHeaderSize();
-    objectHeaderSize = memoryLayoutSpecification.getObjectHeaderSize();
-    objectPadding = memoryLayoutSpecification.getObjectPadding();
-    referenceSize = memoryLayoutSpecification.getReferenceSize();
-    superclassFieldPadding = memoryLayoutSpecification.getSuperclassFieldPadding();
-  }
-
-  /**
-   * Given an object, returns the total allocated size, in bytes, of the object and all other objects reachable from it.
-   *
-   * @param obj the object; can be null. Passing in a {@link java.lang.Class} object doesn't do anything special, it
-   *        measures the size of all objects reachable through it (which will include its class loader, and by
-   *        extension, all other Class objects loaded by the same loader, and all the parent class loaders). It doesn't
-   *        provide the size of the static fields in the JVM class that the Class object represents.
-   * @return the total allocated size of the object and all other objects it retains.
-   */
-  public synchronized long calculateObjectSize(Object obj) {
-    // Breadth-first traversal instead of naive depth-first with recursive
-    // implementation, so we don't blow the stack traversing long linked lists.
-    try {
-      for (;;) {
-        visit(obj);
-        if (pending.isEmpty()) {
-          return size;
-        }
-        obj = pending.removeFirst();
-      }
-    } finally {
-      alreadyVisited.clear();
-      pending.clear();
-      size = 0;
-    }
-  }
-
-  private ClassSizeInfo getClassSizeInfo(final Class<?> clazz) {
-    ClassSizeInfo csi = classSizeInfos.get(clazz);
-    if (csi == null) {
-      csi = new ClassSizeInfo(clazz);
-      classSizeInfos.put(clazz, csi);
-    }
-    return csi;
-  }
-
-  private void visit(Object obj) {
-    if (alreadyVisited.contains(obj)) {
-      return;
-    }
-    final Class<?> clazz = obj.getClass();
-    if (clazz == ArrayElementsVisitor.class) {
-      ((ArrayElementsVisitor) obj).visit(this);
-    } else {
-      alreadyVisited.add(obj);
-      if (clazz.isArray()) {
-        visitArray(obj);
-      } else {
-        getClassSizeInfo(clazz).visit(obj, this);
-      }
-    }
-  }
-
-  private void visitArray(Object array) {
-    final Class<?> componentType = array.getClass().getComponentType();
-    final int length = Array.getLength(array);
-    if (componentType.isPrimitive()) {
-      increaseByArraySize(length, getPrimitiveFieldSize(componentType));
-    } else {
-      increaseByArraySize(length, referenceSize);
-      // If we didn't use an ArrayElementsVisitor, we would be enqueueing every
-      // element of the array here instead. For large arrays, it would
-      // tremendously enlarge the queue. In essence, we're compressing it into
-      // a small command object instead. This is different than immediately
-      // visiting the elements, as their visiting is scheduled for the end of
-      // the current queue.
-      switch (length) {
-        case 0: {
-          break;
-        }
-        case 1: {
-          enqueue(Array.get(array, 0));
-          break;
-        }
-        default: {
-          enqueue(new ArrayElementsVisitor((Object[]) array));
-        }
-      }
-    }
-  }
-
-  private void increaseByArraySize(int length, long elementSize) {
-    increaseSize(roundTo(arrayHeaderSize + length * elementSize, objectPadding));
-  }
-
-  private static class ArrayElementsVisitor {
-
-    private final Object[] array;
-
-    ArrayElementsVisitor(Object[] array) {
-      this.array = array;
-    }
-
-    public void visit(ObjectSizeCalculator calc) {
-      for (Object elem : array) {
-        if (elem != null) {
-          calc.visit(elem);
-        }
-      }
-    }
-  }
-
-  void enqueue(Object obj) {
-    if (obj != null) {
-      pending.addLast(obj);
-    }
-  }
-
-  void increaseSize(long objectSize) {
-    size += objectSize;
-  }
-
-  static long roundTo(long x, int multiple) {
-    return ((x + multiple - 1) / multiple) * multiple;
-  }
-
-  private class ClassSizeInfo {
-
-    // Padded fields + header size
-    private final long objectSize;
-    // Only the fields size - used to calculate the subclasses' memory
-    // footprint.
-    private final long fieldsSize;
-    private final Field[] referenceFields;
-
-    public ClassSizeInfo(Class<?> clazz) {
-      long fieldsSize = 0;
-      final List<Field> referenceFields = new LinkedList<>();
-      for (Field f : clazz.getDeclaredFields()) {
-        if (Modifier.isStatic(f.getModifiers())) {
-          continue;
-        }
-        final Class<?> type = f.getType();
-        if (type.isPrimitive()) {
-          fieldsSize += getPrimitiveFieldSize(type);
-        } else {
-          f.setAccessible(true);
-          referenceFields.add(f);
-          fieldsSize += referenceSize;
-        }
-      }
-      final Class<?> superClass = clazz.getSuperclass();
-      if (superClass != null) {
-        final ClassSizeInfo superClassInfo = getClassSizeInfo(superClass);
-        fieldsSize += roundTo(superClassInfo.fieldsSize, superclassFieldPadding);
-        referenceFields.addAll(Arrays.asList(superClassInfo.referenceFields));
-      }
-      this.fieldsSize = fieldsSize;
-      this.objectSize = roundTo(objectHeaderSize + fieldsSize, objectPadding);
-      this.referenceFields = referenceFields.toArray(new Field[referenceFields.size()]);
-    }
-
-    void visit(Object obj, ObjectSizeCalculator calc) {
-      calc.increaseSize(objectSize);
-      enqueueReferencedObjects(obj, calc);
-    }
-
-    public void enqueueReferencedObjects(Object obj, ObjectSizeCalculator calc) {
-      for (Field f : referenceFields) {
-        try {
-          calc.enqueue(f.get(obj));
-        } catch (IllegalAccessException e) {
-          throw new AssertionError("Unexpected denial of access to " + f, e);
-        }
-      }
-    }
-  }
-
-  private static long getPrimitiveFieldSize(Class<?> type) {
-    if (type == boolean.class || type == byte.class) {
-      return 1;
-    }
-    if (type == char.class || type == short.class) {
-      return 2;
-    }
-    if (type == int.class || type == float.class) {
-      return 4;
-    }
-    if (type == long.class || type == double.class) {
-      return 8;
-    }
-    throw new AssertionError("Encountered unexpected primitive type " + type.getName());
-  }
-
-  static MemoryLayoutSpecification getEffectiveMemoryLayoutSpecification() {
-    final String vmName = System.getProperty("java.vm.name");
-    if (vmName == null || !(vmName.startsWith("Java HotSpot(TM) ") || vmName.startsWith("OpenJDK")
-        || vmName.startsWith("TwitterJDK") || vmName.startsWith("Eclipse OpenJ9"))) {
-      throw new UnsupportedOperationException("ObjectSizeCalculator only supported on HotSpot or Eclipse OpenJ9 VMs");
-    }
-
-    final String strVmVersion = System.getProperty("java.vm.version");
-    // Support for OpenJ9 JVM
-    if (strVmVersion.startsWith("openj9")) {
-      final String dataModel = System.getProperty("sun.arch.data.model");
-      if ("32".equals(dataModel)) {
-        // Running with 32-bit data model
-        return new OpenJ9MemoryLayoutSpecification32bit();
-      } else if (!"64".equals(dataModel)) {
-        throw new UnsupportedOperationException(
-                "Unrecognized value '" + dataModel + "' of sun.arch.data.model system property");
-      }
-
-      long maxMemory = 0;
-      for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
-        maxMemory += mp.getUsage().getMax();
-      }
-      if (maxMemory < 57L * 1024 * 1024 * 1024) {
-        // OpenJ9 use compressed references below 57GB of RAM total
-        return new OpenJ9MemoryLayoutSpecification64bitCompressed();
-      } else {
-        // it's a 64-bit uncompressed references object model
-        return new OpenJ9MemoryLayoutSpecification64bit();
-      }
-    } else {
-      // Support for HotSpot JVM
-      final String dataModel = System.getProperty("sun.arch.data.model");
-      if ("32".equals(dataModel)) {
-        // Running with 32-bit data model
-        return new HotSpotMemoryLayoutSpecification32bit();
-      } else if (!"64".equals(dataModel)) {
-        throw new UnsupportedOperationException(
-                "Unrecognized value '" + dataModel + "' of sun.arch.data.model system property");
-      }
-
-      final int vmVersion = Integer.parseInt(strVmVersion.substring(0, strVmVersion.indexOf('.')));
-      if (vmVersion >= 17) {
-        long maxMemory = 0;
-        for (MemoryPoolMXBean mp : ManagementFactory.getMemoryPoolMXBeans()) {
-          maxMemory += mp.getUsage().getMax();
-        }
-        if (maxMemory < 30L * 1024 * 1024 * 1024) {
-          // HotSpot 17.0 and above use compressed OOPs below 30GB of RAM total
-          // for all memory pools (yes, including code cache).
-          return new HotSpotMemoryLayoutSpecification64bitCompressed();
-        }
-      }
-
-      // In other cases, it's a 64-bit uncompressed OOPs object model
-      return new HotSpotMemoryLayoutSpecification64bit();
-    }
+    // JDK versions 16 or later enforce strong encapsulation and block illegal reflective access.
+    // In effect, we cannot calculate object size by deep reflection and invoking `setAccessible` on a field,
+    // especially when `isAccessible` is false. More details in JEP 403. When integrating Hudi with other
+    // software packages that compile against JDK 16 or later (e.g. Trino), an IllegalAccessException is thrown.
+    // In that case, we use Java Object Layout (JOL) to estimate the object size.
+    //
+    // NOTE: We cannot derive the object size from the number of serialized bytes because there is no guarantee
+    //       that the incoming object is serializable. We could have used Java's Instrumentation API, but it
+    //       needs an instrumentation agent that can be hooked to the JVM. In lieu of that, we are using JOL.
+    //       GraphLayout gives the deep size of an object, including the size of objects that are referenced from the given object.
+    return obj == null ? 0 : GraphLayout.parseInstance(obj).totalSize();
   }
 }
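
For reference, a minimal standalone sketch of the JOL-based sizing adopted above (the class name and the sample object are illustrative; it only assumes jol-core on the classpath):

    import org.openjdk.jol.info.GraphLayout;

    public class JolSizeSketch {
      public static void main(String[] args) {
        // GraphLayout walks the reachable object graph and reports its deep (retained) size,
        // which is what getObjectSize now delegates to.
        long size = GraphLayout.parseInstance(new int[128]).totalSize();
        System.out.println("deep size in bytes: " + size);
      }
    }

Unlike the removed reflection-based walker, this path never calls setAccessible, so it keeps working under the strong encapsulation enforced by JDK 16 and later.
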
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
new file mode 100644
index 0000000000..712f4b85f8
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestObjectSizeCalculator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hudi.common.util.ObjectSizeCalculator.getObjectSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestObjectSizeCalculator {
+
+  @Test
+  public void testGetObjectSize() {
+    EmptyClass emptyClass = new EmptyClass();
+    StringClass stringClass = new StringClass();
+    PayloadClass payloadClass = new PayloadClass();
+    String emptyString = "";
+    String string = "hello";
+    String[] stringArray = {emptyString, string, " world"};
+    String[] anotherStringArray = new String[100];
+    List<String> stringList = new ArrayList<>();
+    StringBuilder stringBuilder = new StringBuilder(100);
+    int maxIntPrimitive = Integer.MAX_VALUE;
+    int minIntPrimitive = Integer.MIN_VALUE;
+    Integer maxInteger = Integer.MAX_VALUE;
+    Integer minInteger = Integer.MIN_VALUE;
+    long zeroLong = 0L;
+    double zeroDouble = 0.0;
+    boolean booleanField = true;
+    Object object = new Object();
+    String name = "Alice Bob";
+    Person person = new Person(name);
+
+    assertEquals(40, getObjectSize(emptyString));
+    assertEquals(56, getObjectSize(string));
+    assertEquals(184, getObjectSize(stringArray));
+    assertEquals(416, getObjectSize(anotherStringArray));
+    assertEquals(40, getObjectSize(stringList));
+    assertEquals(240, getObjectSize(stringBuilder));
+    assertEquals(16, getObjectSize(maxIntPrimitive));
+    assertEquals(16, getObjectSize(minIntPrimitive));
+    assertEquals(16, getObjectSize(maxInteger));
+    assertEquals(16, getObjectSize(minInteger));
+    assertEquals(24, getObjectSize(zeroLong));
+    assertEquals(24, getObjectSize(zeroDouble));
+    assertEquals(16, getObjectSize(booleanField));
+    assertEquals(80, getObjectSize(DayOfWeek.TUESDAY));
+    assertEquals(16, getObjectSize(object));
+    assertEquals(32, getObjectSize(emptyClass));
+    assertEquals(40, getObjectSize(stringClass));
+    assertEquals(40, getObjectSize(payloadClass));
+    assertEquals(1240, getObjectSize(Schema.create(Schema.Type.STRING)));
+    assertEquals(104, getObjectSize(person));
+  }
+
+  class EmptyClass {
+  }
+
+  class StringClass {
+    private String s;
+  }
+
+  class PayloadClass implements Serializable {
+    private HoodieRecord record;
+  }
+
+  class Person {
+    private String name;
+
+    public Person(String name) {
+      this.name = name;
+    }
+  }
+
+  public enum DayOfWeek {
+    MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
+  }
+}
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index e432f9dc42..40827c650a 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -185,12 +185,12 @@ public class ITTestHoodieSanity extends ITTestBase {
 
     // Ensure row count is 80 (without duplicates) (100 - 20 deleted)
     stdOutErr = executeHiveCommand("select count(1) from " + snapshotTableName);
-    assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
+    assertEquals(80, Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
         "Expecting 80 rows to be present in the snapshot table");
 
     if (roTableName.isPresent()) {
       stdOutErr = executeHiveCommand("select count(1) from " + roTableName.get());
-      assertEquals(80, Integer.parseInt(stdOutErr.getLeft().trim()),
+      assertEquals(80, Integer.parseInt(stdOutErr.getLeft().substring(stdOutErr.getLeft().lastIndexOf("\n")).trim()),
           "Expecting 80 rows to be present in the snapshot table");
     }
 
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 2b90f139b9..f890b65b00 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -163,6 +163,7 @@
                   <include>org.apache.htrace:htrace-core4</include>
                   <include>commons-codec:commons-codec</include>
                   <include>commons-io:commons-io</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -222,6 +223,10 @@
                   <pattern>com.fasterxml.jackson.</pattern>
                   <shadedPattern>${flink.bundle.shade.prefix}com.fasterxml.jackson.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-hadoop-mr-bundle/pom.xml b/packaging/hudi-hadoop-mr-bundle/pom.xml
index 2f3a7a7837..bb20b5691a 100644
--- a/packaging/hudi-hadoop-mr-bundle/pom.xml
+++ b/packaging/hudi-hadoop-mr-bundle/pom.xml
@@ -90,6 +90,7 @@
                   <include>org.apache.htrace:htrace-core4</include>
                   <include>com.yammer.metrics:metrics-core</include>
                   <include>commons-io:commons-io</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -144,6 +145,10 @@
                   <pattern>com.google.common.</pattern>
                   <shadedPattern>org.apache.hudi.com.google.common.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-hive-sync-bundle/pom.xml b/packaging/hudi-hive-sync-bundle/pom.xml
index 3bff950e24..87ae223a92 100644
--- a/packaging/hudi-hive-sync-bundle/pom.xml
+++ b/packaging/hudi-hive-sync-bundle/pom.xml
@@ -90,6 +90,7 @@
                   <include>org.objenesis:objenesis</include>
                   <include>com.esotericsoftware:minlog</include>
                   <include>commons-io:commons-io</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -124,6 +125,10 @@
                   <pattern>org.apache.htrace.</pattern>
                   <shadedPattern>org.apache.hudi.org.apache.htrace.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml
index 5bee479997..572a5daa79 100644
--- a/packaging/hudi-integ-test-bundle/pom.xml
+++ b/packaging/hudi-integ-test-bundle/pom.xml
@@ -184,6 +184,7 @@
                   <include>io.prometheus:simpleclient_dropwizard</include>
                   <include>io.prometheus:simpleclient_pushgateway</include>
                   <include>io.prometheus:simpleclient_common</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -309,6 +310,10 @@
                   <pattern>org.apache.parquet.avro.</pattern>
                   <shadedPattern>org.apache.hudi.org.apache.parquet.avro.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml
index 6868bd15ed..4a1982e70e 100644
--- a/packaging/hudi-kafka-connect-bundle/pom.xml
+++ b/packaging/hudi-kafka-connect-bundle/pom.xml
@@ -133,6 +133,7 @@
                                     <include>org.apache.htrace:htrace-core4</include>
                                     <include>org.scala-lang:*</include>
                                     <include>commons-io:commons-io</include>
+                                    <include>org.openjdk.jol:jol-core</include>
                                 </includes>
                             </artifactSet>
                             <relocations>
@@ -174,6 +175,10 @@
                                     <pattern>org.apache.htrace.</pattern>
                                     <shadedPattern>org.apache.hudi.org.apache.htrace.</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.openjdk.jol.</pattern>
+                                    <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                                </relocation>
                                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-presto-bundle/pom.xml b/packaging/hudi-presto-bundle/pom.xml
index d005896133..332fdd05f2 100644
--- a/packaging/hudi-presto-bundle/pom.xml
+++ b/packaging/hudi-presto-bundle/pom.xml
@@ -94,6 +94,7 @@
                   <include>commons-io:commons-io</include>
                   <include>commons-lang:commons-lang</include>
                   <include>com.google.protobuf:protobuf-java</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -164,6 +165,10 @@
                   <pattern>org.apache.parquet.avro.</pattern>
                   <shadedPattern>${presto.bundle.bootstrap.shade.prefix}org.apache.parquet.avro.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index db430058de..d21842ddb0 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -135,6 +135,7 @@
                   <include>org.apache.curator:curator-recipes</include>
                   <include>commons-codec:commons-codec</include>
                   <include>commons-io:commons-io</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -220,6 +221,10 @@
                   <pattern>com.google.common.</pattern>
                   <shadedPattern>org.apache.hudi.com.google.common.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- TODO: Revisit GH ISSUE #533 & PR#633-->
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
diff --git a/packaging/hudi-trino-bundle/pom.xml b/packaging/hudi-trino-bundle/pom.xml
index acca9e86ca..e378511fd2 100644
--- a/packaging/hudi-trino-bundle/pom.xml
+++ b/packaging/hudi-trino-bundle/pom.xml
@@ -94,6 +94,7 @@
                   <include>commons-lang:commons-lang</include>
                   <include>commons-io:commons-io</include>
                   <include>com.google.protobuf:protobuf-java</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -156,6 +157,10 @@
                   <pattern>com.google.protobuf.</pattern>
                   <shadedPattern>${trino.bundle.bootstrap.shade.prefix}com.google.protobuf.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 6f53aec314..86d3f9f5b9 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -167,6 +167,7 @@
                   <include>org.apache.curator:curator-recipes</include>
                   <include>commons-codec:commons-codec</include>
                   <include>commons-io:commons-io</include>
+                  <include>org.openjdk.jol:jol-core</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -244,6 +245,10 @@
                   <pattern>org.eclipse.jetty.</pattern>
                   <shadedPattern>org.apache.hudi.org.eclipse.jetty.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.openjdk.jol.</pattern>
+                  <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
+                </relocation>
                 <!-- The classes below in org.apache.hadoop.metrics2 package come from
                 hbase-hadoop-compat and hbase-hadoop2-compat, which have to be shaded one by one,
                 instead of shading all classes under org.apache.hadoop.metrics2 including ones
diff --git a/pom.xml b/pom.xml
index 26a07130ba..7f591f8d3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
     <protoc.version>3.21.5</protoc.version>
     <dynamodb.lockclient.version>1.1.0</dynamodb.lockclient.version>
     <zookeeper.version>3.5.7</zookeeper.version>
+    <openjdk.jol.version>0.16</openjdk.jol.version>
     <dynamodb-local.port>8000</dynamodb-local.port>
     <dynamodb-local.endpoint>http://localhost:${dynamodb-local.port}</dynamodb-local.endpoint>
     <springboot.version>2.7.3</springboot.version>
@@ -594,6 +595,12 @@
         <version>${scala.collection-compat.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.openjdk.jol</groupId>
+        <artifactId>jol-core</artifactId>
+        <version>${openjdk.jol.version}</version>
+      </dependency>
+
       <!-- Logging -->
       <!-- NOTE: All the following deps have to have "provided" scope to make sure these are not conflicting
            w/ implementations that are using Hudi as a library. For ex, all Spark < 3.3 are still relying on Log4j1


[hudi] 15/17: [MINOR] fixing validate async operations to poll completed clean instances (#6814)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1de1dfcd840fbc1885ce4a41348defbafae7ca24
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Wed Sep 28 12:14:24 2022 -0700

    [MINOR] fixing validate async operations to poll completed clean instances (#6814)
---
 .../apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
index 0835ec9722..714f3bf6ca 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java
@@ -62,7 +62,7 @@ public class ValidateAsyncOperations extends DagNode<Option<String>> {
         
         HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath)
             .setConf(executionContext.getJsc().hadoopConfiguration()).build();
-        Option<HoodieInstant> latestCleanInstant = metaClient.getActiveTimeline().filter(instant -> instant.getAction().equals(HoodieTimeline.CLEAN_ACTION)).lastInstant();
+        Option<HoodieInstant> latestCleanInstant = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
         if (latestCleanInstant.isPresent()) {
           log.warn("Latest clean commit " + latestCleanInstant.get());
           HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, latestCleanInstant.get());
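
For context, a minimal sketch of polling the completed cleaner timeline with the same client APIs used above (the base path and Hadoop configuration are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;

    public class LatestCleanInstantSketch {
      public static void main(String[] args) {
        // Build a meta client for the target table (placeholder base path).
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setBasePath("/tmp/hudi_table")
            .setConf(new Configuration())
            .build();
        // Restrict the lookup to completed clean instants, mirroring the fix above,
        // so an inflight or requested clean is not mistaken for a finished one.
        Option<HoodieInstant> latestClean = metaClient.getActiveTimeline()
            .getCleanerTimeline()
            .filterCompletedInstants()
            .lastInstant();
        if (latestClean.isPresent()) {
          System.out.println("Latest completed clean instant: " + latestClean.get());
        }
      }
    }
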


[hudi] 11/17: [HUDI-4453] Fix schema to include partition columns in bootstrap operation (#6676)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 739da48f1c19ac088b5b1c06efdaf28ad36275e9
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Tue Sep 27 20:00:59 2022 -0700

    [HUDI-4453] Fix schema to include partition columns in bootstrap operation (#6676)
    
    Turn off the type inference of the partition column to be consistent with
    existing behavior. Add notes around partition column type inference.
---
 .../HoodieSparkBootstrapSchemaProvider.java        | 33 +++++++++-------------
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |  2 +-
 2 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index e2a9e68372..b161182b83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -18,15 +18,14 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.AvroOrcUtils;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -36,8 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
-import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 
@@ -72,26 +69,22 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
   }
 
   private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
-    Configuration hadoopConf = context.getHadoopConf().get();
-    MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);
-
-    hadoopConf.set(
-        SQLConf.PARQUET_BINARY_AS_STRING().key(),
-        SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
-    hadoopConf.set(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
-    hadoopConf.set(
-        SQLConf.CASE_SENSITIVE().key(),
-        SQLConf.CASE_SENSITIVE().defaultValueString());
-    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);
-
-    StructType sparkSchema = converter.convert(parquetSchema);
+    // NOTE: The type inference of the partition column in the parquet table is turned off explicitly,
+    // to be consistent with the existing bootstrap behavior, where the partition column is String
+    // typed in the Hudi table.
+    // TODO(HUDI-4932): add a config to allow type inference of partition column in bootstrap and
+    //  support other types of partition column as well
+    ((HoodieSparkEngineContext) context).getSqlContext()
+        .setConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE(), false);
+    StructType parquetSchema = ((HoodieSparkEngineContext) context).getSqlContext().read()
+        .option("basePath", writeConfig.getBootstrapSourceBasePath())
+        .parquet(filePath.toString())
+        .schema();
     String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
 
-    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
+    return AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema, structName, recordNamespace);
   }
 
   private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
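
As a point of reference, a minimal sketch of reading a partitioned parquet source with partition-column type inference disabled, mirroring the approach taken above (paths and the local session setup are illustrative):

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    public class BootstrapSchemaSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("bootstrap-schema-sketch")
            .master("local[1]")
            .getOrCreate();
        // Keep partition columns String-typed instead of letting Spark infer int/date types.
        spark.conf().set("spark.sql.sources.partitionColumnTypeInference.enabled", "false");
        // The basePath option makes Spark include partition columns discovered under the source root.
        StructType schema = spark.read()
            .option("basePath", "/data/source_table")           // placeholder source root
            .parquet("/data/source_table/dt=2022-09-27")        // placeholder partition path
            .schema();
        System.out.println(schema.treeString());
        spark.stop();
      }
    }
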
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 4ec7f65913..0dd54237ef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -146,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     if (fullSchema == null) {
       logInfo("Inferring schema..")
       val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
+      val tableSchema = schemaResolver.getTableAvroSchema(false)
       dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
       fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
     }


[hudi] 07/17: [HUDI-4907] Prevent single commit multi instant issue (#6766)

Posted by yu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 00d80215dfac458050d242844be1605cfe47ea15
Author: voonhous <vo...@gmail.com>
AuthorDate: Tue Sep 27 15:52:23 2022 +0800

    [HUDI-4907] Prevent single commit multi instant issue (#6766)
    
    
    Co-authored-by: TengHuo <te...@outlook.com>
    Co-authored-by: yuzhao.cyz <yu...@gmail.com>
---
 .../java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java   | 2 +-
 .../src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java        | 6 +++---
 .../src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java    | 3 ++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index e3b0d82704..c87d5b2443 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -341,7 +341,7 @@ public class StreamWriteOperatorCoordinator
 
   private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
     CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
-    ckpMetadata.bootstrap(metaClient);
+    ckpMetadata.bootstrap();
     return ckpMetadata;
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
index 4cdebf986f..6895b2a0c6 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java
@@ -94,7 +94,7 @@ public class CkpMetadata implements Serializable {
    *
    * <p>This expects to be called by the driver.
    */
-  public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
+  public void bootstrap() throws IOException {
     fs.delete(path, true);
     fs.mkdirs(path);
   }
@@ -173,8 +173,8 @@ public class CkpMetadata implements Serializable {
   @Nullable
   public String lastPendingInstant() {
     load();
-    for (int i = this.messages.size() - 1; i >= 0; i--) {
-      CkpMessage ckpMsg = this.messages.get(i);
+    if (this.messages.size() > 0) {
+      CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1);
       // consider 'aborted' as pending too to reuse the instant
       if (!ckpMsg.isComplete()) {
         return ckpMsg.getInstant();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
index a6fb493b9b..fe7ce3f947 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.util.stream.IntStream;
 
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -63,7 +64,7 @@ public class TestCkpMetadata {
 
     assertThat(metadata.lastPendingInstant(), is("2"));
     metadata.commitInstant("2");
-    assertThat(metadata.lastPendingInstant(), is("1"));
+    assertThat(metadata.lastPendingInstant(), equalTo(null));
 
     // test cleaning
     IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));