Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/05 02:04:51 UTC

[hudi] branch master updated: [HUDI-2761] Fixing timeline server for repeated refreshes (#4812)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a46130  [HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
6a46130 is described below

commit 6a46130037e6be48d6c014e513dbca6e940b83d8
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Mar 4 21:04:16 2022 -0500

    [HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
    
    * Fixing timeline server for repeated refreshes
---
 .../TestHoodieSparkMergeOnReadTableRollback.java          | 15 +++++----------
 .../apache/hudi/sink/StreamWriteOperatorCoordinator.java  |  5 +----
 .../org/apache/hudi/timeline/service/RequestHandler.java  |  8 ++++++--
 3 files changed, 12 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index d552955..7655cf9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.functional;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -150,11 +149,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
   void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
     // NOTE: First writer will have Metadata table DISABLED
     HoodieWriteConfig.Builder cfgBuilder =
-        getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
-            .withMetadataConfig(
-                HoodieMetadataConfig.newBuilder()
-                    .enable(false)
-                    .build());
+        getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
     
     addConfigsForPopulateMetaFields(cfgBuilder, true);
     HoodieWriteConfig cfg = cfgBuilder.build();
@@ -209,7 +204,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
       final String commitTime1 = "002";
       // WriteClient with custom config (disable small file handling)
       // NOTE: Second writer will have Metadata table ENABLED
-      try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) {
+      try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
         secondClient.startCommitWithTime(commitTime1);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -245,8 +240,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
       /*
        * Write 3 (inserts + updates - testing successful delta commit)
        */
-      final String commitTime2 = "002";
-      try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) {
+      final String commitTime2 = "003";
+      try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
         thirdClient.startCommitWithTime(commitTime2);
 
         List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -287,7 +282,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
         /*
          * Write 4 (updates)
          */
-        newCommitTime = "003";
+        newCommitTime = "004";
         thirdClient.startCommitWithTime(newCommitTime);
 
         writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
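
[Note on the test hunks above: the second and third writers previously reused commit time "002", which the first writer had already taken; the rewrite renumbers the later commits to "003" and "004" so every instant on the timeline is unique and strictly increasing. Hudi instant times are fixed-width digit strings compared lexicographically, so that property can be sanity-checked with plain string comparison. A standalone sketch, no Hudi types, class name hypothetical:

    import java.util.Arrays;
    import java.util.List;

    public class InstantOrderingSketch {
        public static void main(String[] args) {
            // Zero-padded instant times sort chronologically as plain strings,
            // so the rewritten test's "002" -> "003" -> "004" sequence keeps
            // the timeline strictly ordered with no duplicate instants.
            List<String> instants = Arrays.asList("002", "003", "004");
            for (int i = 1; i < instants.size(); i++) {
                if (instants.get(i - 1).compareTo(instants.get(i)) >= 0) {
                    throw new IllegalStateException("instant times must be strictly increasing");
                }
            }
            System.out.println("timeline order holds");
        }
    }
]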
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 4782070..f3253e4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
@@ -337,11 +336,9 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
-    final String instant = HoodieActiveTimeline.createNewInstantTime();
     // put the assignment in front of metadata generation,
     // because the instant request from write task is asynchronous.
-    this.instant = instant;
-    this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+    this.instant = this.writeClient.startCommit();
     this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
     LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
         this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
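
[Note on the coordinator hunk above: the change removes the locally generated instant time and lets the write client both allocate the instant and start the commit, so there is a single source of truth for what was registered on the timeline. A condensed restatement of the calling pattern, with surrounding fields elided:

    // Before: the coordinator generated the instant time itself and then asked
    // the client to start a commit with it (two places that could disagree).
    //   String instant = HoodieActiveTimeline.createNewInstantTime();
    //   writeClient.startCommitWithTime(instant, tableState.commitAction);

    // After: the client allocates the instant and starts the commit in one
    // step, returning the time it actually registered on the timeline.
    String instant = writeClient.startCommit();
    metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, instant);
]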
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 4744fbb..602ab16 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -123,18 +123,22 @@ public class RequestHandler {
     String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
     HoodieTimeline localTimeline =
         viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
+    String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
+        : HoodieTimeline.INVALID_INSTANT_TS;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
           + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
     }
 
-    if ((localTimeline.getInstants().count() == 0)
+    if ((!localTimeline.getInstants().findAny().isPresent())
         && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
       return false;
     }
 
     String localTimelineHash = localTimeline.getTimelineHash();
-    if (!localTimelineHash.equals(timelineHashFromClient)) {
+    // refresh if timeline hash mismatches and if local's last known instant < client's last known instant
+    if (!localTimelineHash.equals(timelineHashFromClient)
+        && HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) {
       return true;
     }