You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/27 04:32:18 UTC

[incubator-seatunnel] branch api-draft updated: [api-draft][flink] non-key operator can't get the keyed state store (#1961)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new ffb2d86e [api-draft][flink] non-key operator can't get the keyed state store (#1961)
ffb2d86e is described below

commit ffb2d86eea7ecdda029657bb349628b9fba09793
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri May 27 12:32:13 2022 +0800

    [api-draft][flink] non-key operator can't get the keyed state store (#1961)
---
 .../flink/source/BaseSeaTunnelSourceFunction.java  | 23 ++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 1778596f..57b6e38b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -24,8 +24,8 @@ import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
 import org.apache.seatunnel.translation.source.BaseSourceFunction;
 
 import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
     protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
 
-    protected transient MapState<Integer, List<byte[]>> sourceState;
+    protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
     protected transient volatile Map<Integer, List<byte[]>> restoredState = new HashMap<>();
 
     /**
@@ -114,20 +114,23 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
             LOG.debug("snapshotState() called on closed source");
         } else {
             sourceState.clear();
-            sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+            sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
         }
     }
 
     @Override
     public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
-        this.sourceState = initializeContext.getKeyedStateStore()
-            .getMapState(new MapStateDescriptor<>(
-                getStateName(),
-                BasicTypeInfo.INT_TYPE_INFO,
-                Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
+        this.sourceState = initializeContext.getOperatorStateStore()
+            .getListState(
+                new ListStateDescriptor<>(
+                    getStateName(),
+                    Types.MAP(
+                        BasicTypeInfo.INT_TYPE_INFO,
+                        Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO))
+                ));
         if (initializeContext.isRestored()) {
             // populate actual holder for restored state
-            sourceState.entries().forEach(entry -> restoredState.put(entry.getKey(), entry.getValue()));
+            sourceState.get().forEach(map -> restoredState.putAll(map));
             LOG.info("Consumer subtask {} restored state", getRuntimeContext().getIndexOfThisSubtask());
         } else {
             LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());