You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 15:54:13 UTC
[incubator-seatunnel] branch dev updated: Fix NPE when starting from checkpoint. (#3904)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b8f2a9eb4 Fix NPE when starting from checkpoint. (#3904)
b8f2a9eb4 is described below
commit b8f2a9eb4f91ef399c12e968d3aded016d04004e
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Tue Jan 17 23:54:07 2023 +0800
Fix NPE when starting from checkpoint. (#3904)
Co-authored-by: zhaoliang01 <zh...@58.com>
---
.../translation/flink/source/BaseSeaTunnelSourceFunction.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 79860a901..0d811b38c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
- protected transient volatile Map<Integer, List<byte[]>> restoredState = new HashMap<>();
+ protected transient volatile Map<Integer, List<byte[]>> restoredState;
protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
@@ -142,6 +142,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
@Override
public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
+ this.restoredState = new HashMap<>();
this.sourceState = initializeContext.getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(