You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:26 UTC

[15/50] [abbrv] flink git commit: [FLINK-6018] Add tests for KryoSerializer restore with registered types

[FLINK-6018] Add tests for KryoSerializer restore with registered types

This commit also renames isCompatibleWith() to canRestoreFrom() to make
the method asymmetric, because in the case of KryoSerializer we
can restore from state that was stored using no registered
types/serializers, while the other way around is not possible.

This closes #3534.
This closes #3603.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09164cf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09164cf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09164cf2

Branch: refs/heads/table-retraction
Commit: 09164cf2388888bc2f92f0ca63bb1f15283e895c
Parents: 68289b1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Mar 16 15:17:05 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Mar 24 12:34:03 2017 +0800

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |    2 +-
 .../api/common/typeutils/TypeSerializer.java    |    2 +-
 .../typeutils/runtime/kryo/KryoSerializer.java  |   16 +
 .../AbstractKeyedCEPPatternOperator.java        |    2 +-
 .../state/AbstractKeyedStateBackend.java        |    4 +-
 .../state/DefaultOperatorStateBackend.java      |    6 +-
 .../state/RegisteredBackendStateMetaInfo.java   |    6 +-
 .../state/heap/HeapKeyedStateBackend.java       |    2 +-
 .../runtime/state/StateBackendTestBase.java     | 1346 ++++++++++++------
 9 files changed, 967 insertions(+), 419 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 5b72e03..2ce527f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -821,7 +821,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				descriptor.getSerializer());
 
 		if (stateInfo != null) {
-			if (newMetaInfo.isCompatibleWith(stateInfo.f1)) {
+			if (newMetaInfo.canRestoreFrom(stateInfo.f1)) {
 				stateInfo.f1 = newMetaInfo;
 				return stateInfo.f0;
 			} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index ac7fbc8..6edaec6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -161,7 +161,7 @@ public abstract class TypeSerializer<T> implements Serializable {
 
 	public abstract int hashCode();
 
-	public boolean isCompatibleWith(TypeSerializer<?> other) {
+	public boolean canRestoreFrom(TypeSerializer<?> other) {
 		return equals(other);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 44c952a..cba0c84 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -383,4 +383,20 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 		checkKryoInitialized();
 		return this.kryo;
 	}
+
+	@Override
+	public boolean canRestoreFrom(TypeSerializer<?> other) {
+		if (other instanceof KryoSerializer) {
+			KryoSerializer<?> otherKryo = (KryoSerializer<?>) other;
+
+			// we cannot include the Serializers here because they don't implement the equals method
+			return other.canEqual(this) &&
+					type == otherKryo.type &&
+					(registeredTypes.equals(otherKryo.registeredTypes) || otherKryo.registeredTypes.isEmpty()) &&
+					(registeredTypesWithSerializerClasses.equals(otherKryo.registeredTypesWithSerializerClasses) || otherKryo.registeredTypesWithSerializerClasses.isEmpty()) &&
+					(defaultSerializerClasses.equals(otherKryo.defaultSerializerClasses) || otherKryo.defaultSerializerClasses.isEmpty());
+		} else {
+			return false;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index b6d57cd..3e18660 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -404,7 +404,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		}
 
 		@Override
-		public boolean isCompatibleWith(TypeSerializer<?> other) {
+		public boolean canRestoreFrom(TypeSerializer<?> other) {
 			return equals(other) || other instanceof AbstractKeyedCEPPatternOperator.PriorityQueueSerializer;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1f2f4a2..e6e7b23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -277,7 +277,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		}
 
 		if (!stateDescriptor.isSerializerInitialized()) {
-			throw new IllegalStateException("The serializer of the descriptor has not been initialized!"); 
+			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 		}
 
 		InternalKvState<?> existing = keyValueStatesByName.get(stateDescriptor.getName());
@@ -355,8 +355,6 @@ public abstract class AbstractKeyedStateBackend<K>
 
 		checkNotNull(namespace, "Namespace");
 
-		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
-
 		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
 			lastState.setCurrentNamespace(namespace);
 			return (S) lastState;

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index 71cccae..ca7cb48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -134,8 +134,8 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 					"Incompatible assignment mode. Provided: " + mode + ", expected: " +
 							partitionableListState.getAssignmentMode());
 			Preconditions.checkState(
-					partitionableListState.getPartitionStateSerializer().
-							isCompatibleWith(stateDescriptor.getElementSerializer()),
+					stateDescriptor.getElementSerializer().
+							canRestoreFrom(partitionableListState.getPartitionStateSerializer()),
 					"Incompatible type serializers. Provided: " + stateDescriptor.getElementSerializer() +
 							", found: " + partitionableListState.getPartitionStateSerializer());
 		}
@@ -258,7 +258,7 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 						registeredStates.put(listState.getName(), listState);
 					} else {
-						Preconditions.checkState(listState.getPartitionStateSerializer().isCompatibleWith(
+						Preconditions.checkState(listState.getPartitionStateSerializer().canRestoreFrom(
 								stateMetaInfo.getStateSerializer()), "Incompatible state serializers found: " +
 								listState.getPartitionStateSerializer() + " is not compatible with " +
 								stateMetaInfo.getStateSerializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
index 80bdacd..0d4b3c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -74,7 +74,7 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 		return stateSerializer;
 	}
 
-	public boolean isCompatibleWith(RegisteredBackendStateMetaInfo<?, ?> other) {
+	public boolean canRestoreFrom(RegisteredBackendStateMetaInfo<?, ?> other) {
 
 		if (this == other) {
 			return true;
@@ -94,8 +94,8 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			return false;
 		}
 
-		return (stateSerializer.isCompatibleWith(other.stateSerializer)) &&
-				(namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+		return (stateSerializer.canRestoreFrom(other.stateSerializer)) &&
+				(namespaceSerializer.canRestoreFrom(other.namespaceSerializer)
 						// we also check if there is just a migration proxy that should be replaced by any real serializer
 						|| other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/09164cf2/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index f3e4ec6..46ec5c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -141,7 +141,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			stateTable = newStateTable(newMetaInfo);
 			stateTables.put(stateName, stateTable);
 		} else {
-			if (!newMetaInfo.isCompatibleWith(stateTable.getMetaInfo())) {
+			if (!newMetaInfo.canRestoreFrom(stateTable.getMetaInfo())) {
 				throw new RuntimeException("Trying to access state using incompatible meta info, was " +
 						stateTable.getMetaInfo() + " trying access with " + newMetaInfo);
 			}