You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 06:11:27 UTC

[1/4] flink git commit: [hotfix] Correct equals & hashCode implementation of KryoSerializer

Repository: flink
Updated Branches:
  refs/heads/release-1.3 51fb7ed79 -> 94111f985


[hotfix] Correct equals & hashCode implementation of KryoSerializer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94111f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94111f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94111f98

Branch: refs/heads/release-1.3
Commit: 94111f98558274f6c17284ba7a9b627143ac7b1e
Parents: fe5b92f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 19:16:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../typeutils/runtime/KryoRegistration.java     |  8 ++++++--
 .../typeutils/runtime/kryo/KryoSerializer.java  | 20 +++++++++++---------
 2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94111f98/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
index 882073d..17a49b8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
@@ -23,9 +23,9 @@ import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 
 /**
@@ -56,6 +56,7 @@ public class KryoRegistration implements Serializable {
 	 * <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
 	 * the serializer class no longer exists when this registration instance was restored.
 	 */
+	@Nullable
 	private final Class<? extends Serializer<?>> serializerClass;
 
 	/**
@@ -65,9 +66,10 @@ public class KryoRegistration implements Serializable {
 	 * <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
 	 * the serializer class no longer exists or is no longer valid when this registration instance was restored.
 	 */
+	@Nullable
 	private final ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance;
 
