You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/05/19 06:11:07 UTC
[2/4] flink git commit: [hotfix] Remove some raw type usage in RocksDBKeyedStateBackend
[hotfix] Remove some raw type usage in RocksDBKeyedStateBackend
Introduce more generic parameters
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f2af1a9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f2af1a9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f2af1a9f
Branch: refs/heads/master
Commit: f2af1a9f916bd9b941a48a1da577d19fc07badde
Parents: ef6f7b6
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 18 17:05:50 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 18 23:19:32 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 69 +++++++++-----------
.../state/KeyedBackendSerializationProxy.java | 10 +--
.../state/heap/HeapKeyedStateBackend.java | 12 ++--
.../runtime/state/SerializationProxiesTest.java | 12 ++--
4 files changed, 49 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index ddc7e17..d0f73bf 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -123,8 +123,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
- private final JobID jobId;
-
private final String operatorIdentifier;
/** The column family options from the options factory */
@@ -165,7 +163,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* TODO this map can be removed when eager-state registration is in place.
* TODO we currently need this cached to check state migration strategies when new serializers are registered.
*/
- private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot> restoredKvStateMetaInfos;
+ private Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;
@@ -198,7 +196,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
- this.jobId = Preconditions.checkNotNull(jobId);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
@@ -314,8 +311,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final long checkpointTimestamp,
final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
- final RocksDBIncrementalSnapshotOperation snapshotOperation =
- new RocksDBIncrementalSnapshotOperation(
+ final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
+ new RocksDBIncrementalSnapshotOperation<>(
this,
checkpointStreamFactory,
checkpointId,
@@ -365,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
long startTime = System.currentTimeMillis();
- final RocksDBFullSnapshotOperation snapshotOperation = new RocksDBFullSnapshotOperation(this, streamFactory);
+ final RocksDBFullSnapshotOperation<K> snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory);
// hold the db lock while operation on the db to guard us against async db disposal
synchronized (asyncSnapshotLock) {
@@ -440,12 +437,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
*/
- static final class RocksDBFullSnapshotOperation {
+ static final class RocksDBFullSnapshotOperation<K> {
static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<K> stateBackend;
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
private final CheckpointStreamFactory checkpointStreamFactory;
@@ -461,7 +458,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private KeyGroupsStateHandle snapshotResultStateHandle;
RocksDBFullSnapshotOperation(
- RocksDBKeyedStateBackend<?> stateBackend,
+ RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory) {
this.stateBackend = stateBackend;
@@ -601,8 +598,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
++kvStateId;
}
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.getKeySerializer(), metaInfoSnapshots);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots);
serializationProxy.write(outputView);
}
@@ -710,10 +707,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private static final class RocksDBIncrementalSnapshotOperation {
+ private static final class RocksDBIncrementalSnapshotOperation<K> {
/** The backend which we snapshot */
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<K> stateBackend;
/** Stream factory that creates the outpus streams to DFS */
private final CheckpointStreamFactory checkpointStreamFactory;
@@ -748,7 +745,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private StreamStateHandle metaStateHandle = null;
private RocksDBIncrementalSnapshotOperation(
- RocksDBKeyedStateBackend<?> stateBackend,
+ RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory,
long checkpointId,
long checkpointTimestamp) {
@@ -810,8 +807,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
closeableRegistry.registerClosable(outputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfoSnapshots);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots);
DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
serializationProxy.write(out);
@@ -964,10 +961,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
LOG.info("Converting RocksDB state from old savepoint.");
restoreOldSavepointKeyedState(restoreState);
} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
- RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation(this);
+ RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
restoreOperation.restore(restoreState);
} else {
- RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation(this);
+ RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
restoreOperation.doRestore(restoreState);
}
} catch (Exception ex) {
@@ -1037,9 +1034,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/**
* Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
*/
- static final class RocksDBFullRestoreOperation {
+ static final class RocksDBFullRestoreOperation<K> {
- private final RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend;
+ private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
/** Current key-groups state handle from which we restore key-groups */
private KeyGroupsStateHandle currentKeyGroupsStateHandle;
@@ -1055,7 +1052,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
*
* @param rocksDBKeyedStateBackend the state backend into which we restore
*/
- public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<?> rocksDBKeyedStateBackend) {
+ public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
}
@@ -1116,11 +1113,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
* @throws ClassNotFoundException
* @throws RocksDBException
*/
- @SuppressWarnings("unchecked")
- private void restoreKVStateMetaData() throws IOException, ClassNotFoundException, RocksDBException {
+ private void restoreKVStateMetaData() throws IOException, RocksDBException {
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(rocksDBKeyedStateBackend.userCodeClassLoader);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
serializationProxy.read(currentStateHandleInView);
@@ -1130,7 +1126,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) rocksDBKeyedStateBackend.keySerializer)
+ rocksDBKeyedStateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1221,15 +1217,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- private static class RocksDBIncrementalRestoreOperation {
+ private static class RocksDBIncrementalRestoreOperation<T> {
- private final RocksDBKeyedStateBackend<?> stateBackend;
+ private final RocksDBKeyedStateBackend<T> stateBackend;
- private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
+ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
this.stateBackend = stateBackend;
}
- @SuppressWarnings("unchecked")
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {
@@ -1239,8 +1234,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
inputStream = metaStateHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerClosable(inputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
+ KeyedBackendSerializationProxy<T> serializationProxy =
+ new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
DataInputView in = new DataInputViewStreamWrapper(inputStream);
serializationProxy.read(in);
@@ -1250,7 +1245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) stateBackend.keySerializer)
+ stateBackend.keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1536,7 +1531,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// TODO with eager registration in place, these checks should be moved to restore()
RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
- restoredKvStateMetaInfos.get(descriptor.getName());
+ (RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
Preconditions.checkState(
newMetaInfo.getName().equals(restoredMetaInfo.getName()),
@@ -1556,7 +1551,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// check compatibility results to determine if state migration is required
- CompatibilityResult<N> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
+ CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
restoredMetaInfo.getNamespaceSerializer(),
MigrationNamespaceSerializerProxy.class,
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
@@ -1929,7 +1924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
new InstantiationUtil.ClassLoaderObjectInputStream(
new DataInputViewStream(inputView), userCodeClassLoader);
- StateDescriptor stateDescriptor = (StateDescriptor) ooIn.readObject();
+ StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
columnFamilyMapping.put(mappingByte, stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 94fb9f1..f265f78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -39,11 +39,11 @@ import java.util.List;
* Serialization proxy for all meta data in keyed state backends. In the future we might also requiresMigration the actual state
* serialization logic here.
*/
-public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable {
+public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable {
public static final int VERSION = 3;
- private TypeSerializer<?> keySerializer;
+ private TypeSerializer<K> keySerializer;
private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
@@ -55,7 +55,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
}
public KeyedBackendSerializationProxy(
- TypeSerializer<?> keySerializer,
+ TypeSerializer<K> keySerializer,
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) {
this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -70,7 +70,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
return stateMetaInfoSnapshots;
}
- public TypeSerializer<?> getKeySerializer() {
+ public TypeSerializer<K> getKeySerializer() {
return keySerializer;
}
@@ -122,7 +122,7 @@ public class KeyedBackendSerializationProxy extends VersionedIOReadableWritable
public void read(DataInputView in) throws IOException {
super.read(in);
- final TypeSerializerSerializationProxy<?> keySerializerProxy =
+ final TypeSerializerSerializationProxy<K> keySerializerProxy =
new TypeSerializerSerializationProxy<>(userCodeClassLoader);
// only starting from version 3, we have the key serializer and its config snapshot written
http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 6eb314b..3e5645b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -272,8 +272,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- final KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, metaInfoSnapshots);
+ final KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
//--------------------------------------------------- this becomes the end of sync part
@@ -383,8 +383,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(userCodeClassLoader);
+ KeyedBackendSerializationProxy<K> serializationProxy =
+ new KeyedBackendSerializationProxy<>(userCodeClassLoader);
serializationProxy.read(inView);
@@ -395,7 +395,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
- (TypeSerializer) keySerializer)
+ keySerializer)
.isRequiresMigration()) {
// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -405,7 +405,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
keySerializerRestored = true;
}
-
+
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
http://git-wip-us.apache.org/repos/asf/flink/blob/f2af1a9f/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 8bbbd5f..3d5b210 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -66,8 +66,8 @@ public class SerializationProxiesTest {
stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+ KeyedBackendSerializationProxy<?> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -76,7 +76,7 @@ public class SerializationProxiesTest {
}
serializationProxy =
- new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+ new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
serializationProxy.read(new DataInputViewStreamWrapper(in));
@@ -103,8 +103,8 @@ public class SerializationProxiesTest {
stateMetaInfoList.add(new RegisteredKeyedBackendStateMetaInfo<>(
StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
- KeyedBackendSerializationProxy serializationProxy =
- new KeyedBackendSerializationProxy(keySerializer, stateMetaInfoList);
+ KeyedBackendSerializationProxy<?> serializationProxy =
+ new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
byte[] serialized;
try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -113,7 +113,7 @@ public class SerializationProxiesTest {
}
serializationProxy =
- new KeyedBackendSerializationProxy(Thread.currentThread().getContextClassLoader());
+ new KeyedBackendSerializationProxy<>(Thread.currentThread().getContextClassLoader());
// mock failure when deserializing serializers
TypeSerializerSerializationProxy<?> mockProxy = mock(TypeSerializerSerializationProxy.class);