You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/28 07:11:15 UTC

[GitHub] [hudi] danny0405 commented on a change in pull request #3168: [HUDI-2084] Resend the uncommitted write metadata when start up

danny0405 commented on a change in pull request #3168:
URL: https://github.com/apache/hudi/pull/3168#discussion_r659532453



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -291,30 +257,115 @@ private void initHiveSync() {
     this.hiveSyncContext = HiveSyncContext.create(conf);
   }
 
+  private void syncHiveIfEnabled() {
+    if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
+      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
+    }
+  }
+
+  /**
+   * Sync hoodie table metadata to Hive metastore.
+   */
+  public void syncHive() {
+    hiveSyncContext.hiveSyncTool().syncHoodieTable();
+  }
+
   private void reset() {
-    this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
+    this.eventBuffer = new WriteMetadataEvent[this.parallelism];
   }
 
-  /** Checks the buffer is ready to commit. */
+  /**
+   * Checks the buffer is ready to commit.
+   */
   private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
         .allMatch(event -> event != null && event.isReady(this.instant));
   }
 
+  private void addEventToBuffer(WriteMetadataEvent event) {
+    if (this.eventBuffer[event.getTaskID()] != null) {
+      this.eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      this.eventBuffer[event.getTaskID()] = event;
+    }
+  }
+
+  private void startInstant() {
+    final String instant = HoodieActiveTimeline.createNewInstantTime();
+    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    this.instant = instant;
+    this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
+        this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
+  }
+
+  /**
+   * Initializes the instant.
+   *
+   * <p>Recommits the last inflight instant if the write metadata checkpoint successfully
+   * but was not committed due to some rare cases.
+   *
+   * <p>Starts a new instant, a writer can not flush data buffer
+   * until it finds a new inflight instant on the timeline.
+   */
+  private void initInstant(String instant) {
+    HoodieTimeline completedTimeline =
+        StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+    executor.execute(() -> {
+      if (instant.equals("") || completedTimeline.containsInstant(instant)) {
+        // the last instant committed successfully
+        reset();
+      } else {
+        commitInstant(instant);

Review comment:
       Add a log here:
   ```java
   LOG.info("Recommit instant {}", instant);
   ```

##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
##########
@@ -291,30 +257,115 @@ private void initHiveSync() {
     this.hiveSyncContext = HiveSyncContext.create(conf);
   }
 
+  private void syncHiveIfEnabled() {
+    if (conf.getBoolean(FlinkOptions.HIVE_SYNC_ENABLED)) {
+      this.hiveSyncExecutor.execute(this::syncHive, "sync hive metadata for instant %s", this.instant);
+    }
+  }
+
+  /**
+   * Sync hoodie table metadata to Hive metastore.
+   */
+  public void syncHive() {
+    hiveSyncContext.hiveSyncTool().syncHoodieTable();
+  }
+
   private void reset() {
-    this.eventBuffer = new BatchWriteSuccessEvent[this.parallelism];
+    this.eventBuffer = new WriteMetadataEvent[this.parallelism];
   }
 
-  /** Checks the buffer is ready to commit. */
+  /**
+   * Checks the buffer is ready to commit.
+   */
   private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
         .allMatch(event -> event != null && event.isReady(this.instant));
   }
 
+  private void addEventToBuffer(WriteMetadataEvent event) {
+    if (this.eventBuffer[event.getTaskID()] != null) {
+      this.eventBuffer[event.getTaskID()].mergeWith(event);
+    } else {
+      this.eventBuffer[event.getTaskID()] = event;
+    }
+  }
+
+  private void startInstant() {
+    final String instant = HoodieActiveTimeline.createNewInstantTime();
+    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    this.instant = instant;
+    this.writeClient.transitionRequestedToInflight(tableState.commitAction, this.instant);
+    LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
+        this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
+  }
+
+  /**
+   * Initializes the instant.
+   *
+   * <p>Recommits the last inflight instant if the write metadata checkpoint successfully
+   * but was not committed due to some rare cases.
+   *
+   * <p>Starts a new instant, a writer can not flush data buffer
+   * until it finds a new inflight instant on the timeline.
+   */
+  private void initInstant(String instant) {
+    HoodieTimeline completedTimeline =
+        StreamerUtil.createMetaClient(conf).getActiveTimeline().filterCompletedInstants();
+    executor.execute(() -> {
+      if (instant.equals("") || completedTimeline.containsInstant(instant)) {
+        // the last instant committed successfully
+        reset();
+      } else {
+        commitInstant(instant);
+      }
+      // starts a new instant
+      startInstant();
+    }, "recommit instant %s", instant);
+  }

Review comment:
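       recommit -> initialize (i.e. change the action name passed to the executor from "recommit instant %s" to "initialize instant %s", since this task also starts a new instant rather than only recommitting)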




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org