-	private SerializerDefinitionType serializerDefinitionType;
+	private final SerializerDefinitionType serializerDefinitionType;
 
 	public KryoRegistration(Class<?> registeredClass) {
 		this.registeredClass = Preconditions.checkNotNull(registeredClass);
@@ -106,10 +108,12 @@ public class KryoRegistration implements Serializable {
 		return serializerDefinitionType;
 	}
 
+	@Nullable
 	public Class<? extends Serializer<?>> getSerializerClass() {
 		return serializerClass;
 	}
 
+	@Nullable
 	public ExecutionConfig.SerializableSerializer<? extends Serializer<?>> getSerializableSerializerInstance() {
 		return serializableSerializerInstance;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/94111f98/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 655de76..6730136 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -276,16 +276,17 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
-		return Objects.hash(
-			type,
-			registeredTypes,
-			registeredTypesWithSerializerClasses,
-			defaultSerializerClasses);
+		int result = type.hashCode();
+		result = 31 * result + (kryoRegistrations.hashCode());
+		result = 31 * result + (defaultSerializers.hashCode());
+		result = 31 * result + (defaultSerializerClasses.hashCode());
+
+		return result;
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof KryoSerializer) {
@@ -293,8 +294,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
 
 			return other.canEqual(this) &&
 				type == other.type &&
-				kryoRegistrations.equals(other.kryoRegistrations) &&
-				defaultSerializerClasses.equals(other.defaultSerializerClasses);
+				Objects.equals(kryoRegistrations, other.kryoRegistrations) &&
+				Objects.equals(defaultSerializerClasses, other.defaultSerializerClasses) &&
+				Objects.equals(defaultSerializers, other.defaultSerializers);
 		} else {
 			return false;
 		}


[2/4] flink git commit: [hotfix] Remove some raw type usage in RocksDBKeyedStateBackend

Posted by tr...@apache.org.
[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend

Introduce more generic parameters


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c573540
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c573540
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c573540

Branch: refs/heads/release-1.3
Commit: 6c573540f1e558192cbd3763446a9e6dd848efce
Parents: cfb6a69
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 17:05:50 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 69 +++++++++-----------
 .../state/KeyedBackendSerializationProxy.java   | 10 +--
 .../state/heap/HeapKeyedStateBackend.java       | 12 ++--
 .../runtime/state/SerializationProxiesTest.java | 12 ++--
 4 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ddc7e17..d0f73bf 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
 
-	private final JobID jobId;
-
 	private final String operatorIdentifier;
 
 	/** The column family options from the options factory */
@@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * TODO this map can be removed when eager-state registration is in place.
 	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
 	 */
-	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos;
+	private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
 
 	/** Number of bytes required to prefix the key groups. */
 	private final int keyGroupPrefixBytes;
@@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 
-		this.jobId = Preconditions.checkNotNull(jobId);
 		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
 
 		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
@@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			final long checkpointTimestamp,
 			final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
 
-		final RocksDBIncrementalSnapshotOperation snapshotOperation =
-			new RocksDBIncrementalSnapshotOperation(
+		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
+			new RocksDBIncrementalSnapshotOperation<>(
 				this,
 				checkpointStreamFactory,
 				checkpointId,
@@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		long startTime = System.currentTimeMillis();
 
-		final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory);
+		final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
 		// hold the db lock while operation on the db to guard us against async db disposal
 		synchronized (asyncSnapshotLock) {
 
@@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
 	 */
-	static final class RocksDBFullSnapshotOperation {
+	static final class RocksDBFullSnapshotOperation<K> {
 
 		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
 		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
 
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<K> stateBackend;
 		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
 		private final CheckpointStreamFactory checkpointStreamFactory;
 
@@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private KeyGroupsStateHandle snapshotResultStateHandle;
 
 		RocksDBFullSnapshotOperation(
-				RocksDBKeyedStateBackend<?> stateBackend,
+				RocksDBKeyedStateBackend<K> stateBackend,
 				CheckpointStreamFactory checkpointStreamFactory) {
 
 			this.stateBackend = stateBackend;
@@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				++kvStateId;
 			}
 
-			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots);
+			KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
 
 			serializationProxy.write(outputView);
 		}
@@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private static final class RocksDBIncrementalSnapshotOperation {
+	private static final class RocksDBIncrementalSnapshotOperation<K> {
 
 		/** The backend which we snapshot */
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<K> stateBackend;
 
 		/** Stream factory that creates the outpus streams to DFS */
 		private final CheckpointStreamFactory checkpointStreamFactory;
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private StreamStateHandle metaStateHandle = null;
 
 		private RocksDBIncrementalSnapshotOperation(
-				RocksDBKeyedStateBackend<?> stateBackend,
+				RocksDBKeyedStateBackend<K> stateBackend,
 				CheckpointStreamFactory checkpointStreamFactory,
 				long checkpointId,
 				long checkpointTimestamp) {
@@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
 				closeableRegistry.registerClosable(outputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
+				KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
 				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
 
 				serializationProxy.write(out);
@@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				LOG.info("Converting RocksDB state from old savepoint.");
 				restoreOldSavepointKeyedState(restoreState);
 			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
-				RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
+				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
 				restoreOperation.restore(restoreState);
 			} else {
-				RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
+				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
 				restoreOperation.doRestore(restoreState);
 			}
 		} catch (Exception ex) {
@@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/**
 	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
 	 */
-	static final class RocksDBFullRestoreOperation {
+	static final class RocksDBFullRestoreOperation<K> {
 
-		private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
 
 		/** Current key-groups state handle from which we restore key-groups */
 		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
@@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 *
 		 * @param rocksDBKeyedStateBackend the state backend into which we restore
 		 */
-		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
 			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
 		}
 
@@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 * @throws RocksDBException
 		 */
-		@SuppressWarnings("unchecked")
-		private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+		private void restoreKVStateMetaData() throws IOException, RocksDBException {
 
-			KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
+			KeyedBackendSerializationProxy<K> serializationProxy =
+					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
 			serializationProxy.read(currentStateHandleInView);
 
@@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					serializationProxy.getKeySerializer(),
 					TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 					serializationProxy.getKeySerializerConfigSnapshot(),
-					(TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+					rocksDBKeyedStateBackend.keySerializer)
 				.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
-	private static class RocksDBIncrementalRestoreOperation {
+	private static class RocksDBIncrementalRestoreOperation<T> {
 
-		private final RocksDBKeyedStateBackend<?> stateBackend;
+		private final RocksDBKeyedStateBackend<T> stateBackend;
 
-		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
 			this.stateBackend = stateBackend;
 		}
 
-		@SuppressWarnings("unchecked")
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
 				StreamStateHandle metaStateHandle) throws Exception {
 
@@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				inputStream = metaStateHandle.openInputStream();
 				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+				KeyedBackendSerializationProxy<T> serializationProxy =
+					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
 				DataInputView in = new DataInputViewStreamWrapper(inputStream);
 				serializationProxy.read(in);
 
@@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
-						(TypeSerializer) stateBackend.keySerializer)
+						stateBackend.keySerializer)
 					.isRequiresMigration()) {
 
 					// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			// TODO with eager registration in place, these checks should be moved to restore()
 
 			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
-				restoredKvStateMetaInfos.get(descriptor.getName());
+				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
 
 			Preconditions.checkState(
 				newMetaInfo.getName().equals(restoredMetaInfo.getName()),
@@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// check compatibility results to determine if state migration is required
 
-			CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+			CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
 					restoredMetaInfo.getNamespaceSerializer(),
 					MigrationNamespaceSerializerProxy.class,
 					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
@@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					new InstantiationUtil.ClassLoaderObjectInputStream(
 							new DataInputViewStream(inputView), userCodeClassLoader);
 
-			StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+			StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
 
 			columnFamilyMapping.put(mappingByte, stateDescriptor);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 94fb9f1..f265f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -39,11 +39,11 @@ import java.util.List;
  * Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
  * serialization logic here.
  */
-public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
 
 	public static final int VERSION = 3;
 
-	private TypeSerializer<?> keySerializer;
+	private TypeSerializer<K> keySerializer;
 	private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
 
 	private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
@@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	}
 
 	public KeyedBackendSerializationProxy(
-			TypeSerializer<?> keySerializer,
+			TypeSerializer<K> keySerializer,
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
 
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 		return stateMetaInfoSnapshots;
 	}
 
-	public TypeSerializer<?> getKeySerializer() {
+	public TypeSerializer<K> getKeySerializer() {
 		return keySerializer;
 	}
 
@@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
 	public void read(DataInputView in) throws IOException {
 		super.read(in);
 
-		final TypeSerializerSerializationProxy<?> keySerializerProxy =
+		final TypeSerializerSerializationProxy<K> keySerializerProxy =
 			new TypeSerializerSerializationProxy<>(userCodeClassLoader);
 
 		// only starting from version 3, we have the key serializer and its config snapshot written

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6eb314b..3e5645b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 
-		final KeyedBackendSerializationProxy serializationProxy =
-				new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
+		final KeyedBackendSerializationProxy<K> serializationProxy =
+				new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
 
 		//--------------------------------------------------- this becomes the end of sync part
 
@@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			try {
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
-				KeyedBackendSerializationProxy serializationProxy =
-						new KeyedBackendSerializationProxy(userCodeClassLoader);
+				KeyedBackendSerializationProxy<K> serializationProxy =
+						new KeyedBackendSerializationProxy<>(userCodeClassLoader);
 
 				serializationProxy.read(inView);
 
@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
-						(TypeSerializer) keySerializer)
+						keySerializer)
 						.isRequiresMigration()) {
 
 						// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					keySerializerRestored = true;
 				}
-				
+
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 8bbbd5f..3d5b210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -66,8 +66,8 @@ public class SerializationProxiesTest {
 		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
-		KeyedBackendSerializationProxy serializationProxy =
-				new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+		KeyedBackendSerializationProxy<?> serializationProxy =
+				new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -76,7 +76,7 @@ public class SerializationProxiesTest {
 		}
 
 		serializationProxy =
-				new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+				new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
 		try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
@@ -103,8 +103,8 @@ public class SerializationProxiesTest {
 		stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
-		KeyedBackendSerializationProxy serializationProxy =
-			new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+		KeyedBackendSerializationProxy<?> serializationProxy =
+			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -113,7 +113,7 @@ public class SerializationProxiesTest {
 		}
 
 		serializationProxy =
-			new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+			new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
 
 		// mock failure when deserializing serializers
 		TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);


[3/4] flink git commit: [hotfix] Remove unnecessary job id from RocksDBKeyedStateBackend

Posted by tr...@apache.org.
[hotfix] Remove unnecessary job id from RocksDBKeyedStateBackend


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5b92f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5b92f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5b92f7

Branch: refs/heads/release-1.3
Commit: fe5b92f7e17a90315245706d5bc6868f9f729e56
Parents: 6c57354
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 17:48:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java    | 2 --
 .../flink/contrib/streaming/state/RocksDBStateBackend.java   | 3 +--
 .../test/query/KVStateRequestSerializerRocksDBTest.java      | 8 +++-----
 3 files changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d0f73bf..88a759d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -18,7 +18,6 @@
 package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -180,7 +179,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private static final String SST_FILE_SUFFIX = ".sst";
 
 	public RocksDBKeyedStateBackend(
-			JobID jobId,
 			String operatorIdentifier,
 			ClassLoader userCodeClassLoader,
 			File instanceBasePath,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 55b8be2..2b70dcd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -300,10 +300,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		lazyInitializeForJob(env, operatorIdentifier);
 
 		File instanceBasePath =
-				new File(getNextStoragePath(), "job-" + jobId.toString() + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
+				new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
 
 		return new RocksDBKeyedStateBackend<>(
-				jobID,
 				operatorIdentifier,
 				env.getUserClassLoader(),
 				instanceBasePath,

http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 05f72c2..0f99afb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.test.query;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -66,7 +65,6 @@ public final class KVStateRequestSerializerRocksDBTest {
 	final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
 
 		RocksDBKeyedStateBackend2(
-				final JobID jobId,
 				final String operatorIdentifier,
 				final ClassLoader userCodeClassLoader,
 				final File instanceBasePath,
@@ -78,7 +76,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 				final KeyGroupRange keyGroupRange,
 				final ExecutionConfig executionConfig) throws Exception {
 
-			super(jobId, operatorIdentifier, userCodeClassLoader,
+			super(operatorIdentifier, userCodeClassLoader,
 				instanceBasePath,
 				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
 				numberOfKeyGroups, keyGroupRange, executionConfig, false);
@@ -110,7 +108,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
 		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
 			new RocksDBKeyedStateBackend2<>(
-				new JobID(), "no-op",
+				"no-op",
 				ClassLoader.getSystemClassLoader(),
 				temporaryFolder.getRoot(),
 				dbOptions,
@@ -147,7 +145,7 @@ public final class KVStateRequestSerializerRocksDBTest {
 		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
 		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
 			new RocksDBKeyedStateBackend<>(
-				new JobID(), "no-op",
+				"no-op",
 				ClassLoader.getSystemClassLoader(),
 				temporaryFolder.getRoot(),
 				dbOptions,


[4/4] flink git commit: [hotfix] Restore KeySerializer only once

Posted by tr...@apache.org.
[hotfix] Restore KeySerializer only once


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698

Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		int numRegisteredKvStates = 0;
 		stateTables.clear();
 
+		boolean keySerializerRestored = false;
+
 		for (KeyedStateHandle keyedStateHandle : state) {
 
 			if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				serializationProxy.read(inView);
 
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				if (StateMigrationUtil.resolveCompatibilityResult(
+				if (!keySerializerRestored) {
+					// check for key serializer compatibility; this also reconfigures the
+					// key serializer to be compatible, if it is required and is possible
+					if (StateMigrationUtil.resolveCompatibilityResult(
 						serializationProxy.getKeySerializer(),
 						TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
 						serializationProxy.getKeySerializerConfigSnapshot(),
 						(TypeSerializer) keySerializer)
-					.isRequiresMigration()) {
+						.isRequiresMigration()) {
 
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
-						"Aborting now since state migration is currently not available");
-				}
+						// TODO replace with state migration; note that key hash codes need to remain the same after migration
+						throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+							"Aborting now since state migration is currently not available");
+					}
 
+					keySerializerRestored = true;
+				}
+				
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
 						serializationProxy.getStateMetaInfoSnapshots();