You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by da...@apache.org on 2023/04/19 06:16:42 UTC
[hudi] branch master updated: [HUDI-6087] Fix flink streaming source savepoint instant (#8476)
This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 577b2fbb3de [HUDI-6087] Fix flink streaming source savepoint instant (#8476)
577b2fbb3de is described below
commit 577b2fbb3de6b57d0e1b33b785447991421f83b8
Author: voonhous <vo...@gmail.com>
AuthorDate: Wed Apr 19 14:16:32 2023 +0800
[HUDI-6087] Fix flink streaming source savepoint instant (#8476)
---
.../hudi/source/StreamReadMonitoringFunction.java | 2 -
.../source/TestStreamReadMonitoringFunction.java | 65 ++++++++++++++++++++++
2 files changed, 65 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ffd37111a27..5bd7abeef4d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -242,11 +242,9 @@ public class StreamReadMonitoringFunction
if (checkpointLock != null) {
// this is to cover the case where cancel() is called before the run()
synchronized (checkpointLock) {
- issuedInstant = null;
isRunning = false;
}
} else {
- issuedInstant = null;
isRunning = false;
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 541890f7b05..616edc37f1c 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -252,6 +252,71 @@ public class TestStreamReadMonitoringFunction {
}
}
+  /**
+   * When stopping with savepoint, these interface methods are called:
+   * <ul>
+   *   <li>cancel()</li>
+   *   <li>snapshotState()</li>
+   *   <li>close()</li>
+   * </ul>
+   * This test ensures that the state is saved properly when these 3 methods are called in the order listed above.
+   */
+  @Test
+  public void testStopWithSavepointAndRestore() throws Exception {
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    conf.setString(FlinkOptions.READ_START_COMMIT, FlinkOptions.START_COMMIT_EARLIEST);
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    OperatorSubtaskState state;
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+      runAsync(sourceContext, function);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      Thread.sleep(1000L);
+
+      // Simulate a stop-with-savepoint: cancel() is invoked before the snapshot is taken.
+      function.cancel();
+
+      state = harness.snapshot(1, 1);
+
+      // Stop the stream task.
+      function.close();
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().noneMatch(split -> split.getInstantRange().isPresent()),
+          "No instants should have range limit");
+
+    }
+
+    TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
+    StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
+      harness.setup();
+      // Recover from the savepoint state to process the remaining snapshots.
+      harness.initializeState(state);
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(4);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+      runAsync(sourceContext, function2);
+
+      assertTrue(latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS), "Should finish splits generation");
+
+      // Stop the restored stream task only after its splits were generated, so the stop cannot race with the monitor.
+      function2.close();
+      assertThat("Should produce the expected splits",
+          sourceContext.getPartitionPaths(), is("par1,par2,par3,par4"));
+      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
+          "All the instants should have range limit");
+    }
+  }
+
private AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> createHarness(
StreamReadMonitoringFunction function) throws Exception {
StreamSource<MergeOnReadInputSplit, StreamReadMonitoringFunction> streamSource = new StreamSource<>(function);