You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/07 20:11:16 UTC

[flink] branch master updated: [FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread

This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b28bfd  [FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread
0b28bfd is described below

commit 0b28bfd0d9f368154bfcd1564388935eff05b055
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Jan 7 17:27:27 2019 +0100

    [FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread
    
    The sharing of the InputView object between the two threads was introduced with FLINK-9702.
---
 .../flink/contrib/streaming/state/RocksDBMapState.java       | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 13cbded..4c174d4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -242,7 +242,6 @@ class RocksDBMapState<K, N, UK, UV>
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
 	public byte[] getSerializedValue(
 			final byte[] serializedKeyAndNamespace,
 			final TypeSerializer<K> safeKeySerializer,
@@ -274,13 +273,14 @@ class RocksDBMapState<K, N, UK, UV>
 
 		final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
 		final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
+		final DataInputDeserializer inputView = new DataInputDeserializer();
 
 		final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
 				backend.db,
 				keyPrefixBytes,
 				dupUserKeySerializer,
 				dupUserValueSerializer,
-				dataInputView
+				inputView
 			) {
 
 			@Override
@@ -368,7 +368,6 @@ class RocksDBMapState<K, N, UK, UV>
 		private final TypeSerializer<UV> valueSerializer;
 
 		private final DataInputDeserializer dataInputView;
-		private final DataOutputSerializer dataOutputView;
 
 		RocksDBMapEntry(
 				@Nonnull final RocksDB db,
@@ -377,8 +376,7 @@ class RocksDBMapState<K, N, UK, UV>
 				@Nonnull final byte[] rawValueBytes,
 				@Nonnull final TypeSerializer<UK> keySerializer,
 				@Nonnull final TypeSerializer<UV> valueSerializer,
-				@Nonnull DataInputDeserializer dataInputView,
-				@Nonnull DataOutputSerializer dataOutputView) {
+				@Nonnull DataInputDeserializer dataInputView) {
 			this.db = db;
 
 			this.userKeyOffset = userKeyOffset;
@@ -389,7 +387,6 @@ class RocksDBMapState<K, N, UK, UV>
 			this.rawValueBytes = rawValueBytes;
 			this.deleted = false;
 			this.dataInputView = dataInputView;
-			this.dataOutputView = dataOutputView;
 		}
 
 		public void remove() {
@@ -585,8 +582,7 @@ class RocksDBMapState<K, N, UK, UV>
 						iterator.value(),
 						keySerializer,
 						valueSerializer,
-						dataInputView,
-						dataOutputView);
+						dataInputView);
 
 					cacheEntries.add(entry);