You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/10 03:43:35 UTC
[flink-statefun] 06/09: [FLINK-15954] Add PersistedTable support in
statefun runtime
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 65ed306e33bde474811d10dfc8d93d715a4ee2f2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 7 17:43:10 2020 +0100
[FLINK-15954] Add PersistedTable support in statefun runtime
This closes #17.
---
.../flink/statefun/flink/core/state/FlinkState.java | 16 ++++++++++++++++
.../statefun/flink/core/state/MultiplexedState.java | 15 +++++++++++++++
.../apache/flink/statefun/flink/core/state/State.java | 5 +++++
3 files changed, 36 insertions(+)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
index dfc5bf6..dfebc8b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
@@ -19,6 +19,8 @@ package org.apache.flink.statefun.flink.core.state;
import java.util.Objects;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,7 +32,9 @@ import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
public final class FlinkState implements State {
@@ -60,6 +64,18 @@ public final class FlinkState implements State {
}
@Override
+ public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+ FunctionType functionType, PersistedTable<K, V> persistedTable) {
+ MapState<K, V> handle =
+ runtimeContext.getMapState(
+ new MapStateDescriptor<>(
+ flinkStateName(functionType, persistedTable.name()),
+ persistedTable.keyType(),
+ persistedTable.valueType()));
+ return new FlinkTableAccessor<>(handle);
+ }
+
+ @Override
public void setCurrentKey(Address address) {
keyedStateBackend.setCurrentKey(KeyBy.apply(address));
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
index 9d9c09b..8ccb7c0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
@@ -35,7 +35,9 @@ import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
public final class MultiplexedState implements State {
@@ -66,6 +68,19 @@ public final class MultiplexedState implements State {
}
@Override
+ public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+ FunctionType functionType, PersistedTable<K, V> persistedTable) {
+ final MultiplexedStateKey uniqueSubKeyPrefix =
+ multiplexedSubstateKey(functionType, persistedTable.name());
+ final TypeSerializer<K> keySerializer =
+ types.registerType(persistedTable.keyType()).createSerializer(executionConfiguration);
+ final TypeSerializer<V> valueSerializer =
+ types.registerType(persistedTable.valueType()).createSerializer(executionConfiguration);
+ return new MultiplexedTableStateAccessor<>(
+ sharedMapStateHandle, uniqueSubKeyPrefix, keySerializer, valueSerializer);
+ }
+
+ @Override
public void setCurrentKey(Address address) {
keyedStateBackend.setCurrentKey(KeyBy.apply(address));
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
index 64fffb5..9f1a83a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
@@ -20,12 +20,17 @@ package org.apache.flink.statefun.flink.core.state;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
public interface State {
<T> Accessor<T> createFlinkStateAccessor(
FunctionType functionType, PersistedValue<T> persistedValue);
+ <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+ FunctionType functionType, PersistedTable<K, V> persistedTable);
+
void setCurrentKey(Address address);
}