You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/28 01:25:07 UTC

[flink-statefun] 02/05: [FLINK-16318] Use DynamicallyRegisteredTypes

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1700316b70a6190e855ff4ba5a12cb092d264277
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Thu Feb 27 16:58:45 2020 +0100

    [FLINK-16318] Use DynamicallyRegisteredTypes
    
    This closes #40.
---
 .../java/org/apache/flink/statefun/flink/core/state/FlinkState.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
index ffbfcf4..98c2a47 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java
@@ -74,8 +74,8 @@ public final class FlinkState implements State {
         runtimeContext.getMapState(
             new MapStateDescriptor<>(
                 flinkStateName(functionType, persistedTable.name()),
-                persistedTable.keyType(),
-                persistedTable.valueType()));
+                types.registerType(persistedTable.keyType()),
+                types.registerType(persistedTable.valueType())));
     return new FlinkTableAccessor<>(handle);
   }
 
@@ -86,7 +86,7 @@ public final class FlinkState implements State {
         runtimeContext.getListState(
             new ListStateDescriptor<>(
                 flinkStateName(functionType, persistedAppendingBuffer.name()),
-                persistedAppendingBuffer.elementType()));
+                types.registerType(persistedAppendingBuffer.elementType())));
     return new FlinkAppendingBufferAccessor<>(handle);
   }