Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/27 09:12:14 UTC
[hudi] branch master updated: [HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77039ae734a [HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)
77039ae734a is described below
commit 77039ae734aead741e8f528637342a2538b4c456
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Thu Apr 27 17:12:07 2023 +0800
[HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)
Introduces the completion time on the timeline. The completion time represents the transaction completion sequence, and we can use it to support instant filtering for incremental use cases such as the incremental source, incremental meta sync, etc.
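For illustration, a minimal Spark (Scala) sketch of an incremental read that pulls by completion time, assuming a Hudi table already written at `basePath` (the path and the instant-time range are placeholders; the option keys and constants are the ones added or exercised in this patch):

    // Incremental query that compares BEGIN/END against each instant's
    // stateTransitionTime (completion time) instead of its commit (start) time.
    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "20230104000000000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key(), "20230105000000000")
      .load(basePath)

This mirrors the usage in the new TestIncrementalReadByStateTransitionTime below.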
---
.../hudi/client/utils/MetadataConversionUtils.java | 2 +
.../src/main/avro/HoodieArchivedMetaEntry.avsc | 5 +
.../hudi/common/config/HoodieCommonConfig.java | 8 ++
.../table/timeline/HoodieArchivedTimeline.java | 5 +-
.../table/timeline/HoodieDefaultTimeline.java | 12 +++
.../hudi/common/table/timeline/HoodieInstant.java | 100 +++++++++++++++-----
.../hudi/common/table/timeline/HoodieTimeline.java | 11 +++
.../hudi/common/util/InternalSchemaCache.java | 11 +--
.../common/table/timeline/TestHoodieInstant.java | 79 ++++++++++++++++
.../scala/org/apache/hudi/DataSourceOptions.scala | 11 ++-
.../org/apache/hudi/IncrementalRelation.scala | 22 ++++-
.../hudi/MergeOnReadIncrementalRelation.scala | 28 +++++-
.../sql/hudi/streaming/HoodieStreamSource.scala | 22 ++++-
.../procedures/ShowArchivedCommitsProcedure.scala | 6 +-
.../command/procedures/ShowCommitsProcedure.scala | 6 +-
.../TestIncrementalReadByStateTransitionTime.scala | 104 +++++++++++++++++++++
...TestStreamSourceReadByStateTransitionTime.scala | 99 ++++++++++++++++++++
.../hudi/functional/TestStreamingSource.scala | 57 ++++++-----
18 files changed, 517 insertions(+), 71 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 1e1dea5d846..e068d4b0432 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -50,6 +50,7 @@ public class MetadataConversionUtils {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+ archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getStateTransitionTime());
switch (hoodieInstant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
if (hoodieInstant.isCompleted()) {
@@ -135,6 +136,7 @@ public class MetadataConversionUtils {
HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+ archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getStateTransitionTime());
switch (hoodieInstant.getAction()) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setActionType(ActionType.clean.name());
diff --git a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
index 81bcaf745e5..c2c2b00b7ca 100644
--- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
+++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
@@ -128,6 +128,11 @@
"HoodieIndexCommitMetadata"
],
"default": null
+ },
+ {
+ "name":"stateTransitionTime",
+ "type":["null","string"],
+ "default": null
}
]
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 30db250fcc7..c234b9494e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -72,6 +72,14 @@ public class HoodieCommonConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");
+ public static final ConfigProperty<Boolean> READ_BY_STATE_TRANSITION_TIME = ConfigProperty
+ .key("hoodie.datasource.read.by.state.transition.time")
+ .defaultValue(false)
+ .sinceVersion("0.14.0")
+ .withDocumentation("For incremental mode, whether to enable to pulling commits in range by state transition time(completion time) "
+ + "instead of commit time(start time). Please be aware that enabling this will result in"
+ + "`begin.instanttime` and `end.instanttime` using `stateTransitionTime` instead of the instant's commit time.");
+
public static final ConfigProperty<String> HOODIE_FS_ATOMIC_CREATION_SUPPORT = ConfigProperty
.key("hoodie.fs.atomic_creation.support")
.defaultValue("")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 1f14f5245a6..d5145ee0c6c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -84,6 +84,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
private static final String ACTION_TYPE_KEY = "actionType";
private static final String ACTION_STATE = "actionState";
+ private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
private HoodieTableMetaClient metaClient;
private final Map<String, byte[]> readCommits = new HashMap<>();
@@ -178,6 +179,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
final String action = record.get(ACTION_TYPE_KEY).toString();
+ final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
if (loadDetails) {
getMetadataKey(action).map(key -> {
Object actionData = record.get(key);
@@ -191,7 +193,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return null;
});
}
- return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
+ return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
+ instantTime, stateTransitionTime);
}
@Nonnull
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index d9db86942ab..3af81ea9f9f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -197,6 +197,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
getInstantsAsStream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
}
+ @Override
+ public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTs(String startTs, String endTs) {
+ return new HoodieDefaultTimeline(
+ getInstantsAsStream().filter(s -> HoodieTimeline.isInRange(s.getStateTransitionTime(), startTs, endTs)),
+ details);
+ }
+
@Override
public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
return new HoodieDefaultTimeline(getInstantsAsStream()
@@ -409,6 +416,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
return getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed());
}
+ @Override
+ public Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs() {
+ return getInstantsAsStream().sorted(HoodieInstant.STATE_TRANSITION_COMPARATOR);
+ }
+
@Override
public boolean isBeforeTimelineStarts(String instant) {
Option<HoodieInstant> firstNonSavepointCommit = getFirstNonSavepointCommit();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 94ae2625499..3bc3ad02646 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -18,13 +18,18 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.hudi.common.util.StringUtils;
+
import org.apache.hadoop.fs.FileStatus;
import java.io.Serializable;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* A Hoodie Instant represents an action done on a hoodie table. All actions start with an inflight instant and then
@@ -34,6 +39,14 @@ import java.util.Objects;
*/
public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
+ // Instant like 20230104152218702.commit.request, 20230104152218702.inflight
+ private static final Pattern NAME_FORMAT =
+ Pattern.compile("^(\\d+)(\\.\\w+)(\\.\\D+)?$");
+
+ private static final String DELIMITER = ".";
+
+ private static final String FILE_NAME_FORMAT_ERROR = "The fileName %s doesn't match instant format";
+
/**
* A COMPACTION action eventually becomes COMMIT when completed. So, when grouping instants
* for state transitions, this needs to be taken into account
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
public static final Comparator<HoodieInstant> COMPARATOR = Comparator.comparing(HoodieInstant::getTimestamp)
.thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
+ public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+ Comparator.comparing(HoodieInstant::getStateTransitionTime)
+ .thenComparing(COMPARATOR);
+
+ public static final String EMPTY_FILE_EXTENSION = "";
+
public static String getComparableAction(String action) {
return COMPARABLE_ACTIONS.getOrDefault(action, action);
}
+ public static String extractTimestamp(String fileName) throws IllegalArgumentException {
+ Objects.requireNonNull(fileName);
+
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+
+ throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, fileName));
+ }
+
public static String getTimelineFileExtension(String fileName) {
Objects.requireNonNull(fileName);
- int dotIndex = fileName.indexOf('.');
- return dotIndex == -1 ? "" : fileName.substring(dotIndex);
+
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ return fileName.substring(matcher.group(1).length());
+ }
+
+ return EMPTY_FILE_EXTENSION;
}
/**
@@ -70,9 +105,10 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
NIL
}
- private State state = State.COMPLETED;
- private String action;
- private String timestamp;
+ private final State state;
+ private final String action;
+ private final String timestamp;
+ private final String stateTransitionTime;
/**
* Load the instant from the meta FileStatus.
@@ -80,22 +116,27 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
public HoodieInstant(FileStatus fileStatus) {
// First read the instant timestamp. [==>20170101193025<==].commit
String fileName = fileStatus.getPath().getName();
- String fileExtension = getTimelineFileExtension(fileName);
- timestamp = fileName.replace(fileExtension, "");
-
- // Next read the action for this marker
- action = fileExtension.replaceFirst(".", "");
- if (action.equals("inflight")) {
- // This is to support backwards compatibility on how in-flight commit files were written
- // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
- action = "commit";
- state = State.INFLIGHT;
- } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
- state = State.INFLIGHT;
- action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
- } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
- state = State.REQUESTED;
- action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+ Matcher matcher = NAME_FORMAT.matcher(fileName);
+ if (matcher.find()) {
+ timestamp = matcher.group(1);
+ if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+ // This is to support backwards compatibility on how in-flight commit files were written
+ // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
+ action = HoodieTimeline.COMMIT_ACTION;
+ state = State.INFLIGHT;
+ } else {
+ action = matcher.group(2).replaceFirst(DELIMITER, StringUtils.EMPTY_STRING);
+ if (matcher.groupCount() == 3 && matcher.group(3) != null) {
+ state = State.valueOf(matcher.group(3).replaceFirst(DELIMITER, StringUtils.EMPTY_STRING).toUpperCase());
+ } else {
+ // Like 20230104152218702.commit
+ state = State.COMPLETED;
+ }
+ }
+ stateTransitionTime =
+ HoodieInstantTimeGenerator.formatDate(new Date(fileStatus.getModificationTime()));
+ } else {
+ throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, fileName));
}
}
@@ -104,12 +145,21 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
this.state = isInflight ? State.INFLIGHT : State.COMPLETED;
this.action = action;
this.timestamp = timestamp;
+ this.stateTransitionTime = null;
}
public HoodieInstant(State state, String action, String timestamp) {
this.state = state;
this.action = action;
this.timestamp = timestamp;
+ this.stateTransitionTime = null;
+ }
+
+ public HoodieInstant(State state, String action, String timestamp, String stateTransitionTime) {
+ this.state = state;
+ this.action = action;
+ this.timestamp = timestamp;
+ this.stateTransitionTime = stateTransitionTime;
}
public boolean isCompleted() {
@@ -214,6 +264,10 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
return state;
}
+ public String getStateTransitionTime() {
+ return stateTransitionTime;
+ }
+
@Override
public int hashCode() {
return Objects.hash(state, action, timestamp);
@@ -226,6 +280,8 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
@Override
public String toString() {
- return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + "__" + action + "__" + state + "]";
+ return "[" + ((isInflight() || isRequested()) ? "==>" : "")
+ + timestamp + "__" + action + "__" + state
+ + (StringUtils.isNullOrEmpty(stateTransitionTime) ? "" : ("__" + stateTransitionTime)) + "]";
}
}
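A quick sketch of what the new NAME_FORMAT parsing yields for the two filename shapes named in the comment above (Scala calling the Java statics; the first three filenames are taken from the new unit tests, the last is a hypothetical invalid name):

    HoodieInstant.extractTimestamp("20230104152218702.inflight")               // "20230104152218702"
    HoodieInstant.extractTimestamp("20230104152218702.commit.request")         // "20230104152218702"
    HoodieInstant.getTimelineFileExtension("20230104152218702.commit.request") // ".commit.request"
    HoodieInstant.extractTimestamp("not-an-instant")  // throws IllegalArgumentException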
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6b6fb28af86..ce3d880a5bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -235,6 +235,12 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsInRange(String startTs, String endTs);
+ /**
+ * Create a new Timeline with instants after startTs and before or on endTs
+ * by state transition timestamp of actions.
+ */
+ HoodieTimeline findInstantsInRangeByStateTransitionTs(String startTs, String endTs);
+
/**
* Create a new Timeline with all the instants after startTs.
*/
@@ -349,6 +355,11 @@ public interface HoodieTimeline extends Serializable {
*/
Stream<HoodieInstant> getReverseOrderedInstants();
+ /**
+ * Get the stream of instants in order by state transition timestamp of actions.
+ */
+ Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs();
+
/**
* @return true if the passed in instant is before the first completed instant in the timeline
*/
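To make the two new timeline methods concrete, a small sketch (Scala; `timeline` stands for any HoodieTimeline and the timestamps are placeholders):

    // Instants whose state transition (completion) time falls in (startTs, endTs]
    val inRange = timeline.findInstantsInRangeByStateTransitionTs("20230104000000000", "20230105000000000")
    // Instants ordered by completion time; an instant that started early but
    // finished late (e.g. a long-running compaction) sorts by when it finished.
    val ordered = timeline.getInstantsOrderedByStateTransitionTs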
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index d3823054650..6485fdd9575 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -183,18 +183,13 @@ public class InternalSchemaCache {
public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
String avroSchema = "";
Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
- List<String> validateCommitList = commitSet.stream().map(fileName -> {
- String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
- return fileName.replace(fileExtension, "");
- }).collect(Collectors.toList());
+ List<String> validateCommitList = commitSet.stream().map(HoodieInstant::extractTimestamp).collect(Collectors.toList());
FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
Path hoodieMetaPath = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
//step1:
- Path candidateCommitFile = commitSet.stream().filter(fileName -> {
- String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
- return fileName.replace(fileExtension, "").equals(versionId + "");
- }).findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
+ Path candidateCommitFile = commitSet.stream().filter(fileName -> HoodieInstant.extractTimestamp(fileName).equals(versionId + ""))
+ .findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
if (candidateCommitFile != null) {
try {
byte[] data;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java
new file mode 100644
index 00000000000..c4a1d00e90d
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.Assertions.assertStreamEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieInstant extends HoodieCommonTestHarness {
+
+ @Test
+ public void testExtractTimestamp() {
+ String fileName = "20230104152218702.inflight";
+ assertEquals("20230104152218702", HoodieInstant.extractTimestamp(fileName));
+
+ fileName = "20230104152218702.commit.request";
+ assertEquals("20230104152218702", HoodieInstant.extractTimestamp(fileName));
+ }
+
+ @Test
+ public void testGetTimelineFileExtension() {
+ String fileName = "20230104152218702.inflight";
+ assertEquals(".inflight", HoodieInstant.getTimelineFileExtension(fileName));
+
+ fileName = "20230104152218702.commit.request";
+ assertEquals(".commit.request", HoodieInstant.getTimelineFileExtension(fileName));
+ }
+
+ @Test
+ public void testCreateHoodieInstantByFileStatus() throws IOException {
+ try {
+ initMetaClient();
+ HoodieInstant instantRequested =
+ new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "001");
+ HoodieInstant instantCommitted =
+ new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ timeline.createNewInstant(instantRequested);
+ timeline.transitionRequestedToInflight(instantRequested, Option.empty());
+ timeline.saveAsComplete(
+ new HoodieInstant(true, instantRequested.getAction(), instantRequested.getTimestamp()),
+ Option.empty());
+ metaClient.reloadActiveTimeline();
+ timeline = metaClient.getActiveTimeline();
+ assertEquals(1, timeline.countInstants());
+
+ assertStreamEquals(Stream.of(instantCommitted),
+ timeline.getInstantsAsStream(), "Instants in timeline do not match");
+
+ // Make sure StateTransitionTime is set in the timeline
+ assertEquals(0,
+ timeline.getInstantsAsStream().filter(s -> s.getStateTransitionTime().isEmpty()).count());
+ } finally {
+ cleanMetaClient();
+ }
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 4cef215d0e6..31e5512a3f9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -115,13 +115,17 @@ object DataSourceReadOptions {
.noDefaultValue()
.withDocumentation("Instant time to start incrementally pulling data from. The instanttime here need not necessarily " +
"correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " +
- "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM.")
+ "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM. Note that if `"
+ + HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key() + "` is enabled, the instant's "
+ + "`stateTransitionTime` will be used for comparison.")
val END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.end.instanttime")
.noDefaultValue()
.withDocumentation("Instant time to limit incrementally fetched data to. " +
- "New data written with an instant_time <= END_INSTANTTIME are fetched out.")
+ "New data written with an instant_time <= END_INSTANTTIME are fetched out. Note that if `"
+ + HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key() + "` is enabled, the instant's "
+ + "`stateTransitionTime` will be used for comparison.")
val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.schema.use.end.instanttime")
@@ -200,6 +204,9 @@ object DataSourceReadOptions {
val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
+ val READ_BY_STATE_TRANSITION_TIME: ConfigProperty[Boolean] = HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME
+
+
/** @deprecated Use {@link QUERY_TYPE} and its methods instead */
@Deprecated
val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 79bd90ec024..0d178abc5ba 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -64,6 +64,11 @@ class IncrementalRelation(val sqlContext: SQLContext,
new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
metaClient)
private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+
+ private val useStateTransitionTime = optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key)
+ .map(_.toBoolean)
+ .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue)
+
if (commitTimeline.empty()) {
throw new HoodieException("No instants to incrementally pull")
}
@@ -81,9 +86,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val lastInstant = commitTimeline.lastInstant().get()
- private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
- optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
- optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+ private val commitsTimelineToReturn = {
+ if (useStateTransitionTime) {
+ commitTimeline.findInstantsInRangeByStateTransitionTs(
+ optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+ optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getStateTransitionTime))
+ } else {
+ commitTimeline.findInstantsInRange(
+ optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+ optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+ }
+ }
private val commitsToReturn = commitsTimelineToReturn.getInstantsAsStream.iterator().toList
// use schema from a file produced in the end/latest instant
@@ -204,6 +217,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)
val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
+ if (useStateTransitionTime) {
+ throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
+ }
log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
fullTableScanDataFrame(startInstantTime, endInstantTime)
} else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 636624f3950..74c03e9887c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -61,6 +61,8 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
override protected def timeline: HoodieTimeline = {
if (fullTableScan) {
metaClient.getCommitsAndCompactionTimeline
+ } else if (useStateTransitionTime) {
+ metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp)
} else {
metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
}
@@ -133,8 +135,16 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
// Validate this Incremental implementation is properly configured
validate()
+ protected val useStateTransitionTime: Boolean =
+ optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key)
+ .map(_.toBoolean)
+ .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue)
+
protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
- protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
+ protected def endTimestamp: String = optParams.getOrElse(
+ DataSourceReadOptions.END_INSTANTTIME.key,
+ if (useStateTransitionTime) super.timeline.lastInstant().get.getStateTransitionTime
+ else super.timeline.lastInstant().get.getTimestamp)
protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)
@@ -154,7 +164,11 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
if (!startInstantArchived || !endInstantArchived) {
// If endTimestamp commit is not archived, will filter instants
// before endTimestamp.
- super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
+ if (useStateTransitionTime) {
+ super.timeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp).getInstants.asScala.toList
+ } else {
+ super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
+ }
} else {
super.timeline.getInstants.asScala.toList
}
@@ -171,7 +185,11 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
- val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+ val largerThanFilter = if (startInstantArchived) {
+ GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+ } else {
+ GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
+ }
val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
@@ -199,6 +217,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
if (!this.tableConfig.populateMetaFields()) {
throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
}
+
+ if (useStateTransitionTime && fullTableScan) {
+ throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
+ }
}
protected def globPattern: String =
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index d8f53a15dd6..2f77c4cd770 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -69,6 +69,11 @@ class HoodieStreamSource(
parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
+ private val useStateTransitionTime =
+ parameters.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key())
+ .map(_.toBoolean)
+ .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue())
+
@transient private lazy val initialOffsets = {
val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession, metadataPath)
metadataLog.get(0).getOrElse {
@@ -101,7 +106,16 @@ class HoodieStreamSource(
metaClient.reloadActiveTimeline()
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants() match {
case activeInstants if !activeInstants.empty() =>
- Some(HoodieSourceOffset(activeInstants.lastInstant().get().getTimestamp))
+ val timestamp = if (useStateTransitionTime) {
+ activeInstants.getInstantsOrderedByStateTransitionTs
+ .skip(activeInstants.countInstants() - 1)
+ .findFirst()
+ .get()
+ .getStateTransitionTime
+ } else {
+ activeInstants.lastInstant().get().getTimestamp
+ }
+ Some(HoodieSourceOffset(timestamp))
case _ =>
None
}
@@ -137,6 +151,7 @@ class HoodieStreamSource(
// Consume the data between (startCommitTime, endCommitTime]
val incParams = parameters ++ Map(
DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key() -> useStateTransitionTime.toString,
DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
)
@@ -163,10 +178,7 @@ class HoodieStreamSource(
startOffset match {
case INIT_OFFSET => startOffset.commitTime
case HoodieSourceOffset(commitTime) =>
- val time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime
- // As we consume the data between (start, end], start is not included,
- // so we +1s to the start commit time here.
- HoodieActiveTimeline.formatDate(new Date(time + 1000))
+ commitTime
case _ => throw new IllegalStateException("Unknown offset type.")
}
}
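For the streaming path, a hedged sketch of how a reader could opt in (Scala; the table path is a placeholder):

    // Streaming offsets now advance by completion time when the flag is set,
    // so commits that finish out of start-time order are still picked up
    // (see the out-of-order test added below).
    val stream = spark.readStream
      .format("hudi")
      .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
      .load("/tmp/hudi/test_table")
    stream.writeStream.format("console").start()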
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
index 9d780144315..01c7465b3f4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
@@ -43,6 +43,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+ StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
@@ -54,6 +55,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+ StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
@@ -121,7 +123,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet) {
for (hoodieWriteStat <- partitionWriteStat.getValue) {
rows.add(Row(
- commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
+ commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, hoodieWriteStat.getPartitionPath,
hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
@@ -151,7 +153,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
for (i <- 0 until newCommits.size) {
val commit = newCommits.get(i)
val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
- rows.add(Row(commit.getTimestamp, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
+ rows.add(Row(commit.getTimestamp, commit.getStateTransitionTime, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
commitMetadata.fetchTotalWriteErrors))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
index ddbad810042..3c3fde385bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
@@ -39,6 +39,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+ StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
@@ -51,6 +52,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+ StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
@@ -104,7 +106,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet) {
for (hoodieWriteStat <- partitionWriteStat.getValue) {
rows.add(Row(
- commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
+ commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, hoodieWriteStat.getPartitionPath,
hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
@@ -134,7 +136,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
for (i <- 0 until newCommits.size) {
val commit = newCommits.get(i)
val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
- rows.add(Row(commit.getTimestamp, commit.getAction, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
+ rows.add(Row(commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
commitMetadata.fetchTotalWriteErrors))
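With the new column in place, both procedures surface the completion time next to commit_time; a short Spark SQL example (the table name is a placeholder, and the call syntax assumes the procedures are registered under their usual names):

    // Output now includes a state_transition_time column after commit_time
    spark.sql("call show_commits(table => 'hudi_table', limit => 10)").show(false)
    spark.sql("call show_archived_commits(table => 'hudi_table')").show(false)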
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
new file mode 100644
index 00000000000..b1e81d9857d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadByStateTransitionTime extends HoodieSparkClientTestBase {
+
+ var spark: SparkSession = null
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+
+ @BeforeEach
+ override def setUp(): Unit = {
+ setTableName("hoodie_test")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach
+ override def tearDown(): Unit = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testReadingWithStateTransitionTime(tableType: HoodieTableType): Unit = {
+ val records = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+ .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(spark.sparkContext.hadoopConfiguration)
+ .setBasePath(basePath)
+ .setLoadActiveTimelineOnLoad(true)
+ .build()
+
+ val firstInstant = metaClient.getActiveTimeline.filterCompletedInstants().getInstantsOrderedByStateTransitionTs
+ .findFirst().get()
+
+ val result1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "000")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
+ .option(DataSourceReadOptions.END_INSTANTTIME.key(), firstInstant.getTimestamp)
+ .load(basePath)
+ .count()
+
+ Assertions.assertEquals(result1, 0)
+ val result2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "000")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
+ .option(DataSourceReadOptions.END_INSTANTTIME.key(), firstInstant.getStateTransitionTime)
+ .load(basePath)
+ .count()
+ Assertions.assertEquals(result2, 100)
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala
new file mode 100644
index 00000000000..63175e2d19d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.EngineType
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.model.{HoodieFailedWritesCleaningPolicy, HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
+import org.apache.spark.api.java.JavaRDD
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.jdk.CollectionConverters.mapAsJavaMapConverter
+
+class TestStreamSourceReadByStateTransitionTime extends TestStreamingSource {
+
+ override val useTransitionTime: Boolean = true
+
+ private val dataGen = new HoodieTestDataGenerator(System.currentTimeMillis())
+
+ test("Test streaming read out of order data") {
+ HoodieTableType.values().foreach { tableType =>
+ withTempDir { inputDir =>
+ val tablePath = s"${inputDir.getCanonicalPath}/test_stream_${tableType.name()}"
+ HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(tableType)
+ .setTableName(s"test_stream_${tableType.name()}")
+ .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setPreCombineField("timestamp")
+ .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+ val writeConfig = HoodieWriteConfig.newBuilder()
+ .withEngineType(EngineType.SPARK)
+ .withPath(tablePath)
+ .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .build())
+ .withProps(commonOptions.asJava)
+ .build()
+
+ val context = new HoodieSparkEngineContext(sparkContext)
+ val writeClient = new SparkRDDWriteClient(context, writeConfig)
+ val instantTime1 = makeNewCommitTime(1, "%09d")
+ val instantTime2 = makeNewCommitTime(2,"%09d")
+
+ val records1 = sparkContext.parallelize(dataGen.generateInserts(instantTime1, 10).toSeq, 2)
+ val records2 = sparkContext.parallelize(dataGen.generateInserts(instantTime2, 15).toSeq, 2)
+
+ writeClient.startCommitWithTime(instantTime1)
+ writeClient.startCommitWithTime(instantTime2)
+ writeClient.insert(records2.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[Nothing]]], instantTime2)
+ val df = spark.readStream
+ .format("hudi")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
+ .load(tablePath)
+
+ testStream(df) (
+ AssertOnQuery { q => q.processAllAvailable(); true },
+ // Should read all records from instantTime2
+ assertCountMatched(15, true),
+
+ AssertOnQuery { _ =>
+ writeClient.insert(records1.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[Nothing]]], instantTime1)
+ true
+ },
+ AssertOnQuery { q => q.processAllAvailable(); true },
+
+ // Should read all records from instantTime1
+ assertCountMatched(10, true),
+ StopStream
+ )
+ }
+ }
+ }
+
+ def assertCountMatched(count: Int, lastOnly: Boolean): CheckAnswerRowsByFunc = CheckAnswerRowsByFunc(rows => {
+ assert(rows.size == count)
+ }, lastOnly = lastOnly)
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index e7b568931b0..96108b0d550 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -18,7 +18,7 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceReadOptions.START_OFFSET
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{Row, SaveMode}
class TestStreamingSource extends StreamTest {
import testImplicits._
- private val commonOptions = Map(
+ protected val commonOptions: Map[String, String] = Map(
RECORDKEY_FIELD.key -> "id",
PRECOMBINE_FIELD.key -> "ts",
INSERT_PARALLELISM_VALUE.key -> "4",
@@ -38,6 +38,8 @@ class TestStreamingSource extends StreamTest {
)
private val columns = Seq("id", "name", "price", "ts")
+ val useTransitionTime: Boolean = false
+
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
override protected def sparkConf = {
@@ -51,45 +53,46 @@ class TestStreamingSource extends StreamTest {
withTempDir { inputDir =>
val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
HoodieTableMetaClient.withPropertyBuilder()
- .setTableType(COPY_ON_WRITE)
- .setTableName(getTableName(tablePath))
- .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+ .setTableType(COPY_ON_WRITE)
+ .setTableName(getTableName(tablePath))
+ .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
.setPreCombineField("ts")
- .initTable(spark.sessionState.newHadoopConf(), tablePath)
+ .initTable(spark.sessionState.newHadoopConf(), tablePath)
addData(tablePath, Seq(("1", "a1", "10", "000")))
val df = spark.readStream
.format("org.apache.hudi")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
.load(tablePath)
.select("id", "name", "price", "ts")
testStream(df)(
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
StopStream,
addDataToQuery(tablePath, Seq(("1", "a1", "12", "000"))),
StartStream(),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(Seq(Row("1", "a1", "12", "000")), lastOnly = true, isSorted = false),
addDataToQuery(tablePath,
Seq(("2", "a2", "12", "000"),
- ("3", "a3", "12", "000"),
- ("4", "a4", "12", "000"))),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ ("3", "a3", "12", "000"),
+ ("4", "a4", "12", "000"))),
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(
Seq(Row("2", "a2", "12", "000"),
- Row("3", "a3", "12", "000"),
- Row("4", "a4", "12", "000")),
+ Row("3", "a3", "12", "000"),
+ Row("4", "a4", "12", "000")),
lastOnly = true, isSorted = false),
- StopStream,
+ StopStream,
addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
addDataToQuery(tablePath, Seq(("5", "a5", "15", "000"))),
StartStream(),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(
Seq(Row("6", "a6", "12", "000"),
Row("5", "a5", "15", "000")),
@@ -111,11 +114,12 @@ class TestStreamingSource extends StreamTest {
addData(tablePath, Seq(("1", "a1", "10", "000")))
val df = spark.readStream
.format("org.apache.hudi")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
.load(tablePath)
.select("id", "name", "price", "ts")
testStream(df)(
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
StopStream,
@@ -124,7 +128,7 @@ class TestStreamingSource extends StreamTest {
("3", "a3", "12", "000"),
("2", "a2", "10", "001"))),
StartStream(),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(
Seq(Row("3", "a3", "12", "000"),
Row("2", "a2", "10", "001")),
@@ -134,7 +138,7 @@ class TestStreamingSource extends StreamTest {
addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
StartStream(),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(
Seq(Row("5", "a5", "12", "000"),
Row("6", "a6", "12", "000")),
@@ -157,18 +161,19 @@ class TestStreamingSource extends StreamTest {
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key(), "latest")
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
.load(tablePath)
.select("id", "name", "price", "ts")
testStream(df)(
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
// Start from the latest, should contain no data
CheckAnswerRows(Seq(), lastOnly = true, isSorted = false),
StopStream,
addDataToQuery(tablePath, Seq(("2", "a1", "12", "000"))),
StartStream(),
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
CheckAnswerRows(Seq(Row("2", "a1", "12", "000")), lastOnly = false, isSorted = false)
)
}
@@ -188,16 +193,22 @@ class TestStreamingSource extends StreamTest {
addData(tablePath, Seq(("2", "a1", "11", "001")))
addData(tablePath, Seq(("3", "a1", "12", "002")))
- val timestamp = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
- .firstInstant().get().getTimestamp
+ val timestamp = if (useTransitionTime) {
+ metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+ .firstInstant().get().getStateTransitionTime
+ } else {
+ metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+ .firstInstant().get().getTimestamp
+ }
val df = spark.readStream
.format("org.apache.hudi")
.option(START_OFFSET.key(), timestamp)
+ .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
.load(tablePath)
.select("id", "name", "price", "ts")
testStream(df)(
- AssertOnQuery {q => q.processAllAvailable(); true },
+ AssertOnQuery { q => q.processAllAvailable(); true },
// Start after the first commit
CheckAnswerRows(Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002")), lastOnly = true, isSorted = false)
)