You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/08 14:12:04 UTC

[GitHub] tzulitai closed pull request #7429: [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore

tzulitai closed pull request #7429:  [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore
URL: https://github.com/apache/flink/pull/7429
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index eaec234e425..c1de3cba49d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -76,9 +76,8 @@ void createAndRestoreKeyedStateBackend() {
 			keyedStateBackend = stateBackend.createKeyedStateBackend(
 				env, new JobID(), "test", StringSerializer.INSTANCE, 10,
 				new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
-			keyedStateBackend.setCurrentKey("defaultKey");
 		} catch (Exception e) {
-			throw new RuntimeException("unexpected");
+			throw new RuntimeException("unexpected", e);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index f9f108a3a12..9b6882a35f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -113,6 +113,7 @@ private void initTest(StateTtlConfig ttlConfig) throws Exception {
 		this.ttlConfig = ttlConfig;
 		sbetc.createAndRestoreKeyedStateBackend();
 		sbetc.restoreSnapshot(null);
+		sbetc.setCurrentKey("defaultKey");
 		createState();
 		ctx().initTestValues();
 	}
@@ -129,6 +130,7 @@ private void takeAndRestoreSnapshot() throws Exception {
 		KeyedStateHandle snapshot = sbetc.takeSnapshot();
 		sbetc.createAndRestoreKeyedStateBackend();
 		sbetc.restoreSnapshot(snapshot);
+		sbetc.setCurrentKey("defaultKey");
 		createState();
 	}
 
@@ -397,6 +399,7 @@ public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception
 		sbetc.createAndRestoreKeyedStateBackend();
 
 		sbetc.restoreSnapshot(snapshot);
+		sbetc.setCurrentKey("defaultKey");
 		sbetc.createState(ctx().createStateDescriptor(), "");
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a47b306596..7ddf9939d63 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,8 +242,14 @@
 	/** The native metrics monitor. */
 	private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
-	/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
-	private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+	/**
+	 * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.
+	 *
+	 * <p>We create the builder after the restore phase in the {@link #restore(Object)} method. The timing of
+	 * the creation is important: only once the restore has completed can we be certain that the key
+	 * serializer is final, because restore operations may reconfigure it.
+	 */
+	private RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
 
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
@@ -297,7 +303,6 @@ public RocksDBKeyedStateBackend(
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
-		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -512,10 +517,6 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
 
 		LOG.info("Initializing RocksDB keyed state backend.");
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
-		}
-
 		// clear all meta data
 		kvStateInformation.clear();
 
@@ -524,6 +525,10 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
 			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
+				}
+
 				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
 				if (firstStateHandle instanceof IncrementalKeyedStateHandle
 					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {
@@ -535,6 +540,14 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
 				}
 			}
 
+			// it is important that we only create the key builder after the restore, and not before;
+			// restore operations may reconfigure the key serializer, so only by accessing the key serializer
+			// now can we be certain that the key serializer used in the builder is final.
+			this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
+				getKeySerializer(),
+				keyGroupPrefixBytes,
+				32);
+
 			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
index 8e83e292820..41aa685e45f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -198,9 +198,4 @@ boolean isAmbiguousCompositeKeyPossible(TypeSerializer<?> namespaceSerializer) {
 		return keySerializerTypeVariableSized &
 			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
 	}
-
-	@VisibleForTesting
-	boolean isKeySerializerTypeVariableSized() {
-		return keySerializerTypeVariableSized;
-	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services