You are viewing a plain text version of this content; the canonical (HTML) version is available on the Apache mailing-list archive.
Posted to commits@hudi.apache.org by da...@apache.org on 2022/03/05 02:04:51 UTC
[hudi] branch master updated: [HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6a46130 [HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
6a46130 is described below
commit 6a46130037e6be48d6c014e513dbca6e940b83d8
Author: Sivabalan Narayanan <n....@gmail.com>
AuthorDate: Fri Mar 4 21:04:16 2022 -0500
[HUDI-2761] Fixing timeline server for repeated refreshes (#4812)
* Fixing timeline server for repeated refreshes
---
.../TestHoodieSparkMergeOnReadTableRollback.java | 15 +++++----------
.../apache/hudi/sink/StreamWriteOperatorCoordinator.java | 5 +----
.../org/apache/hudi/timeline/service/RequestHandler.java | 8 ++++++--
3 files changed, 12 insertions(+), 16 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index d552955..7655cf9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -150,11 +149,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
// NOTE: First writer will have Metadata table DISABLED
HoodieWriteConfig.Builder cfgBuilder =
- getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
- .withMetadataConfig(
- HoodieMetadataConfig.newBuilder()
- .enable(false)
- .build());
+ getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();
@@ -209,7 +204,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
final String commitTime1 = "002";
// WriteClient with custom config (disable small file handling)
// NOTE: Second writer will have Metadata table ENABLED
- try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) {
+ try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
secondClient.startCommitWithTime(commitTime1);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -245,8 +240,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/*
* Write 3 (inserts + updates - testing successful delta commit)
*/
- final String commitTime2 = "002";
- try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) {
+ final String commitTime2 = "003";
+ try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
thirdClient.startCommitWithTime(commitTime2);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -287,7 +282,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/*
* Write 4 (updates)
*/
- newCommitTime = "003";
+ newCommitTime = "004";
thirdClient.startCommitWithTime(newCommitTime);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 4782070..f3253e4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -23,7 +23,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
@@ -337,11 +336,9 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
- final String instant = HoodieActiveTimeline.createNewInstantTime();
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
- this.instant = instant;
- this.writeClient.startCommitWithTime(instant, tableState.commitAction);
+ this.instant = this.writeClient.startCommit();
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 4744fbb..602ab16 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -123,18 +123,22 @@ public class RequestHandler {
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
+ String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
+ : HoodieTimeline.INVALID_INSTANT_TS;
if (LOG.isDebugEnabled()) {
LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
+ "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
}
- if ((localTimeline.getInstants().count() == 0)
+ if ((!localTimeline.getInstants().findAny().isPresent())
&& HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
return false;
}
String localTimelineHash = localTimeline.getTimelineHash();
- if (!localTimelineHash.equals(timelineHashFromClient)) {
+ // refresh if timeline hash mismatches and if local's last known instant < client's last known instant
+ if (!localTimelineHash.equals(timelineHashFromClient)
+ && HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) {
return true;
}