You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/19 06:16:42 UTC

[hudi] branch master updated: [HUDI-6087] Fix flink streaming source savepoint instant (#8476)

This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 577b2fbb3de [HUDI-6087] Fix flink streaming source savepoint instant  (#8476)
577b2fbb3de is described below

commit 577b2fbb3de6b57d0e1b33b785447991421f83b8
Author: voonhous <vo...@gmail.com>
AuthorDate: Wed Apr 19 14:16:32 2023 +0800

    [HUDI-6087] Fix flink streaming source savepoint instant  (#8476)
---
 .../hudi/source/StreamReadMonitoringFunction.java  |  2 -
 .../source/TestStreamReadMonitoringFunction.java   | 65 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ffd37111a27..5bd7abeef4d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -242,11 +242,9 @@ public class StreamReadMonitoringFunction
     if (checkpointLock != null) {
       // this is to cover the case where cancel() is called before the run()
       synchronized (checkpointLock) {
-        issuedInstant = null;
         isRunning = false;
       }
     } else {
-      issuedInstant = null;
       isRunning = false;
     }
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 541890f7b05..616edc37f1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -252,6 +252,71 @@ public class TestStreamReadMonitoringFunction {
     }
   }
 
+  /**
+   * When stopping with savepoint, these interface methods are called:
+   * <ul>
+   * <li>cancel()</li>
+   * <li>snapshotState()</li>
+   * <li>close()</li>
+   * </ul>
+   * This test ensures that the state is saved properly when these 3 methods are called in the order listed above.
+   */
+  @Test
+  public void testStopWithSavepointAndRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      Thread.sleep(1000L);
+
+      // Simulate a stop-with-savepoint
+      function.cancel();
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
+          "No instant should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+    StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
+      harness.setup();
+      // Recover to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+      // Stop the restored stream task (function2, not the already-closed first function).
+      function2.cancel();
+      function2.close();
+    }
+  }
+
   private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> createHarness(
       StreamReadMonitoringFunction function) throws Exception {
     StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> streamSource = new StreamSource<>(function);