You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:20 UTC
[flink] 04/09: [hotfix] Add a wildcard type in heap state backend related classes
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3b777ecf47ecd16ddffcc7f24eb6f43f4818487
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:07:41 2021 +0100
[hotfix] Add a wildcard type in heap state backend related classes
---
.../org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java | 4 ++--
.../flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java | 5 +++--
.../org/apache/flink/runtime/state/heap/HeapRestoreOperation.java | 4 ++--
.../org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java | 4 ++--
4 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 84fb4b2..c81bb68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -97,7 +97,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
/** Map of registered priority queue set states. */
- private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+ private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
/** The configuration for local recovery. */
private final LocalRecoveryConfig localRecoveryConfig;
@@ -122,7 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
CloseableRegistry cancelStreamRegistry,
StreamCompressionDecorator keyGroupCompressionDecorator,
Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
LocalRecoveryConfig localRecoveryConfig,
HeapPriorityQueueSetFactory priorityQueueSetFactory,
SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 9be1ded..12bbc05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -89,7 +89,8 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
// Map of registered Key/Value states
Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
// Map of registered priority queue set states
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
+ new HashMap<>();
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
HeapSnapshotStrategy<K> snapshotStrategy =
initSnapshotStrategy(registeredKVStates, registeredPQStates);
@@ -145,7 +146,7 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
private HeapSnapshotStrategy<K> initSnapshotStrategy(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates) {
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
return new HeapSnapshotStrategy<>(
registeredKVStates,
registeredPQStates,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 666254a..86a7491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -73,7 +73,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
private final StateSerializerProvider<K> keySerializerProvider;
private final ClassLoader userCodeClassLoader;
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
- private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+ private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
private final CloseableRegistry cancelStreamRegistry;
private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
@Nonnull private final KeyGroupRange keyGroupRange;
@@ -86,7 +86,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
StateSerializerProvider<K> keySerializerProvider,
ClassLoader userCodeClassLoader,
Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
CloseableRegistry cancelStreamRegistry,
HeapPriorityQueueSetFactory priorityQueueSetFactory,
@Nonnull KeyGroupRange keyGroupRange,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 71687c7..4010422 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -62,7 +62,7 @@ class HeapSnapshotStrategy<K>
implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotStrategy.HeapSnapshotResources> {
private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
- private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+ private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
private final StreamCompressionDecorator keyGroupCompressionDecorator;
private final LocalRecoveryConfig localRecoveryConfig;
private final KeyGroupRange keyGroupRange;
@@ -70,7 +70,7 @@ class HeapSnapshotStrategy<K>
HeapSnapshotStrategy(
Map<String, StateTable<K, ?, ?>> registeredKVStates,
- Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+ Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
StreamCompressionDecorator keyGroupCompressionDecorator,
LocalRecoveryConfig localRecoveryConfig,
KeyGroupRange keyGroupRange,