You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/17 15:54:13 UTC

[incubator-seatunnel] branch dev updated: Fix NPE when starting from checkpoint. (#3904)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b8f2a9eb4 Fix NPE when starting from checkpoint. (#3904)
b8f2a9eb4 is described below

commit b8f2a9eb4f91ef399c12e968d3aded016d04004e
Author: lightzhao <40...@users.noreply.github.com>
AuthorDate: Tue Jan 17 23:54:07 2023 +0800

    Fix NPE when starting from checkpoint. (#3904)
    
    Co-authored-by: zhaoliang01 <zh...@58.com>
---
 .../translation/flink/source/BaseSeaTunnelSourceFunction.java          | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 79860a901..0d811b38c 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
     protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
 
     protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
-    protected transient volatile Map<Integer, List<byte[]>> restoredState = new HashMap<>();
+    protected transient volatile Map<Integer, List<byte[]>> restoredState;
 
     protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
     protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
@@ -142,6 +142,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
 
     @Override
     public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
+        this.restoredState = new HashMap<>();
         this.sourceState = initializeContext.getOperatorStateStore()
             .getListState(
                 new ListStateDescriptor<>(