Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/17 09:32:43 UTC

[GitHub] [hudi] codope commented on a change in pull request #4420: [HUDI-1847] Adding inline scheduling support for spark datasource path for compaction and clustering

codope commented on a change in pull request #4420:
URL: https://github.com/apache/hudi/pull/4420#discussion_r785765597



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -464,27 +464,43 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
   }
 
   protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (config.areAnyTableServicesInline()) {
+    if (config.areAnyTableServicesInline() || config.scheduleInlineTableServices()) {
       if (config.isMetadataTableEnabled()) {
         table.getHoodieView().sync();
       }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
-        inlineCompact(extraMetadata);
+        inlineScheduleCompactAndOptionallyExecute(extraMetadata, !config.scheduleInlineCompaction());

Review comment:
      Let's say INLINE_COMPACT is true and SCHEDULE_INLINE_COMPACT is also true. Then `inlineScheduleCompactAndOptionallyExecute` will schedule compaction but never execute it, right? That seems counter-intuitive from a user's point of view. Don't you think?
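      For illustration, the effective behavior of the new call as I read it (simplified sketch, not the actual code):

```java
// Simplified sketch of the call above: the second argument controls execution.
// With INLINE_COMPACT=true and SCHEDULE_INLINE_COMPACT=true,
// shouldExecute evaluates to false, so compaction is scheduled but never run inline.
boolean shouldExecute = !config.scheduleInlineCompaction();
inlineScheduleCompactAndOptionallyExecute(extraMetadata, shouldExecute);
```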

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
##########
@@ -177,6 +177,12 @@
       .withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering."
           + " Default strategy just rejects the update");
 
+  public static final ConfigProperty<String> SCHEDULE_INLINE_CLUSTERING = ConfigProperty
+      .key("hoodie.clustering.schedule.inline")
+      .defaultValue("false")
+      .withDocumentation("When set to true, clustering service will be attempted for inline scheduling after each write. Users have to ensure "

Review comment:
      I think we should add more details on how this config interacts with (or differs from) the other inline/async configs; otherwise, users might be confused about why we have separate configs. First, we could mention what happens if this config is enabled together with the existing inline/async compaction/clustering configs. Second, we could use the corresponding config keys in the doc to be more explicit.
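      Something along these lines (sketch only; the exact precedence wording should match the final implementation):

```java
// Sketch: reference the related config keys explicitly in the doc string.
public static final ConfigProperty<String> SCHEDULE_INLINE_CLUSTERING = ConfigProperty
    .key("hoodie.clustering.schedule.inline")
    .defaultValue("false")
    .withDocumentation("When set to true, the clustering service is only scheduled inline after each write; "
        + "execution is left to a separate job. Spell out here what happens if "
        + "hoodie.clustering.inline or hoodie.clustering.async.enabled is set at the same time.");
```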

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -464,27 +464,43 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
   }
 
   protected void runTableServicesInline(HoodieTable<T, I, K, O> table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (config.areAnyTableServicesInline()) {
+    if (config.areAnyTableServicesInline() || config.scheduleInlineTableServices()) {
       if (config.isMetadataTableEnabled()) {
         table.getHoodieView().sync();
       }
       // Do an inline compaction if enabled
       if (config.inlineCompactionEnabled()) {
         runAnyPendingCompactions(table);
         metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");

Review comment:
      Should we move this inside `inlineScheduleCompactAndOptionallyExecute` as well, and set it only when the compaction is actually executed?
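      Something like this (hypothetical sketch; signatures simplified):

```java
// Hypothetical sketch: pass the commit metadata in and tag INLINE_COMPACT
// only on the path that actually executes the compaction.
protected void inlineScheduleCompactAndOptionallyExecute(HoodieCommitMetadata metadata,
    Option<Map<String, String>> extraMetadata, boolean shouldExecute) {
  Option<String> compactionInstantTime = scheduleCompaction(extraMetadata);
  if (compactionInstantTime.isPresent() && shouldExecute) {
    metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
    compact(compactionInstantTime.get(), true);
  }
}
```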

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
##########
@@ -133,6 +134,56 @@ public void testSimpleInsertAndUpdate(HoodieFileFormat fileFormat, boolean popul
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throws Exception {
+    HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
+
+      /*
+       * Write 2 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 100);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
+
+      // validate compaction has been scheduled inline
+      /*HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+      hoodieTable.getHoodieView().sync();
+      FileStatus[] allFiles = listAllBaseFilesInPath(hoodieTable);
+      HoodieTableFileSystemView tableView = getHoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
+      Stream<HoodieBaseFile> dataFilesToRead = tableView.getLatestBaseFiles();
+      assertTrue(dataFilesToRead.findAny().isPresent());*/

Review comment:
      Let's remove this commented-out block if it's not needed.
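      If the goal was to validate that compaction got scheduled inline, a direct timeline assertion might be cleaner, e.g. (sketch, assuming the meta client is reloaded after the second write):

```java
// Sketch: verify a pending compaction instant exists on the timeline
// exactly when inline scheduling was requested.
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimeline pendingCompactionTimeline = metaClient.getActiveTimeline().filterPendingCompactionTimeline();
assertEquals(scheduleInlineCompaction ? 1 : 0, pendingCompactionTimeline.countInstants());
```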




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org