You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/10 03:43:35 UTC

[flink-statefun] 06/09: [FLINK-15954] Add PersistedTable support in statefun runtime

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 65ed306e33bde474811d10dfc8d93d715a4ee2f2
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 7 17:43:10 2020 +0100

    [FLINK-15954] Add PersistedTable support in statefun runtime
    
    This closes #17.
---
 .../flink/statefun/flink/core/state/FlinkState.java      | 16 ++++++++++++++++
 .../statefun/flink/core/state/MultiplexedState.java      | 15 +++++++++++++++
 .../apache/flink/statefun/flink/core/state/State.java    |  5 +++++
 3 files changed, 36 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
index dfc5bf6..dfebc8b 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
@@ -19,6 +19,8 @@ package org.apache.flink.statefun.flink.core.state;
 
 import java.util.Objects;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -30,7 +32,9 @@ import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
 
 public final class FlinkState implements State {
 
@@ -60,6 +64,18 @@ public final class FlinkState implements State {
   }
 
   @Override
+  public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+      FunctionType functionType, PersistedTable<K, V> persistedTable) {
+    MapState<K, V> handle =
+        runtimeContext.getMapState(
+            new MapStateDescriptor<>(
+                flinkStateName(functionType, persistedTable.name()),
+                persistedTable.keyType(),
+                persistedTable.valueType()));
+    return new FlinkTableAccessor<>(handle);
+  }
+
+  @Override
   public void setCurrentKey(Address address) {
     keyedStateBackend.setCurrentKey(KeyBy.apply(address));
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
index 9d9c09b..8ccb7c0 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/MultiplexedState.java
@@ -35,7 +35,9 @@ import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
 
 public final class MultiplexedState implements State {
 
@@ -66,6 +68,19 @@ public final class MultiplexedState implements State {
   }
 
   @Override
+  public <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+      FunctionType functionType, PersistedTable<K, V> persistedTable) {
+    final MultiplexedStateKey uniqueSubKeyPrefix =
+        multiplexedSubstateKey(functionType, persistedTable.name());
+    final TypeSerializer<K> keySerializer =
+        types.registerType(persistedTable.keyType()).createSerializer(executionConfiguration);
+    final TypeSerializer<V> valueSerializer =
+        types.registerType(persistedTable.valueType()).createSerializer(executionConfiguration);
+    return new MultiplexedTableStateAccessor<>(
+        sharedMapStateHandle, uniqueSubKeyPrefix, keySerializer, valueSerializer);
+  }
+
+  @Override
   public void setCurrentKey(Address address) {
     keyedStateBackend.setCurrentKey(KeyBy.apply(address));
   }
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
index 64fffb5..9f1a83a 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/State.java
@@ -20,12 +20,17 @@ package org.apache.flink.statefun.flink.core.state;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.state.Accessor;
+import org.apache.flink.statefun.sdk.state.PersistedTable;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
+import org.apache.flink.statefun.sdk.state.TableAccessor;
 
 public interface State {
 
   <T> Accessor<T> createFlinkStateAccessor(
       FunctionType functionType, PersistedValue<T> persistedValue);
 
+  <K, V> TableAccessor<K, V> createFlinkStateTableAccessor(
+      FunctionType functionType, PersistedTable<K, V> persistedTable);
+
   void setCurrentKey(Address address);
 }