Posted to commits@hudi.apache.org by da...@apache.org on 2022/07/16 04:02:25 UTC

[hudi] branch master updated: [HUDI-4403] Fix the end input metadata for bounded source (#6116)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0faa562b6f [HUDI-4403] Fix the end input metadata for bounded source (#6116)
0faa562b6f is described below

commit 0faa562b6fc4bf3368d099bf9b43381baf77dd1a
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Sat Jul 16 12:02:17 2022 +0800

    [HUDI-4403] Fix the end input metadata for bounded source (#6116)
---
 .../org/apache/hudi/sink/StreamWriteFunction.java  | 12 ++++++++++-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  | 23 ++++++++++++--------
 .../sink/common/AbstractStreamWriteFunction.java   |  9 +++++++-
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 22 +++++++++++--------
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 25 ++++------------------
 5 files changed, 50 insertions(+), 41 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2748af5290..bbaba04144 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -408,6 +408,16 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
         && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
   }
 
+  private void cleanWriteHandles() {
+    if (freshInstant(currentInstant)) {
+      // In rare cases, when a checkpoint is aborted and its instant time is
+      // reused, the merge handle generates a new file name under the reused
+      // instant time of the last checkpoint; those write handles must be kept
+      // and reused to avoid data loss, so only clean handles for fresh instants.
+      this.writeClient.cleanHandles();
+    }
+  }
+
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
     String instant = instantToWrite(true);
@@ -479,7 +489,7 @@ public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
-    this.writeClient.cleanHandles();
+    cleanWriteHandles();
     this.writeStatuses.addAll(writeStatus);
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 6aa4c0b1f8..b726b02cad 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -365,7 +365,10 @@ public class StreamWriteOperatorCoordinator
    */
   private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
-        .allMatch(event -> event != null && event.isReady(this.instant));
+        // We do not use event.isReady to check the instant, because the
+        // write task may send an event eagerly for an empty data set and
+        // that event may carry the timestamp of the last committed instant.
+        .allMatch(event -> event != null && event.isLastBatch());
   }
 
   private void addEventToBuffer(WriteMetadataEvent event) {
@@ -425,12 +428,14 @@ public class StreamWriteOperatorCoordinator
     addEventToBuffer(event);
     if (allEventsReceived()) {
       // start to commit the instant.
-      commitInstant(this.instant);
-      // The executor thread inherits the classloader of the #handleEventFromOperator
-      // caller, which is a AppClassLoader.
-      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      // sync Hive synchronously if it is enabled in batch mode.
-      syncHive();
+      boolean committed = commitInstant(this.instant);
+      if (committed) {
+        // The executor thread inherits the classloader of the #handleEventFromOperator
+        // caller, which is an AppClassLoader.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+        // in batch mode, sync with the Hive metastore synchronously if Hive sync is enabled.
+        syncHive();
+      }
     }
   }
 
@@ -474,8 +479,8 @@ public class StreamWriteOperatorCoordinator
   /**
    * Commits the instant.
    */
-  private void commitInstant(String instant) {
-    commitInstant(instant, -1);
+  private boolean commitInstant(String instant) {
+    return commitInstant(instant, -1);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 6cf3d10fc2..04b7f43547 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -243,6 +243,13 @@ public abstract class AbstractStreamWriteFunction<I>
     return this.ckpMetadata.lastPendingInstant();
   }
 
+  /**
+   * Returns whether the instant is fresh (i.e. not aborted).
+   */
+  protected boolean freshInstant(String instant) {
+    return !this.ckpMetadata.isAborted(instant);
+  }
+
   /**
    * Prepares the instant time to write with for next checkpoint.
    *
@@ -279,6 +286,6 @@ public abstract class AbstractStreamWriteFunction<I>
    * Returns whether the pending instant is invalid to write with.
    */
   private boolean invalidInstant(String instant, boolean hasData) {
-    return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
+    return instant.equals(this.currentInstant) && hasData && freshInstant(instant);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 87a6551986..31355255f9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -358,9 +358,9 @@ public class Pipelines {
    * The whole pipeline looks like the following:
    *
    * <pre>
-   *                                           /=== | task1 | ===\
-   *      | plan generation | ===> hash                           | commit |
-   *                                           \=== | task2 | ===/
+   *                                     /=== | task1 | ===\
+   *      | plan generation | ===> hash                      | commit |
+   *                                     \=== | task2 | ===/
    *
   *      Note: both the compaction plan generation task and the commit task are singletons.
    * </pre>
@@ -374,6 +374,8 @@ public class Pipelines {
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        // make the distribution strategy deterministic to avoid concurrent modifications
+        // on the same bucket files
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
@@ -393,9 +395,9 @@ public class Pipelines {
    * The whole pipeline looks like the following:
    *
    * <pre>
-   *                                           /=== | task1 | ===\
-   *      | plan generation | ===> hash                           | commit |
-   *                                           \=== | task2 | ===/
+   *                                     /=== | task1 | ===\
+   *      | plan generation | ===> hash                      | commit |
+   *                                     \=== | task2 | ===/
    *
   *      Note: both the clustering plan generation task and the commit task are singletons.
    * </pre>
@@ -410,9 +412,11 @@ public class Pipelines {
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
-          .stream().map(ClusteringOperation::getFileId)
-          .collect(Collectors.joining()))
+        .keyBy(plan ->
+            // make the distribution strategy deterministic to avoid concurrent modifications
+            // on the same bucket files
+            plan.getClusteringGroupInfo().getOperations()
+                .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 3d96c1cafa..1589cf31e7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -248,21 +248,8 @@ public class ITTestDataStreamWrite extends TestLogger {
       Pipelines.clean(conf, pipeline);
       Pipelines.compact(conf, pipeline);
     }
-    JobClient client = execEnv.executeAsync(jobName);
-    if (isMor) {
-      if (client.getJobStatus().get() != JobStatus.FAILED) {
-        try {
-          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
-          client.cancel();
-        } catch (Throwable var1) {
-          // ignored
-        }
-      }
-    } else {
-      // wait for the streaming job to finish
-      client.getJobExecutionResult().get();
-    }
 
+    execute(execEnv, isMor, jobName);
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
@@ -322,17 +309,14 @@ public class ITTestDataStreamWrite extends TestLogger {
     execEnv.addOperator(pipeline.getTransformation());
 
     Pipelines.cluster(conf, rowType, pipeline);
-    JobClient client = execEnv.executeAsync(jobName);
-
-    // wait for the streaming job to finish
-    client.getJobExecutionResult().get();
+    execEnv.execute(jobName);
 
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
-    JobClient client = execEnv.executeAsync(jobName);
     if (isMor) {
+      JobClient client = execEnv.executeAsync(jobName);
       if (client.getJobStatus().get() != JobStatus.FAILED) {
         try {
           TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
@@ -343,7 +327,7 @@ public class ITTestDataStreamWrite extends TestLogger {
       }
     } else {
       // wait for the streaming job to finish
-      client.getJobExecutionResult().get();
+      execEnv.execute(jobName);
     }
   }
 
@@ -451,5 +435,4 @@ public class ITTestDataStreamWrite extends TestLogger {
     execute(execEnv, true, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
-
 }
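
For readers skimming the patch, here is a minimal, self-contained Java sketch of
the end-input gating behavior the coordinator change introduces: every subtask
must report its last batch before a commit is attempted, and Hive sync only runs
when a commit actually happened. The classes and method names below are
simplified stand-ins for illustration only, not Hudi's actual API.

    import java.util.Arrays;

    // Hypothetical, simplified stand-in for StreamWriteOperatorCoordinator.
    final class EndInputCoordinatorSketch {

      static final class Event {
        final boolean lastBatch; // the task has seen the end of its bounded input
        final boolean hasData;   // the task buffered data for the current instant

        Event(boolean lastBatch, boolean hasData) {
          this.lastBatch = lastBatch;
          this.hasData = hasData;
        }
      }

      private final Event[] eventBuffer;

      EndInputCoordinatorSketch(int parallelism) {
        this.eventBuffer = new Event[parallelism];
      }

      void handleEndInputEvent(int taskId, Event event) {
        eventBuffer[taskId] = event;
        if (allEventsReceived()) {
          boolean committed = commitInstant();
          if (committed) {
            // only touch the metastore when something was actually committed
            syncHive();
          }
        }
      }

      private boolean allEventsReceived() {
        // Do not match on the event's instant time: a task with an empty data
        // set may send its event eagerly, still carrying the timestamp of the
        // last committed instant.
        return Arrays.stream(eventBuffer).allMatch(e -> e != null && e.lastBatch);
      }

      private boolean commitInstant() {
        // Stand-in for the real commit: report success only when there is data.
        return Arrays.stream(eventBuffer).anyMatch(e -> e.hasData);
      }

      private void syncHive() {
        // Hive metastore sync would go here in the real coordinator.
      }
    }

In the real coordinator, the commit path also resets the event buffer and starts
a new instant for subsequent writes; the sketch omits that bookkeeping.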