You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/27 04:32:18 UTC
[incubator-seatunnel] branch api-draft updated: [api-draft][flink] non-key operator can't get the keyed state store (#1961)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new ffb2d86e [api-draft][flink] non-key operator can't get the keyed state store (#1961)
ffb2d86e is described below
commit ffb2d86eea7ecdda029657bb349628b9fba09793
Author: Zongwen Li <zo...@gmail.com>
AuthorDate: Fri May 27 12:32:13 2022 +0800
[api-draft][flink] non-key operator can't get the keyed state store (#1961)
---
.../flink/source/BaseSeaTunnelSourceFunction.java | 23 ++++++++++++----------
1 file changed, 13 insertions(+), 10 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
index 1778596f..57b6e38b 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java
@@ -24,8 +24,8 @@ import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import org.apache.seatunnel.translation.source.BaseSourceFunction;
import org.apache.flink.api.common.state.CheckpointListener;
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,7 +54,7 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
protected transient volatile BaseSourceFunction<SeaTunnelRow> internalSource;
- protected transient MapState<Integer, List<byte[]>> sourceState;
+ protected transient ListState<Map<Integer, List<byte[]>>> sourceState;
protected transient volatile Map<Integer, List<byte[]>> restoredState = new HashMap<>();
/**
@@ -114,20 +114,23 @@ public abstract class BaseSeaTunnelSourceFunction extends RichSourceFunction<Row
LOG.debug("snapshotState() called on closed source");
} else {
sourceState.clear();
- sourceState.putAll(internalSource.snapshotState(snapshotContext.getCheckpointId()));
+ sourceState.add(internalSource.snapshotState(snapshotContext.getCheckpointId()));
}
}
@Override
public void initializeState(FunctionInitializationContext initializeContext) throws Exception {
- this.sourceState = initializeContext.getKeyedStateStore()
- .getMapState(new MapStateDescriptor<>(
- getStateName(),
- BasicTypeInfo.INT_TYPE_INFO,
- Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)));
+ this.sourceState = initializeContext.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ getStateName(),
+ Types.MAP(
+ BasicTypeInfo.INT_TYPE_INFO,
+ Types.LIST(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO))
+ ));
if (initializeContext.isRestored()) {
// populate actual holder for restored state
- sourceState.entries().forEach(entry -> restoredState.put(entry.getKey(), entry.getValue()));
+ sourceState.get().forEach(map -> restoredState.putAll(map));
LOG.info("Consumer subtask {} restored state", getRuntimeContext().getIndexOfThisSubtask());
} else {
LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());