You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 14:06:44 UTC

[flink] branch master updated (d77151c -> 31685a3)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d77151c  Revert "[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success"
     new 7a1147b  [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore
     new ef08a86  [hotfix] [rocksdb] Only log snapshot restore message when state is actually restored
     new f39fd29  [hotfix] [rocksdb] Remove unused method in RocksDBSerializedCompositeKeyBuilder
     new 31685a3  [FLINK-11280] [state backends, tests] Do not set current key before restore in TtlStateTestBase tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/state/ttl/StateBackendTestContext.java |  3 +--
 .../flink/runtime/state/ttl/TtlStateTestBase.java  |  3 +++
 .../streaming/state/RocksDBKeyedStateBackend.java  | 27 ++++++++++++++++------
 .../RocksDBSerializedCompositeKeyBuilder.java      |  5 ----
 4 files changed, 24 insertions(+), 14 deletions(-)


[flink] 02/04: [hotfix] [rocksdb] Only log snapshot restore message when state is actually restored

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef08a86b53fc936b4aa2a05d7673b4a373eb8fa2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 11:25:57 2019 +0100

    [hotfix] [rocksdb] Only log snapshot restore message when state is actually restored
---
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index e95b7e8..7ddf993 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -517,10 +517,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		LOG.info("Initializing RocksDB keyed state backend.");
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
-		}
-
 		// clear all meta data
 		kvStateInformation.clear();
 
@@ -529,6 +525,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			if (restoreState == null || restoreState.isEmpty()) {
 				createDB();
 			} else {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
+				}
+
 				KeyedStateHandle firstStateHandle = restoreState.iterator().next();
 				if (firstStateHandle instanceof IncrementalKeyedStateHandle
 					|| firstStateHandle instanceof IncrementalLocalKeyedStateHandle) {


[flink] 04/04: [FLINK-11280] [state backends, tests] Do not set current key before restore in TtlStateTestBase tests

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31685a36d1344e34c37b678c0335d560ccf6f4f4
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 13:42:34 2019 +0100

    [FLINK-11280] [state backends, tests] Do not set current key before restore in TtlStateTestBase tests
---
 .../org/apache/flink/runtime/state/ttl/StateBackendTestContext.java    | 3 +--
 .../test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java | 3 +++
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
index eaec234..c1de3cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -76,9 +76,8 @@ public abstract class StateBackendTestContext {
 			keyedStateBackend = stateBackend.createKeyedStateBackend(
 				env, new JobID(), "test", StringSerializer.INSTANCE, 10,
 				new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
-			keyedStateBackend.setCurrentKey("defaultKey");
 		} catch (Exception e) {
-			throw new RuntimeException("unexpected");
+			throw new RuntimeException("unexpected", e);
 		}
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index f9f108a..9b6882a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -113,6 +113,7 @@ public abstract class TtlStateTestBase {
 		this.ttlConfig = ttlConfig;
 		sbetc.createAndRestoreKeyedStateBackend();
 		sbetc.restoreSnapshot(null);
+		sbetc.setCurrentKey("defaultKey");
 		createState();
 		ctx().initTestValues();
 	}
@@ -129,6 +130,7 @@ public abstract class TtlStateTestBase {
 		KeyedStateHandle snapshot = sbetc.takeSnapshot();
 		sbetc.createAndRestoreKeyedStateBackend();
 		sbetc.restoreSnapshot(snapshot);
+		sbetc.setCurrentKey("defaultKey");
 		createState();
 	}
 
@@ -397,6 +399,7 @@ public abstract class TtlStateTestBase {
 		sbetc.createAndRestoreKeyedStateBackend();
 
 		sbetc.restoreSnapshot(snapshot);
+		sbetc.setCurrentKey("defaultKey");
 		sbetc.createState(ctx().createStateDescriptor(), "");
 	}
 


[flink] 03/04: [hotfix] [rocksdb] Remove unused method in RocksDBSerializedCompositeKeyBuilder

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f39fd2929f2b231d54cb8435ccf2b84174168ebc
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 11:27:39 2019 +0100

    [hotfix] [rocksdb] Remove unused method in RocksDBSerializedCompositeKeyBuilder
---
 .../streaming/state/RocksDBSerializedCompositeKeyBuilder.java        | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
index 8e83e29..41aa685 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
@@ -198,9 +198,4 @@ class RocksDBSerializedCompositeKeyBuilder<K> {
 		return keySerializerTypeVariableSized &
 			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(namespaceSerializer);
 	}
-
-	@VisibleForTesting
-	boolean isKeySerializerTypeVariableSized() {
-		return keySerializerTypeVariableSized;
-	}
 }


[flink] 01/04: [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a1147b9b8d5a92ea38e25358a645d35dc16f9b5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Tue Jan 8 11:17:50 2019 +0100

    [FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBuilder only after restore
    
    Prior to this commit, the composite key builder was created in the
    constructor of the RocksDBKeyedStateBackend. The creation of the builder
    requires providing a key serializer.
    
    This is problematic because the key serializer may be reconfigured
    during the restore phase, which invalidates the key serializer that
    was already handed to the composite key builder.
    
    This commit resolves the problem by creating the composite key builder
    lazily, only after the restore phase; that is the point at which we can
    be certain the key serializer is final and will no longer change.
---
 .../streaming/state/RocksDBKeyedStateBackend.java     | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5a47b30..e95b7e8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -242,8 +242,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The native metrics monitor. */
 	private RocksDBNativeMetricMonitor nativeMetricMonitor;
 
-	/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
-	private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
+	/**
+	 * Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.
+	 *
+	 * <p>We create the builder after the restore phase in the {@link #restore(Object)} method. The timing of
+	 * the creation is important: only once the restore has finished can we be certain that the key
+	 * serializer is final, because it may be reconfigured during the restore.
+	 */
+	private RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
 
 	public RocksDBKeyedStateBackend(
 		String operatorIdentifier,
@@ -297,7 +303,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.kvStateInformation = new LinkedHashMap<>();
 
 		this.writeOptions = new WriteOptions().setDisableWAL(true);
-		this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);
 
 		this.metricOptions = metricOptions;
 		this.metricGroup = metricGroup;
@@ -535,6 +540,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				}
 			}
 
+			// it is important that we only create the key builder after the restore, and not before;
+			// restore operations may reconfigure the key serializer, so only by accessing it at this
+			// point can we be certain that the key serializer used in the builder is final.
+			this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
+				getKeySerializer(),
+				keyGroupPrefixBytes,
+				32);
+
 			initializeSnapshotStrategy(incrementalRestoreOperation);
 		} catch (Exception ex) {
 			dispose();