You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 06:11:27 UTC
[1/4] flink git commit: [hotfix] Correct equals & hashCode
implementation of KryoSerializer
Repository: flink
Updated Branches:
refs/heads/release-1.3 51fb7ed79 -> 94111f985
[hotfix] Correct equals & hashCode implementation of KryoSerializer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94111f98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94111f98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94111f98
Branch: refs/heads/release-1.3
Commit: 94111f98558274f6c17284ba7a9b627143ac7b1e
Parents: fe5b92f
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 19:16:06 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200
----------------------------------------------------------------------
.../typeutils/runtime/KryoRegistration.java | 8 ++++++--
.../typeutils/runtime/kryo/KryoSerializer.java | 20 +++++++++++---------
2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/94111f98/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
index 882073d..17a49b8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java
@@ -23,9 +23,9 @@ import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
import java.io.Serializable;
/**
@@ -56,6 +56,7 @@ public class KryoRegistration implements Serializable {
* <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
* the serializer class no longer exists when this registration instance was restored.
*/
+ @Nullable
private final Class<? extends Serializer<?>> serializerClass;
/**
@@ -65,9 +66,10 @@ public class KryoRegistration implements Serializable {
* <p>This can be a dummy serializer {@link KryoRegistrationSerializerConfigSnapshot.DummyKryoSerializerClass} if
* the serializer class no longer exists or is no longer valid when this registration instance was restored.
*/
+ @Nullable
private final ExecutionConfig.SerializableSerializer<? extends Serializer<?>> serializableSerializerInstance;
- private SerializerDefinitionType serializerDefinitionType;
+ private final SerializerDefinitionType serializerDefinitionType;
public KryoRegistration(Class<?> registeredClass) {
this.registeredClass = Preconditions.checkNotNull(registeredClass);
@@ -106,10 +108,12 @@ public class KryoRegistration implements Serializable {
return serializerDefinitionType;
}
+ @Nullable
public Class<? extends Serializer<?>> getSerializerClass() {
return serializerClass;
}
+ @Nullable
public ExecutionConfig.SerializableSerializer<? extends Serializer<?>> getSerializableSerializerInstance() {
return serializableSerializerInstance;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/94111f98/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 655de76..6730136 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -276,16 +276,17 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
// --------------------------------------------------------------------------------------------
-
+
@Override
public int hashCode() {
- return Objects.hash(
- type,
- registeredTypes,
- registeredTypesWithSerializerClasses,
- defaultSerializerClasses);
+ int result = type.hashCode();
+ result = 31 * result + (kryoRegistrations.hashCode());
+ result = 31 * result + (defaultSerializers.hashCode());
+ result = 31 * result + (defaultSerializerClasses.hashCode());
+
+ return result;
}
-
+
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
@@ -293,8 +294,9 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
return other.canEqual(this) &&
type == other.type &&
- kryoRegistrations.equals(other.kryoRegistrations) &&
- defaultSerializerClasses.equals(other.defaultSerializerClasses);
+ Objects.equals(kryoRegistrations, other.kryoRegistrations) &&
+ Objects.equals(defaultSerializerClasses, other.defaultSerializerClasses) &&
+ Objects.equals(defaultSerializers, other.defaultSerializers);
} else {
return false;
}
[2/4] flink git commit: [hotfix] Remove some raw type usage in
RocksDBKeyedStateBackend
Posted by tr...@apache.org.
[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend
Introduce more generic parameters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c573540
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c573540
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c573540
Branch: refs/heads/release-1.3
Commit: 6c573540f1e558192cbd3763446a9e6dd848efce
Parents: cfb6a69
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 17:05:50 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 69 +++++++++-----------
.../state/KeyedBackendSerializationProxy.java | 10 +--
.../state/heap/HeapKeyedStateBackend.java | 12 ++--
.../runtime/state/SerializationProxiesTest.java | 12 ++--
4 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ddc7e17..d0f73bf 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
- private final JobID jobId;
-
private final String operatorIdentifier;
/** The column family options from the options factory */
@@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
- private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos;
+ private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
- this.jobId = Preconditions.checkNotNull(jobId);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
@@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final long checkpointTimestamp,
final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
- final RocksDBIncrementalSnapshotOperation snapshotOperation =
- new RocksDBIncrementalSnapshotOperation(
+ final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
+ new RocksDBIncrementalSnapshotOperation<>(
this,
checkpointStreamFactory,
checkpointId,
@@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
long startTime = System.currentTimeMillis();
- final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory);
+ final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
// hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {
@@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
*/
- static final class RocksDBFullSnapshotOperation {
+ static final class RocksDBFullSnapshotOperation<K> {
static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<K> stateBackend;
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
private final CheckpointStreamFactory checkpointStreamFactory;
@@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private KeyGroupsStateHandle snapshotResultStateHandle;
RocksDBFullSnapshotOperation(
- RocksDBKeyedStateBackend<?> stateBackend,
+ RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory) {
this.stateBackend = stateBackend;
@@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
++kvStateId;
}
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
serializationProxy.write(outputView);
}
@@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private static final class RocksDBIncrementalSnapshotOperation {
+ private static final class RocksDBIncrementalSnapshotOperation<K> {
/** The backend which we snapshot */
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<K> stateBackend;
/** Stream factory that creates the outpus streams to DFS */
private final CheckpointStreamFactory checkpointStreamFactory;
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private StreamStateHandle metaStateHandle = null;
private RocksDBIncrementalSnapshotOperation(
- RocksDBKeyedStateBackend<?> stateBackend,
+ RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory,
long checkpointId,
long checkpointTimestamp) {
@@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
closeableRegistry.registerClosable(outputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
serializationProxy.write(out);
@@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
LOG.info("Converting RocksDB state from old savepoint.");
restoreOldSavepointKeyedState(restoreState);
} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
- RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
+ RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
restoreOperation.restore(restoreState);
} else {
- RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
+ RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
restoreOperation.doRestore(restoreState);
}
} catch (Exception ex) {
@@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
*/
- static final class RocksDBFullRestoreOperation {
+ static final class RocksDBFullRestoreOperation<K> {
- private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+ private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
/** Current key-groups state handle from which we restore key-groups */
private KeyGroupsStateHandle currentKeyGroupsStateHandle;
@@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*
* @param rocksDBKeyedStateBackend the state backend into which we restore
*/
- public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+ public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
}
@@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @throws ClassNotFoundException
* @throws RocksDBException
*/
- @SuppressWarnings("unchecked")
- private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+ private void restoreKVStateMetaData() throws IOException, RocksDBException {
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
serializationProxy.read(currentStateHandleInView);
@@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+ rocksDBKeyedStateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private static class RocksDBIncrementalRestoreOperation {
+ private static class RocksDBIncrementalRestoreOperation<T> {
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<T> stateBackend;
- private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
this.stateBackend = stateBackend;
}
- @SuppressWarnings("unchecked")
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {
@@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
inputStream = metaStateHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+ KeyedBackendSerializationProxy<T> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
DataInputView in = new DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);
@@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) stateBackend.keySerializer)
+ stateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// TODO with eager registration in place, these checks should be moved to restore()
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
- restoredKvStateMetaInfos.get(descriptor.getName());
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
Preconditions.checkState(
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
@@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+ CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
restoredMetaInfo.getNamespaceSerializer(),
MigrationNamespaceSerializerProxy.class,
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
@@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
new InstantiationUtil.ClassLoaderObjectInputStream(
new DataInputViewStream(inputView), userCodeClassLoader);
- StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+ StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
columnFamilyMapping.put(mappingByte, stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 94fb9f1..f265f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -39,11 +39,11 @@ import java.util.List;
* Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
* serialization logic here.
*/
-public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
public static final int VERSION = 3;
- private TypeSerializer<?> keySerializer;
+ private TypeSerializer<K> keySerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
@@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
}
public KeyedBackendSerializationProxy(
- TypeSerializer<?> keySerializer,
+ TypeSerializer<K> keySerializer,
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
return stateMetaInfoSnapshots;
}
- public TypeSerializer<?> getKeySerializer() {
+ public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}
@@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public void read(DataInputView in) throws IOException {
super.read(in);
- final TypeSerializerSerializationProxy<?> keySerializerProxy =
+ final TypeSerializerSerializationProxy<K> keySerializerProxy =
new TypeSerializerSerializationProxy<>(userCodeClassLoader);
// only starting from version 3, we have the key serializer and its config snapshot written
http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6eb314b..3e5645b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- final KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
+ final KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
//--------------------------------------------------- this becomes the end of sync part
@@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(userCodeClassLoader);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(userCodeClassLoader);
serializationProxy.read(inView);
@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) keySerializer)
+ keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
keySerializerRestored = true;
}
-
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
http://git-wip-us.apache.org/repos/asf/flink/blob/6c573540/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 8bbbd5f..3d5b210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -66,8 +66,8 @@ public class SerializationProxiesTest {
stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+ KeyedBackendSerializationProxy<?> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -76,7 +76,7 @@ public class SerializationProxiesTest {
}
serializationProxy =
- new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+ new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
serializationProxy.read(new DataInputViewStreamWrapper(in));
@@ -103,8 +103,8 @@ public class SerializationProxiesTest {
stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+ KeyedBackendSerializationProxy<?> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -113,7 +113,7 @@ public class SerializationProxiesTest {
}
serializationProxy =
- new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+ new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
// mock failure when deserializing serializers
TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);
[3/4] flink git commit: [hotfix] Remove unnecessary job id from
RocksDBKeyedStateBackend
Posted by tr...@apache.org.
[hotfix] Remove unnecessary job id from RocksDBKeyedStateBackend
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe5b92f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe5b92f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe5b92f7
Branch: refs/heads/release-1.3
Commit: fe5b92f7e17a90315245706d5bc6868f9f729e56
Parents: 6c57354
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 17:48:30 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200
----------------------------------------------------------------------
.../contrib/streaming/state/RocksDBKeyedStateBackend.java | 2 --
.../flink/contrib/streaming/state/RocksDBStateBackend.java | 3 +--
.../test/query/KVStateRequestSerializerRocksDBTest.java | 8 +++-----
3 files changed, 4 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d0f73bf..88a759d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -18,7 +18,6 @@
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -180,7 +179,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final String SST_FILE_SUFFIX = ".sst";
public RocksDBKeyedStateBackend(
- JobID jobId,
String operatorIdentifier,
ClassLoader userCodeClassLoader,
File instanceBasePath,
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 55b8be2..2b70dcd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -300,10 +300,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
lazyInitializeForJob(env, operatorIdentifier);
File instanceBasePath =
- new File(getNextStoragePath(), "job-" + jobId.toString() + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
+ new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
return new RocksDBKeyedStateBackend<>(
- jobID,
operatorIdentifier,
env.getUserClassLoader(),
instanceBasePath,
http://git-wip-us.apache.org/repos/asf/flink/blob/fe5b92f7/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 05f72c2..0f99afb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.test.query;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -66,7 +65,6 @@ public final class KVStateRequestSerializerRocksDBTest {
final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
RocksDBKeyedStateBackend2(
- final JobID jobId,
final String operatorIdentifier,
final ClassLoader userCodeClassLoader,
final File instanceBasePath,
@@ -78,7 +76,7 @@ public final class KVStateRequestSerializerRocksDBTest {
final KeyGroupRange keyGroupRange,
final ExecutionConfig executionConfig) throws Exception {
- super(jobId, operatorIdentifier, userCodeClassLoader,
+ super(operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
numberOfKeyGroups, keyGroupRange, executionConfig, false);
@@ -110,7 +108,7 @@ public final class KVStateRequestSerializerRocksDBTest {
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackend2<>(
- new JobID(), "no-op",
+ "no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
@@ -147,7 +145,7 @@ public final class KVStateRequestSerializerRocksDBTest {
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackend<>(
- new JobID(), "no-op",
+ "no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
[4/4] flink git commit: [hotfix] Restore KeySerializer only once
Posted by tr...@apache.org.
[hotfix] Restore KeySerializer only once
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfb6a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfb6a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfb6a698
Branch: refs/heads/release-1.3
Commit: cfb6a6982cce89f76209d7c4bea4c9905fd5092a
Parents: 51fb7ed
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 16:41:15 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:16:27 2017 +0200
----------------------------------------------------------------------
.../state/heap/HeapKeyedStateBackend.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cfb6a698/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 8d3d8a0..6eb314b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -362,6 +362,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
int numRegisteredKvStates = 0;
stateTables.clear();
+ boolean keySerializerRestored = false;
+
for (KeyedStateHandle keyedStateHandle : state) {
if (keyedStateHandle == null) {
@@ -386,20 +388,24 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.read(inView);
- // check for key serializer compatibility; this also reconfigures the
- // key serializer to be compatible, if it is required and is possible
- if (StateMigrationUtil.resolveCompatibilityResult(
+ if (!keySerializerRestored) {
+ // check for key serializer compatibility; this also reconfigures the
+ // key serializer to be compatible, if it is required and is possible
+ if (StateMigrationUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
(TypeSerializer) keySerializer)
- .isRequiresMigration()) {
+ .isRequiresMigration()) {
- // TODO replace with state migration; note that key hash codes need to remain the same after migration
- throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
- "Aborting now since state migration is currently not available");
- }
+ // TODO replace with state migration; note that key hash codes need to remain the same after migration
+ throw new RuntimeException("The new key serializer is not compatible to read previous keys. " +
+ "Aborting now since state migration is currently not available");
+ }
+ keySerializerRestored = true;
+ }
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();