You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2022/02/22 05:01:58 UTC

[hudi] branch master updated: [HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14dbbdf  [HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)
14dbbdf is described below

commit 14dbbdf4c7a45ab5a10889f8e558984455315829
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Tue Feb 22 00:01:30 2022 -0500

    [HUDI-2189] Adding delete partitions support to DeltaStreamer (#4787)
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    |  4 ++
 .../functional/HoodieDeltaStreamerTestBase.java    |  2 +-
 .../functional/TestHoodieDeltaStreamer.java        | 57 +++++++++++++++++++++-
 3 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index c376243..082a9b1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -585,6 +585,10 @@ public class DeltaSync implements Serializable {
       case INSERT_OVERWRITE_TABLE:
         writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
         break;
+      case DELETE_PARTITION:
+        List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
+        writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+        break;
       default:
         throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
index 9b7ee3b..02b1848 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java
@@ -173,7 +173,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("include", "sql-transformer.properties");
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
     props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index a6fdf00..1c80896 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -272,6 +272,19 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
       assertEquals(expected, recordCount);
     }
 
+    static Map<String, Long> getPartitionRecordCount(String basePath, SQLContext sqlContext) {
+      sqlContext.clearCache();
+      Map<String, Long> countsByPartition = new HashMap<>();
+      List<Row> perPartition = sqlContext.read().format("org.apache.hudi").load(basePath).groupBy(HoodieRecord.PARTITION_PATH_METADATA_FIELD).count().collectAsList();
+      perPartition.forEach(r -> countsByPartition.put(r.getString(0), r.getLong(1)));
+      return countsByPartition;
+    }
+
+    static void assertNoPartitionMatch(String basePath, SQLContext sqlContext, String partitionToValidate) {
+      sqlContext.clearCache(); // NOTE: the partition value below must stay quoted so it is compared as a SQL string literal
+      assertEquals(0, sqlContext.read().format("org.apache.hudi").load(basePath).filter(HoodieRecord.PARTITION_PATH_METADATA_FIELD + " = '" + partitionToValidate + "'").count());
+    }
+
     static void assertDistinctRecordCount(long expected, String tablePath, SQLContext sqlContext) {
       sqlContext.clearCache();
       long recordCount = sqlContext.read().format("org.apache.hudi").load(tablePath).select("_hoodie_record_key").distinct().count();
@@ -1378,6 +1391,13 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
                                        String propsFileName, String parquetSourceRoot, boolean addCommonProps) throws IOException {
+    prepareParquetDFSSource(useSchemaProvider, hasTransformer, sourceSchemaFile, targetSchemaFile, propsFileName, parquetSourceRoot, addCommonProps,
+        "not_there");
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer, String sourceSchemaFile, String targetSchemaFile,
+                                       String propsFileName, String parquetSourceRoot, boolean addCommonProps,
+                                       String partitionPath) throws IOException {
     // Properties used for testing delta-streamer with Parquet source
     TypedProperties parquetProps = new TypedProperties();
 
@@ -1388,7 +1408,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     parquetProps.setProperty("include", "base.properties");
     parquetProps.setProperty("hoodie.embed.timeline.server", "false");
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
     if (useSchemaProvider) {
       parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
@@ -1855,6 +1875,31 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     testDeltaStreamerWithSpecifiedOperation(dfsBasePath + "/insert_overwrite_table", WriteOperationType.INSERT_OVERWRITE_TABLE);
   }
 
+  @Test
+  public void testDeletePartitions() throws Exception {
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
+        PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+            null, PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath, sqlContext);
+    testNum++;
+    // Keep the partition path field identical to the first round; the 7-arg overload defaults it to "not_there".
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+    prepareParquetDFSSource(false, false, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
+    // set write operation to DELETE_PARTITION and add transformer to filter only for records with partition HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH
+    deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.DELETE_PARTITION, ParquetDFSSource.class.getName(),
+            Collections.singletonList(TestSpecificPartitionTransformer.class.getName()), PROPS_FILENAME_TEST_PARQUET, false,
+            false, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH.
+    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+  }
+  
   void testDeltaStreamerWithSpecifiedOperation(final String tableBasePath, WriteOperationType operationType) throws Exception {
     // Initial insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
@@ -2001,6 +2046,16 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     }
   }
 
+  public static class TestSpecificPartitionTransformer implements Transformer {
+    // Keeps only the rows whose partition_path column equals HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH.
+    @Override
+    public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
+                              TypedProperties properties) {
+      String firstPartitionOnly = "partition_path == '" + HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH + "'";
+      return rowDataset.filter(firstPartitionOnly);
+    }
+  }
+
   /**
    * Add new field evoluted_optional_union_field with value of the field rider.
    */