Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/27 09:12:14 UTC

[hudi] branch master updated: [HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77039ae734a [HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)
77039ae734a is described below

commit 77039ae734aead741e8f528637342a2538b4c456
Author: Rex(Hui) An <bo...@gmail.com>
AuthorDate: Thu Apr 27 17:12:07 2023 +0800

    [HUDI-5517] HoodieTimeline support filter instants by state transition time (#7627)
    
    Introduces the completion time on the timeline. The completion time represents the transaction completion sequence, so it can be used to filter instants for incremental use cases such as the incremental source, incremental meta sync, etc.
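    
    For illustration, a minimal usage sketch (not part of this patch; assumes the Spark Scala
    datasource API, with placeholder begin/end instant times):
    
        // Incremental pull whose begin/end range is interpreted against the
        // completion time (stateTransitionTime) instead of the commit start time.
        val incrementalDF = spark.read.format("hudi")
          .option("hoodie.datasource.query.type", "incremental")
          .option("hoodie.datasource.read.by.state.transition.time", "true")
          // Placeholder values; normally taken from a previously saved checkpoint.
          .option("hoodie.datasource.read.begin.instanttime", "20230104152218702")
          .option("hoodie.datasource.read.end.instanttime", "20230104153000000")
          .load(basePath)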
---
 .../hudi/client/utils/MetadataConversionUtils.java |   2 +
 .../src/main/avro/HoodieArchivedMetaEntry.avsc     |   5 +
 .../hudi/common/config/HoodieCommonConfig.java     |   8 ++
 .../table/timeline/HoodieArchivedTimeline.java     |   5 +-
 .../table/timeline/HoodieDefaultTimeline.java      |  12 +++
 .../hudi/common/table/timeline/HoodieInstant.java  | 100 +++++++++++++++-----
 .../hudi/common/table/timeline/HoodieTimeline.java |  11 +++
 .../hudi/common/util/InternalSchemaCache.java      |  11 +--
 .../common/table/timeline/TestHoodieInstant.java   |  79 ++++++++++++++++
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  11 ++-
 .../org/apache/hudi/IncrementalRelation.scala      |  22 ++++-
 .../hudi/MergeOnReadIncrementalRelation.scala      |  28 +++++-
 .../sql/hudi/streaming/HoodieStreamSource.scala    |  22 ++++-
 .../procedures/ShowArchivedCommitsProcedure.scala  |   6 +-
 .../command/procedures/ShowCommitsProcedure.scala  |   6 +-
 .../TestIncrementalReadByStateTransitionTime.scala | 104 +++++++++++++++++++++
 ...TestStreamSourceReadByStateTransitionTime.scala |  99 ++++++++++++++++++++
 .../hudi/functional/TestStreamingSource.scala      |  57 ++++++-----
 18 files changed, 517 insertions(+), 71 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
index 1e1dea5d846..e068d4b0432 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/MetadataConversionUtils.java
@@ -50,6 +50,7 @@ public class MetadataConversionUtils {
     HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
     archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
     archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getStateTransitionTime());
     switch (hoodieInstant.getAction()) {
       case HoodieTimeline.CLEAN_ACTION: {
         if (hoodieInstant.isCompleted()) {
@@ -135,6 +136,7 @@ public class MetadataConversionUtils {
     HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
     archivedMetaWrapper.setCommitTime(hoodieInstant.getTimestamp());
     archivedMetaWrapper.setActionState(hoodieInstant.getState().name());
+    archivedMetaWrapper.setStateTransitionTime(hoodieInstant.getStateTransitionTime());
     switch (hoodieInstant.getAction()) {
       case HoodieTimeline.CLEAN_ACTION: {
         archivedMetaWrapper.setActionType(ActionType.clean.name());
diff --git a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
index 81bcaf745e5..c2c2b00b7ca 100644
--- a/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
+++ b/hudi-common/src/main/avro/HoodieArchivedMetaEntry.avsc
@@ -128,6 +128,11 @@
             "HoodieIndexCommitMetadata"
          ],
          "default": null
+      },
+      {
+         "name":"stateTransitionTime",
+         "type":["null","string"],
+         "default": null
       }
    ]
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 30db250fcc7..c234b9494e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -72,6 +72,14 @@ public class HoodieCommonConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");
 
+  public static final ConfigProperty<Boolean> READ_BY_STATE_TRANSITION_TIME = ConfigProperty
+      .key("hoodie.datasource.read.by.state.transition.time")
+      .defaultValue(false)
+      .sinceVersion("0.14.0")
+      .withDocumentation("For incremental mode, whether to enable to pulling commits in range by state transition time(completion time) "
+          + "instead of commit time(start time). Please be aware that enabling this will result in"
+          + "`begin.instanttime` and `end.instanttime` using `stateTransitionTime` instead of the instant's commit time.");
+
   public static final ConfigProperty<String> HOODIE_FS_ATOMIC_CREATION_SUPPORT = ConfigProperty
       .key("hoodie.fs.atomic_creation.support")
       .defaultValue("")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index 1f14f5245a6..d5145ee0c6c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -84,6 +84,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private static final String HOODIE_COMMIT_ARCHIVE_LOG_FILE_PREFIX = "commits";
   private static final String ACTION_TYPE_KEY = "actionType";
   private static final String ACTION_STATE = "actionState";
+  private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
   private HoodieTableMetaClient metaClient;
   private final Map<String, byte[]> readCommits = new HashMap<>();
 
@@ -178,6 +179,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private HoodieInstant readCommit(GenericRecord record, boolean loadDetails) {
     final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString();
     final String action = record.get(ACTION_TYPE_KEY).toString();
+    final String stateTransitionTime = (String) record.get(STATE_TRANSITION_TIME);
     if (loadDetails) {
       getMetadataKey(action).map(key -> {
         Object actionData = record.get(key);
@@ -191,7 +193,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
         return null;
       });
     }
-    return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime);
+    return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action,
+        instantTime, stateTransitionTime);
   }
 
   @Nonnull
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index d9db86942ab..3af81ea9f9f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -197,6 +197,13 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
         getInstantsAsStream().filter(s -> HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
   }
 
+  @Override
+  public HoodieDefaultTimeline findInstantsInRangeByStateTransitionTs(String startTs, String endTs) {
+    return new HoodieDefaultTimeline(
+        getInstantsAsStream().filter(s -> HoodieTimeline.isInRange(s.getStateTransitionTime(), startTs, endTs)),
+        details);
+  }
+
   @Override
   public HoodieDefaultTimeline findInstantsAfter(String instantTime, int numCommits) {
     return new HoodieDefaultTimeline(getInstantsAsStream()
@@ -409,6 +416,11 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
     return getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed());
   }
 
+  @Override
+  public Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs() {
+    return getInstantsAsStream().sorted(HoodieInstant.STATE_TRANSITION_COMPARATOR);
+  }
+
   @Override
   public boolean isBeforeTimelineStarts(String instant) {
     Option<HoodieInstant> firstNonSavepointCommit = getFirstNonSavepointCommit();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 94ae2625499..3bc3ad02646 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -18,13 +18,18 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.common.util.StringUtils;
+
 import org.apache.hadoop.fs.FileStatus;
 
 import java.io.Serializable;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * A Hoodie Instant represents a action done on a hoodie table. All actions start with a inflight instant and then
@@ -34,6 +39,14 @@ import java.util.Objects;
  */
 public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
 
+  // Instant like 20230104152218702.commit.request, 20230104152218702.inflight
+  private static final Pattern NAME_FORMAT =
+      Pattern.compile("^(\\d+)(\\.\\w+)(\\.\\D+)?$");
+
+  private static final String DELIMITER = ".";
+
+  private static final String FILE_NAME_FORMAT_ERROR = "The fileName %s doesn't match instant format";
+
   /**
    * A COMPACTION action eventually becomes COMMIT when completed. So, when grouping instants
    * for state transitions, this needs to be taken into account
@@ -46,14 +59,36 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
   public static final Comparator<HoodieInstant> COMPARATOR = Comparator.comparing(HoodieInstant::getTimestamp)
       .thenComparing(ACTION_COMPARATOR).thenComparing(HoodieInstant::getState);
 
+  public static final Comparator<HoodieInstant> STATE_TRANSITION_COMPARATOR =
+      Comparator.comparing(HoodieInstant::getStateTransitionTime)
+          .thenComparing(COMPARATOR);
+
+  public static final String EMPTY_FILE_EXTENSION = "";
+
   public static String getComparableAction(String action) {
     return COMPARABLE_ACTIONS.getOrDefault(action, action);
   }
 
+  public static String extractTimestamp(String fileName) throws IllegalArgumentException {
+    Objects.requireNonNull(fileName);
+
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      return matcher.group(1);
+    }
+
+    throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, fileName));
+  }
+
   public static String getTimelineFileExtension(String fileName) {
     Objects.requireNonNull(fileName);
-    int dotIndex = fileName.indexOf('.');
-    return dotIndex == -1 ? "" : fileName.substring(dotIndex);
+
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      return fileName.substring(matcher.group(1).length());
+    }
+
+    return EMPTY_FILE_EXTENSION;
   }
 
   /**
@@ -70,9 +105,10 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
     NIL
   }
 
-  private State state = State.COMPLETED;
-  private String action;
-  private String timestamp;
+  private final State state;
+  private final String action;
+  private final String timestamp;
+  private final String stateTransitionTime;
 
   /**
    * Load the instant from the meta FileStatus.
@@ -80,22 +116,27 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
   public HoodieInstant(FileStatus fileStatus) {
     // First read the instant timestamp. [==>20170101193025<==].commit
     String fileName = fileStatus.getPath().getName();
-    String fileExtension = getTimelineFileExtension(fileName);
-    timestamp = fileName.replace(fileExtension, "");
-
-    // Next read the action for this marker
-    action = fileExtension.replaceFirst(".", "");
-    if (action.equals("inflight")) {
-      // This is to support backwards compatibility on how in-flight commit files were written
-      // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
-      action = "commit";
-      state = State.INFLIGHT;
-    } else if (action.contains(HoodieTimeline.INFLIGHT_EXTENSION)) {
-      state = State.INFLIGHT;
-      action = action.replace(HoodieTimeline.INFLIGHT_EXTENSION, "");
-    } else if (action.contains(HoodieTimeline.REQUESTED_EXTENSION)) {
-      state = State.REQUESTED;
-      action = action.replace(HoodieTimeline.REQUESTED_EXTENSION, "");
+    Matcher matcher = NAME_FORMAT.matcher(fileName);
+    if (matcher.find()) {
+      timestamp = matcher.group(1);
+      if (matcher.group(2).equals(HoodieTimeline.INFLIGHT_EXTENSION)) {
+        // This is to support backwards compatibility on how in-flight commit files were written
+        // General rule is inflight extension is .<action>.inflight, but for commit it is .inflight
+        action = HoodieTimeline.COMMIT_ACTION;
+        state = State.INFLIGHT;
+      } else {
+        action = matcher.group(2).replaceFirst(DELIMITER, StringUtils.EMPTY_STRING);
+        if (matcher.groupCount() == 3 && matcher.group(3) != null) {
+          state = State.valueOf(matcher.group(3).replaceFirst(DELIMITER, StringUtils.EMPTY_STRING).toUpperCase());
+        } else {
+          // Like 20230104152218702.commit
+          state = State.COMPLETED;
+        }
+      }
+      stateTransitionTime =
+          HoodieInstantTimeGenerator.formatDate(new Date(fileStatus.getModificationTime()));
+    } else {
+      throw new IllegalArgumentException(String.format(FILE_NAME_FORMAT_ERROR, fileName));
     }
   }
 
@@ -104,12 +145,21 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
     this.state = isInflight ? State.INFLIGHT : State.COMPLETED;
     this.action = action;
     this.timestamp = timestamp;
+    this.stateTransitionTime = null;
   }
 
   public HoodieInstant(State state, String action, String timestamp) {
     this.state = state;
     this.action = action;
     this.timestamp = timestamp;
+    this.stateTransitionTime = null;
+  }
+
+  public HoodieInstant(State state, String action, String timestamp, String stateTransitionTime) {
+    this.state = state;
+    this.action = action;
+    this.timestamp = timestamp;
+    this.stateTransitionTime = stateTransitionTime;
   }
 
   public boolean isCompleted() {
@@ -214,6 +264,10 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
     return state;
   }
 
+  public String getStateTransitionTime() {
+    return stateTransitionTime;
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(state, action, timestamp);
@@ -226,6 +280,8 @@ public class HoodieInstant implements Serializable, Comparable<HoodieInstant> {
 
   @Override
   public String toString() {
-    return "[" + ((isInflight() || isRequested()) ? "==>" : "") + timestamp + "__" + action + "__" + state + "]";
+    return "[" + ((isInflight() || isRequested()) ? "==>" : "")
+        + timestamp + "__" + action + "__" + state
+        + (StringUtils.isNullOrEmpty(stateTransitionTime) ? "" : ("__" + stateTransitionTime)) + "]";
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 6b6fb28af86..ce3d880a5bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -235,6 +235,12 @@ public interface HoodieTimeline extends Serializable {
    */
   HoodieTimeline findInstantsInRange(String startTs, String endTs);
 
+  /**
+   * Create a new Timeline with instants after startTs and before or on endTs
+   * by state transition timestamp of actions.
+   */
+  HoodieTimeline findInstantsInRangeByStateTransitionTs(String startTs, String endTs);
+
   /**
    * Create a new Timeline with all the instants after startTs.
    */
@@ -349,6 +355,11 @@ public interface HoodieTimeline extends Serializable {
    */
   Stream<HoodieInstant> getReverseOrderedInstants();
 
+  /**
+   * Get the stream of instants in order by state transition timestamp of actions.
+   */
+  Stream<HoodieInstant> getInstantsOrderedByStateTransitionTs();
+
   /**
    * @return true if the passed in instant is before the first completed instant in the timeline
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index d3823054650..6485fdd9575 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -183,18 +183,13 @@ public class InternalSchemaCache {
   public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, Configuration hadoopConf, String validCommits) {
     String avroSchema = "";
     Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
-    List<String> validateCommitList = commitSet.stream().map(fileName -> {
-      String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
-      return fileName.replace(fileExtension, "");
-    }).collect(Collectors.toList());
+    List<String> validateCommitList = commitSet.stream().map(HoodieInstant::extractTimestamp).collect(Collectors.toList());
 
     FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
     Path hoodieMetaPath = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
     //step1:
-    Path candidateCommitFile = commitSet.stream().filter(fileName -> {
-      String fileExtension = HoodieInstant.getTimelineFileExtension(fileName);
-      return fileName.replace(fileExtension, "").equals(versionId + "");
-    }).findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
+    Path candidateCommitFile = commitSet.stream().filter(fileName -> HoodieInstant.extractTimestamp(fileName).equals(versionId + ""))
+        .findFirst().map(f -> new Path(hoodieMetaPath, f)).orElse(null);
     if (candidateCommitFile != null) {
       try {
         byte[] data;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java
new file mode 100644
index 00000000000..c4a1d00e90d
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieInstant.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline;
+
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.Assertions.assertStreamEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieInstant extends HoodieCommonTestHarness {
+
+  @Test
+  public void testExtractTimestamp() {
+    String fileName = "20230104152218702.inflight";
+    assertEquals("20230104152218702", HoodieInstant.extractTimestamp(fileName));
+
+    fileName = "20230104152218702.commit.request";
+    assertEquals("20230104152218702", HoodieInstant.extractTimestamp(fileName));
+  }
+
+  @Test
+  public void testGetTimelineFileExtension() {
+    String fileName = "20230104152218702.inflight";
+    assertEquals(".inflight", HoodieInstant.getTimelineFileExtension(fileName));
+
+    fileName = "20230104152218702.commit.request";
+    assertEquals(".commit.request", HoodieInstant.getTimelineFileExtension(fileName));
+  }
+
+  @Test
+  public void testCreateHoodieInstantByFileStatus() throws IOException {
+    try {
+      initMetaClient();
+      HoodieInstant instantRequested =
+          new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, "001");
+      HoodieInstant instantCommitted =
+          new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "001");
+      HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+      timeline.createNewInstant(instantRequested);
+      timeline.transitionRequestedToInflight(instantRequested, Option.empty());
+      timeline.saveAsComplete(
+          new HoodieInstant(true, instantRequested.getAction(), instantRequested.getTimestamp()),
+          Option.empty());
+      metaClient.reloadActiveTimeline();
+      timeline = metaClient.getActiveTimeline();
+      assertEquals(1, timeline.countInstants());
+
+      assertStreamEquals(Stream.of(instantCommitted),
+          timeline.getInstantsAsStream(), "Instants in the timeline do not match");
+
+      // Make sure StateTransitionTime is set in the timeline
+      assertEquals(0,
+          timeline.getInstantsAsStream().filter(s -> s.getStateTransitionTime().isEmpty()).count());
+    } finally {
+      cleanMetaClient();
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 4cef215d0e6..31e5512a3f9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -115,13 +115,17 @@ object DataSourceReadOptions {
     .noDefaultValue()
     .withDocumentation("Instant time to start incrementally pulling data from. The instanttime here need not necessarily " +
       "correspond to an instant on the timeline. New data written with an instant_time > BEGIN_INSTANTTIME are fetched out. " +
-      "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM.")
+      "For e.g: ‘20170901080000’ will get all new data written after Sep 1, 2017 08:00AM. Note that if `"
+      + HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key() + "` enabled, will use instant's "
+      + "`stateTransitionTime` to perform comparison.")
 
   val END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.end.instanttime")
     .noDefaultValue()
     .withDocumentation("Instant time to limit incrementally fetched data to. " +
-      "New data written with an instant_time <= END_INSTANTTIME are fetched out.")
+      "New data written with an instant_time <= END_INSTANTTIME are fetched out. Note that if `"
+      + HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME.key() + "` enabled, will use instant's "
+      + "`stateTransitionTime` to perform comparison.")
 
   val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.read.schema.use.end.instanttime")
@@ -200,6 +204,9 @@ object DataSourceReadOptions {
 
   val SCHEMA_EVOLUTION_ENABLED: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE
 
+  val READ_BY_STATE_TRANSITION_TIME: ConfigProperty[Boolean] = HoodieCommonConfig.READ_BY_STATE_TRANSITION_TIME
+
+
   /** @deprecated Use {@link QUERY_TYPE} and its methods instead */
   @Deprecated
   val QUERY_TYPE_OPT_KEY = QUERY_TYPE.key()
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 79bd90ec024..0d178abc5ba 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -64,6 +64,11 @@ class IncrementalRelation(val sqlContext: SQLContext,
     new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
     metaClient)
   private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
+
+  private val useStateTransitionTime = optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key)
+    .map(_.toBoolean)
+    .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue)
+
   if (commitTimeline.empty()) {
     throw new HoodieException("No instants to incrementally pull")
   }
@@ -81,9 +86,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
 
   private val lastInstant = commitTimeline.lastInstant().get()
 
-  private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
-    optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
-    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+  private val commitsTimelineToReturn = {
+    if (useStateTransitionTime) {
+      commitTimeline.findInstantsInRangeByStateTransitionTs(
+        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getStateTransitionTime))
+    } else {
+      commitTimeline.findInstantsInRange(
+        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
+        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+    }
+  }
   private val commitsToReturn = commitsTimelineToReturn.getInstantsAsStream.iterator().toList
 
   // use schema from a file produced in the end/latest instant
@@ -204,6 +217,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
       val endInstantArchived = commitTimeline.isBeforeTimelineStarts(endInstantTime)
 
       val scanDf = if (fallbackToFullTableScan && (startInstantArchived || endInstantArchived)) {
+        if (useStateTransitionTime) {
+          throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
+        }
         log.info(s"Falling back to full table scan as startInstantArchived: $startInstantArchived, endInstantArchived: $endInstantArchived")
         fullTableScanDataFrame(startInstantTime, endInstantTime)
       } else {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 636624f3950..74c03e9887c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -61,6 +61,8 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
   override protected def timeline: HoodieTimeline = {
     if (fullTableScan) {
       metaClient.getCommitsAndCompactionTimeline
+    } else if (useStateTransitionTime) {
+      metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp)
     } else {
       metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
     }
@@ -133,8 +135,16 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   // Validate this Incremental implementation is properly configured
   validate()
 
+  protected val useStateTransitionTime: Boolean =
+    optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key)
+      .map(_.toBoolean)
+      .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue)
+
   protected def startTimestamp: String = optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key)
-  protected def endTimestamp: String = optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key, super.timeline.lastInstant().get.getTimestamp)
+  protected def endTimestamp: String = optParams.getOrElse(
+    DataSourceReadOptions.END_INSTANTTIME.key,
+    if (useStateTransitionTime) super.timeline.lastInstant().get.getStateTransitionTime
+    else super.timeline.lastInstant().get.getTimestamp)
 
   protected def startInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(startTimestamp)
   protected def endInstantArchived: Boolean = super.timeline.isBeforeTimelineStarts(endTimestamp)
@@ -154,7 +164,11 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     if (!startInstantArchived || !endInstantArchived) {
       // If endTimestamp commit is not archived, will filter instants
       // before endTimestamp.
-      super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
+      if (useStateTransitionTime) {
+        super.timeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp).getInstants.asScala.toList
+      } else {
+        super.timeline.findInstantsInRange(startTimestamp, endTimestamp).getInstants.asScala.toList
+      }
     } else {
       super.timeline.getInstants.asScala.toList
     }
@@ -171,7 +185,11 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
     val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
 
-    val largerThanFilter = GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+    val largerThanFilter = if (startInstantArchived) {
+      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+    } else {
+      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
+    }
 
     val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
       if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
@@ -199,6 +217,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     if (!this.tableConfig.populateMetaFields()) {
       throw new HoodieException("Incremental queries are not supported when meta fields are disabled")
     }
+
+    if (useStateTransitionTime && fullTableScan) {
+      throw new HoodieException("Cannot use stateTransitionTime while enables full table scan")
+    }
   }
 
   protected def globPattern: String =
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index d8f53a15dd6..2f77c4cd770 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -69,6 +69,11 @@ class HoodieStreamSource(
     parameters.get(DataSourceReadOptions.QUERY_TYPE.key).contains(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) &&
     parameters.get(DataSourceReadOptions.INCREMENTAL_FORMAT.key).contains(DataSourceReadOptions.INCREMENTAL_FORMAT_CDC_VAL)
 
+  private val useStateTransitionTime =
+    parameters.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key())
+      .map(_.toBoolean)
+      .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue())
+
   @transient private lazy val initialOffsets = {
     val metadataLog = new HoodieMetadataLog(sqlContext.sparkSession, metadataPath)
     metadataLog.get(0).getOrElse {
@@ -101,7 +106,16 @@ class HoodieStreamSource(
     metaClient.reloadActiveTimeline()
     metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants() match {
       case activeInstants if !activeInstants.empty() =>
-        Some(HoodieSourceOffset(activeInstants.lastInstant().get().getTimestamp))
+        val timestamp = if (useStateTransitionTime) {
+          activeInstants.getInstantsOrderedByStateTransitionTs
+            .skip(activeInstants.countInstants() - 1)
+            .findFirst()
+            .get()
+            .getStateTransitionTime
+        } else {
+          activeInstants.lastInstant().get().getTimestamp
+        }
+        Some(HoodieSourceOffset(timestamp))
       case _ =>
         None
     }
@@ -137,6 +151,7 @@ class HoodieStreamSource(
         // Consume the data between (startCommitTime, endCommitTime]
         val incParams = parameters ++ Map(
           DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+          DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key() -> useStateTransitionTime.toString,
           DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startCommitTime(startOffset),
           DataSourceReadOptions.END_INSTANTTIME.key -> endOffset.commitTime
         )
@@ -163,10 +178,7 @@ class HoodieStreamSource(
     startOffset match {
       case INIT_OFFSET => startOffset.commitTime
       case HoodieSourceOffset(commitTime) =>
-        val time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime
-        // As we consume the data between (start, end], start is not included,
-        // so we +1s to the start commit time here.
-        HoodieActiveTimeline.formatDate(new Date(time + 1000))
+        commitTime
       case _=> throw new IllegalStateException("UnKnow offset type.")
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
index 9d780144315..01c7465b3f4 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowArchivedCommitsProcedure.scala
@@ -43,6 +43,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
@@ -54,6 +55,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
 
   private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
@@ -121,7 +123,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
       for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet) {
         for (hoodieWriteStat <- partitionWriteStat.getValue) {
           rows.add(Row(
-            commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
+            commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, hoodieWriteStat.getPartitionPath,
             hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
             hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
             hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
@@ -151,7 +153,7 @@ class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BasePr
     for (i <- 0 until newCommits.size) {
       val commit = newCommits.get(i)
       val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
-      rows.add(Row(commit.getTimestamp, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
+      rows.add(Row(commit.getTimestamp, commit.getStateTransitionTime, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
         commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
         commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
         commitMetadata.fetchTotalWriteErrors))
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
index ddbad810042..3c3fde385bc 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala
@@ -39,6 +39,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
     StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
@@ -51,6 +52,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
 
   private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
     StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state_transition_time", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
     StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
@@ -104,7 +106,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
       for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet) {
         for (hoodieWriteStat <- partitionWriteStat.getValue) {
           rows.add(Row(
-            commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
+            commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, hoodieWriteStat.getPartitionPath,
             hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
             hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
             hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
@@ -134,7 +136,7 @@ class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure
     for (i <- 0 until newCommits.size) {
       val commit = newCommits.get(i)
       val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
-      rows.add(Row(commit.getTimestamp, commit.getAction, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
+      rows.add(Row(commit.getTimestamp, commit.getStateTransitionTime, commit.getAction, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
         commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
         commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
         commitMetadata.fetchTotalWriteErrors))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
new file mode 100644
index 00000000000..b1e81d9857d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadByStateTransitionTime.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, Assertions, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+class TestIncrementalReadByStateTransitionTime extends HoodieSparkClientTestBase  {
+
+  var spark: SparkSession = null
+
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1"
+  )
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testReadingWithStateTransitionTime(tableType: HoodieTableType): Unit = {
+    val records = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+    inputDF.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
+      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(basePath)
+      .setLoadActiveTimelineOnLoad(true)
+      .build()
+
+    val firstInstant = metaClient.getActiveTimeline.filterCompletedInstants().getInstantsOrderedByStateTransitionTs
+      .findFirst().get()
+
+    val result1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "000")
+      .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), firstInstant.getTimestamp)
+      .load(basePath)
+      .count()
+
+    Assertions.assertEquals(result1, 0)
+    val result2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), "000")
+      .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), "true")
+      .option(DataSourceReadOptions.END_INSTANTTIME.key(), firstInstant.getStateTransitionTime)
+      .load(basePath)
+      .count()
+    Assertions.assertEquals(result2, 100)
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala
new file mode 100644
index 00000000000..63175e2d19d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamSourceReadByStateTransitionTime.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.engine.EngineType
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.common.model.{HoodieFailedWritesCleaningPolicy, HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.HoodieTestTable.makeNewCommitTime
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
+import org.apache.spark.api.java.JavaRDD
+
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.jdk.CollectionConverters.mapAsJavaMapConverter
+
+class TestStreamSourceReadByStateTransitionTime extends TestStreamingSource {
+
+  override val useTransitionTime: Boolean = true
+
+  private val dataGen = new HoodieTestDataGenerator(System.currentTimeMillis())
+
+  test("Test streaming read out of order data") {
+    HoodieTableType.values().foreach { tableType =>
+      withTempDir { inputDir =>
+        val tablePath = s"${inputDir.getCanonicalPath}/test_stream_${tableType.name()}"
+        HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(tableType)
+          .setTableName(s"test_stream_${tableType.name()}")
+          .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+          .setPreCombineField("timestamp")
+          .initTable(spark.sessionState.newHadoopConf(), tablePath)
+
+        val writeConfig = HoodieWriteConfig.newBuilder()
+          .withEngineType(EngineType.SPARK)
+          .withPath(tablePath)
+          .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+          .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+            .build())
+          .withProps(commonOptions.asJava)
+          .build()
+
+        val context = new HoodieSparkEngineContext(sparkContext)
+        val writeClient = new SparkRDDWriteClient(context, writeConfig)
+        val instantTime1 = makeNewCommitTime(1, "%09d")
+        val instantTime2 = makeNewCommitTime(2,"%09d")
+
+        val records1 = sparkContext.parallelize(dataGen.generateInserts(instantTime1, 10).toSeq, 2)
+        val records2 = sparkContext.parallelize(dataGen.generateInserts(instantTime2, 15).toSeq, 2)
+
+        writeClient.startCommitWithTime(instantTime1)
+        writeClient.startCommitWithTime(instantTime2)
+        writeClient.insert(records2.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[Nothing]]], instantTime2)
+        val df = spark.readStream
+          .format("hudi")
+          .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
+          .load(tablePath)
+
+        testStream(df) (
+          AssertOnQuery { q => q.processAllAvailable(); true },
+          // Should read all records from instantTime2
+          assertCountMatched(15, true),
+
+          AssertOnQuery { _ =>
+            writeClient.insert(records1.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[Nothing]]], instantTime1)
+            true
+          },
+          AssertOnQuery { q => q.processAllAvailable(); true },
+
+          // Should read all records from instantTime1
+          assertCountMatched(10, true),
+          StopStream
+        )
+      }
+    }
+  }
+
+  def assertCountMatched(count: Int, lastOnly: Boolean): CheckAnswerRowsByFunc = CheckAnswerRowsByFunc(rows => {
+    assert(rows.size == count)
+  }, lastOnly = lastOnly)
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index e7b568931b0..96108b0d550 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -18,7 +18,7 @@
 package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceReadOptions.START_OFFSET
-import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{Row, SaveMode}
 class TestStreamingSource extends StreamTest {
 
   import testImplicits._
-  private val commonOptions = Map(
+  protected val commonOptions: Map[String, String] = Map(
     RECORDKEY_FIELD.key -> "id",
     PRECOMBINE_FIELD.key -> "ts",
     INSERT_PARALLELISM_VALUE.key -> "4",
@@ -38,6 +38,8 @@ class TestStreamingSource extends StreamTest {
   )
   private val columns = Seq("id", "name", "price", "ts")
 
+  val useTransitionTime: Boolean = false
+
   org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
 
   override protected def sparkConf = {
@@ -51,45 +53,46 @@ class TestStreamingSource extends StreamTest {
     withTempDir { inputDir =>
       val tablePath = s"${inputDir.getCanonicalPath}/test_cow_stream"
       HoodieTableMetaClient.withPropertyBuilder()
-          .setTableType(COPY_ON_WRITE)
-          .setTableName(getTableName(tablePath))
-          .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setTableType(COPY_ON_WRITE)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
         .setPreCombineField("ts")
-          .initTable(spark.sessionState.newHadoopConf(), tablePath)
+        .initTable(spark.sessionState.newHadoopConf(), tablePath)
 
       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
         .format("org.apache.hudi")
+        .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
         .load(tablePath)
         .select("id", "name", "price", "ts")
 
       testStream(df)(
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
         StopStream,
 
         addDataToQuery(tablePath, Seq(("1", "a1", "12", "000"))),
         StartStream(),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(Seq(Row("1", "a1", "12", "000")), lastOnly = true, isSorted = false),
 
         addDataToQuery(tablePath,
           Seq(("2", "a2", "12", "000"),
-              ("3", "a3", "12", "000"),
-              ("4", "a4", "12", "000"))),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+            ("3", "a3", "12", "000"),
+            ("4", "a4", "12", "000"))),
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(
           Seq(Row("2", "a2", "12", "000"),
-             Row("3", "a3", "12", "000"),
-             Row("4", "a4", "12", "000")),
+            Row("3", "a3", "12", "000"),
+            Row("4", "a4", "12", "000")),
           lastOnly = true, isSorted = false),
-          StopStream,
+        StopStream,
 
         addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
         addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
         addDataToQuery(tablePath, Seq(("5", "a5", "15", "000"))),
         StartStream(),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(
           Seq(Row("6", "a6", "12", "000"),
             Row("5", "a5", "15", "000")),
@@ -111,11 +114,12 @@ class TestStreamingSource extends StreamTest {
       addData(tablePath, Seq(("1", "a1", "10", "000")))
       val df = spark.readStream
         .format("org.apache.hudi")
+        .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
         .load(tablePath)
         .select("id", "name", "price", "ts")
 
       testStream(df)(
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(Seq(Row("1", "a1", "10", "000")), lastOnly = true, isSorted = false),
         StopStream,
 
@@ -124,7 +128,7 @@ class TestStreamingSource extends StreamTest {
             ("3", "a3", "12", "000"),
             ("2", "a2", "10", "001"))),
         StartStream(),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(
           Seq(Row("3", "a3", "12", "000"),
             Row("2", "a2", "10", "001")),
@@ -134,7 +138,7 @@ class TestStreamingSource extends StreamTest {
         addDataToQuery(tablePath, Seq(("5", "a5", "12", "000"))),
         addDataToQuery(tablePath, Seq(("6", "a6", "12", "000"))),
         StartStream(),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(
           Seq(Row("5", "a5", "12", "000"),
             Row("6", "a6", "12", "000")),
@@ -157,18 +161,19 @@ class TestStreamingSource extends StreamTest {
       val df = spark.readStream
         .format("org.apache.hudi")
         .option(START_OFFSET.key(), "latest")
+        .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
         .load(tablePath)
         .select("id", "name", "price", "ts")
 
       testStream(df)(
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         // Start from the latest, should contains no data
         CheckAnswerRows(Seq(), lastOnly = true, isSorted = false),
         StopStream,
 
         addDataToQuery(tablePath, Seq(("2", "a1", "12", "000"))),
         StartStream(),
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         CheckAnswerRows(Seq(Row("2", "a1", "12", "000")), lastOnly = false, isSorted = false)
       )
     }
@@ -188,16 +193,22 @@ class TestStreamingSource extends StreamTest {
       addData(tablePath, Seq(("2", "a1", "11", "001")))
       addData(tablePath, Seq(("3", "a1", "12", "002")))
 
-      val timestamp = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
-        .firstInstant().get().getTimestamp
+      val timestamp = if (useTransitionTime) {
+        metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+          .firstInstant().get().getStateTransitionTime
+      } else {
+        metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+          .firstInstant().get().getTimestamp
+      }
       val df = spark.readStream
         .format("org.apache.hudi")
         .option(START_OFFSET.key(), timestamp)
+        .option(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key(), useTransitionTime)
         .load(tablePath)
         .select("id", "name", "price", "ts")
 
       testStream(df)(
-        AssertOnQuery {q => q.processAllAvailable(); true },
+        AssertOnQuery { q => q.processAllAvailable(); true },
         // Start after the first commit
         CheckAnswerRows(Seq(Row("2", "a1", "11", "001"), Row("3", "a1", "12", "002")), lastOnly = true, isSorted = false)
       )