You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2019/01/07 20:11:16 UTC
[flink] branch master updated: [FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread
This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0b28bfd [FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread
0b28bfd is described below
commit 0b28bfd0d9f368154bfcd1564388935eff05b055
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Jan 7 17:27:27 2019 +0100
[FLINK-11273][state] Fix shared InputView object between event processing and queryable state thread
This bug was introduced with FLINK-9702.
---
.../flink/contrib/streaming/state/RocksDBMapState.java | 12 ++++--------
1 file changed, 4 insertions(+), 8 deletions(-)
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 13cbded..4c174d4 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -242,7 +242,6 @@ class RocksDBMapState<K, N, UK, UV>
}
@Override
- @SuppressWarnings("unchecked")
public byte[] getSerializedValue(
final byte[] serializedKeyAndNamespace,
final TypeSerializer<K> safeKeySerializer,
@@ -274,13 +273,14 @@ class RocksDBMapState<K, N, UK, UV>
final TypeSerializer<UK> dupUserKeySerializer = serializer.getKeySerializer();
final TypeSerializer<UV> dupUserValueSerializer = serializer.getValueSerializer();
+ final DataInputDeserializer inputView = new DataInputDeserializer();
final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(
backend.db,
keyPrefixBytes,
dupUserKeySerializer,
dupUserValueSerializer,
- dataInputView
+ inputView
) {
@Override
@@ -368,7 +368,6 @@ class RocksDBMapState<K, N, UK, UV>
private final TypeSerializer<UV> valueSerializer;
private final DataInputDeserializer dataInputView;
- private final DataOutputSerializer dataOutputView;
RocksDBMapEntry(
@Nonnull final RocksDB db,
@@ -377,8 +376,7 @@ class RocksDBMapState<K, N, UK, UV>
@Nonnull final byte[] rawValueBytes,
@Nonnull final TypeSerializer<UK> keySerializer,
@Nonnull final TypeSerializer<UV> valueSerializer,
- @Nonnull DataInputDeserializer dataInputView,
- @Nonnull DataOutputSerializer dataOutputView) {
+ @Nonnull DataInputDeserializer dataInputView) {
this.db = db;
this.userKeyOffset = userKeyOffset;
@@ -389,7 +387,6 @@ class RocksDBMapState<K, N, UK, UV>
this.rawValueBytes = rawValueBytes;
this.deleted = false;
this.dataInputView = dataInputView;
- this.dataOutputView = dataOutputView;
}
public void remove() {
@@ -585,8 +582,7 @@ class RocksDBMapState<K, N, UK, UV>
iterator.value(),
keySerializer,
valueSerializer,
- dataInputView,
- dataOutputView);
+ dataInputView);
cacheEntries.add(entry);