You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 14:06:45 UTC

[flink] 01/04: [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a1147b9b8d5a92ea38e25358a645d35dc16f9b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 11:17:50 2019 +0100

    [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore
    
    Prior to this commit, the composite key builder was created in the
    constructor of the RocksDBKeyedStateBackend. The creation of the builder
    requires providing a key serializer.
    
    This is problematic, because the key serializer may be reconfigured
    during the restore phase, therefore invalidating the key serializer used
    by the composite key builder.
    
    This commit resolves this by lazily creating the composite key builder
    only after the restore phase, which would be the point-in-time when we
    are certain the key serializer will no longer be changed and is final.
---
 .../streaming/state/RocksDBKeyedStateBackend.java     | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a47b30..e95b7e8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,8 +242,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The native metrics monitor. */
 	private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
-	/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
-	private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+	/**
+	 * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.
+	 *
+	 * <p>We create the builder after the restore phase in the {@link #restore(Object)} method. The timing of
+	 * the creation is important, because only after the restore we are certain that the key serializer
+	 * is final after potential reconfigurations during the restore.
+	 */
+	private RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
 
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
@@ -297,7 +303,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
-		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -535,6 +540,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 
+			// it is important that we only create the key builder after the restore, and not before;
+			// restore operations may reconfigure the key serializer, so accessing the key serializer
+			// only now we can be certain that the key serializer used in the builder is final.
+			this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
+				getKeySerializer(),
+				keyGroupPrefixBytes,
+				32);
+
 			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();