You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/09 11:55:39 UTC

[flink] 01/02: [FLINK-11287] [rocksdb] RocksDBListState should be using registered serializer in state meta infos

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e5ed8c85139d1dfdb4b6a47a63ac36143a7b5c64
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 15:02:43 2019 +0100

    [FLINK-11287] [rocksdb] RocksDBListState should be using registered serializer in state meta infos
    
    This closes #7434.
---
 .../apache/flink/contrib/streaming/state/RocksDBListState.java    | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index 13f5559..72a5bc6 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -19,7 +19,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -77,7 +76,6 @@ class RocksDBListState<K, N, V>
 	 * @param namespaceSerializer The serializer for the namespace.
 	 * @param valueSerializer The serializer for the state.
 	 * @param defaultValue The default value for the state.
-	 * @param elementSerializer The serializer for elements of the list state.
 	 * @param backend The backend for which this state is bind to.
 	 */
 	private RocksDBListState(
@@ -85,11 +83,12 @@ class RocksDBListState<K, N, V>
 			TypeSerializer<N> namespaceSerializer,
 			TypeSerializer<List<V>> valueSerializer,
 			List<V> defaultValue,
-			TypeSerializer<V> elementSerializer,
 			RocksDBKeyedStateBackend<K> backend) {
 
 		super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
-		this.elementSerializer = elementSerializer;
+
+		ListSerializer<V> castedListSerializer = (ListSerializer<V>) valueSerializer;
+		this.elementSerializer = castedListSerializer.getElementSerializer();
 	}
 
 	@Override
@@ -281,7 +280,6 @@ class RocksDBListState<K, N, V>
 			registerResult.f1.getNamespaceSerializer(),
 			(TypeSerializer<List<E>>) registerResult.f1.getStateSerializer(),
 			(List<E>) stateDesc.getDefaultValue(),
-			((ListStateDescriptor<E>) stateDesc).getElementSerializer(),
 			backend);
 	}