You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/05/16 09:40:17 UTC

[hudi] branch master updated: [HUDI-4098] Metadata table heartbeat for instant has expired, last heartbeat 0 (#5583)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 43e08193ef [HUDI-4098] Metadata table heartbeat for instant has expired, last heartbeat 0 (#5583)
43e08193ef is described below

commit 43e08193ef712c19bf986c65a184991c6de960b6
Author: Danny Chan <yu...@gmail.com>
AuthorDate: Mon May 16 17:40:08 2022 +0800

    [HUDI-4098] Metadata table heartbeat for instant has expired, last heartbeat 0 (#5583)
---
 .../FlinkHoodieBackedTableMetadataWriter.java      |  5 +++
 .../sink/TestStreamWriteOperatorCoordinator.java   | 45 ++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 76774e9618..222ff78edc 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -138,6 +138,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
         // reuses the same instant time without rollback first.  It is a no-op here as the
         // clean plan is the same, so we don't need to delete the requested and inflight instant
         // files in the active timeline.
+
+        // The metadata writer uses the LAZY cleaning strategy without auto commit, so the
+        // write client checks the heartbeat expiration when committing the instant.
+        // Start the heartbeat explicitly here so that the check passes.
+        writeClient.getHeartbeatClient().start(instantTime);
       }
 
       List<WriteStatus> statuses = preppedRecordList.size() > 0
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 55885dcab5..59a0580e56 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -22,6 +22,8 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -253,6 +255,49 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
   }
 
+  @Test
+  void testSyncMetadataTableWithReusedInstant() throws Exception {
+    // reset the existing test state first (reset() is declared outside this hunk)
+    reset();
+    // override the default configuration: enable the metadata table for this coordinator
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
+    OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
+    coordinator = new StreamWriteOperatorCoordinator(conf, context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+    final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
+
+    coordinator.handleEventFromOperator(0, event0);
+
+    String instant = coordinator.getInstant();
+    assertNotEquals("", instant);
+
+    final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
+    HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
+    HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
+    assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));
+
+    // write a normal commit, then pick up the instant the coordinator reports afterwards
+    mockWriteWithMetadata();
+    instant = coordinator.getInstant();
+    // create an inflight delta commit (REQUESTED -> INFLIGHT) for that instant on the metadata timeline
+    metadataTableMetaClient.getActiveTimeline()
+        .createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant));
+    metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant);
+    metadataTableMetaClient.reloadActiveTimeline();
+
+    // write another commit whose instant already exists on the metadata timeline
+    instant = mockWriteWithMetadata();
+    metadataTableMetaClient.reloadActiveTimeline();
+
+    completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
+    assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(3L));
+    assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------