You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/28 05:12:02 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6983: [HUDI-5031]Hudi merge into creates empty partition files when the sou…

xushiyan commented on code in PR #6983:
URL: https://github.com/apache/hudi/pull/6983#discussion_r1007622645


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,28 +73,29 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
 
   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
-    final HoodieRecord insertPayload = payload.record;
+    final HoodieRecord<T> insertPayload = payload.record;
     String partitionPath = insertPayload.getPartitionPath();
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
-    if (handle == null) {
-      // If the records are sorted, this means that we encounter a new partition path
-      // and the records for the previous partition path are all written,
-      // so we can safely closely existing open handle to reduce memory footprint.
-      if (areRecordsSorted) {
-        closeOpenHandles();
+    if (handle == null || !handle.canWrite(payload.record)) {
+      if (handle == null) {
+        // If the records are sorted, this means that we encounter a new partition path
+        // and the records for the previous partition path are all written,
+        // so we can safely closely existing open handle to reduce memory footprint.
+        if (areRecordsSorted) {
+          closeOpenHandles();
+        }
+      } else {
+        // Handle is full. Close the handle and add the WriteStatus
+        statuses.addAll(handle.close());
       }
-      // Lazily initialize the handle, for the first time
-      handle = writeHandleFactory.create(config, instantTime, hoodieTable,
-          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
-      handles.put(partitionPath, handle);
-    }
-
-    if (!handle.canWrite(payload.record)) {
-      // Handle is full. Close the handle and add the WriteStatus
-      statuses.addAll(handle.close());
-      // Open new handle
+      Option<IndexedRecord> insertRecord = payload.insertValue;
+      // just skip the ignored record,do not make partitions on fs
+      if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+        return;
+      }
+      // Lazily initialize the handle for the first time or Open new handle
       handle = writeHandleFactory.create(config, instantTime, hoodieTable,
-          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
+              insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);

Review Comment:
   unnecessary change



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,28 +73,29 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
 
   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
-    final HoodieRecord insertPayload = payload.record;
+    final HoodieRecord<T> insertPayload = payload.record;
     String partitionPath = insertPayload.getPartitionPath();
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
-    if (handle == null) {
-      // If the records are sorted, this means that we encounter a new partition path
-      // and the records for the previous partition path are all written,
-      // so we can safely closely existing open handle to reduce memory footprint.
-      if (areRecordsSorted) {
-        closeOpenHandles();
+    if (handle == null || !handle.canWrite(payload.record)) {
+      if (handle == null) {

Review Comment:
   why change this logic flow? pls revert unnecessary changes



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java:
##########
@@ -69,28 +73,29 @@ public CopyOnWriteInsertHandler(HoodieWriteConfig config, String instantTime,
 
   @Override
   public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
-    final HoodieRecord insertPayload = payload.record;
+    final HoodieRecord<T> insertPayload = payload.record;
     String partitionPath = insertPayload.getPartitionPath();
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
-    if (handle == null) {
-      // If the records are sorted, this means that we encounter a new partition path
-      // and the records for the previous partition path are all written,
-      // so we can safely closely existing open handle to reduce memory footprint.
-      if (areRecordsSorted) {
-        closeOpenHandles();
+    if (handle == null || !handle.canWrite(payload.record)) {
+      if (handle == null) {
+        // If the records are sorted, this means that we encounter a new partition path
+        // and the records for the previous partition path are all written,
+        // so we can safely closely existing open handle to reduce memory footprint.
+        if (areRecordsSorted) {
+          closeOpenHandles();
+        }
+      } else {
+        // Handle is full. Close the handle and add the WriteStatus
+        statuses.addAll(handle.close());
       }
-      // Lazily initialize the handle, for the first time
-      handle = writeHandleFactory.create(config, instantTime, hoodieTable,
-          insertPayload.getPartitionPath(), idPrefix, taskContextSupplier);
-      handles.put(partitionPath, handle);
-    }
-
-    if (!handle.canWrite(payload.record)) {
-      // Handle is full. Close the handle and add the WriteStatus
-      statuses.addAll(handle.close());
-      // Open new handle
+      Option<IndexedRecord> insertRecord = payload.insertValue;
+      // just skip the ignored record,do not make partitions on fs
+      if (insertRecord.isPresent() && insertRecord.get().equals(IGNORE_RECORD)) {
+        return;
+      }

Review Comment:
   if you want to return based on these conditions, why not return early?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org