You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/05 17:41:20 UTC

[flink] 04/09: [hotfix] Add a wildcard type in heap state backend related classes

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3b777ecf47ecd16ddffcc7f24eb6f43f4818487
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jan 13 17:07:41 2021 +0100

    [hotfix] Add a wildcard type in heap state backend related classes
---
 .../org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java   | 4 ++--
 .../flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java       | 5 +++--
 .../org/apache/flink/runtime/state/heap/HeapRestoreOperation.java    | 4 ++--
 .../org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java    | 4 ++--
 4 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 84fb4b2..c81bb68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -97,7 +97,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
 
     /** Map of registered priority queue set states. */
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
 
     /** The configuration for local recovery. */
     private final LocalRecoveryConfig localRecoveryConfig;
@@ -122,7 +122,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             CloseableRegistry cancelStreamRegistry,
             StreamCompressionDecorator keyGroupCompressionDecorator,
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             LocalRecoveryConfig localRecoveryConfig,
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
             SnapshotStrategyRunner<KeyedStateHandle, ?> snapshotStrategyRunner,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
index 9be1ded..12bbc05 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendBuilder.java
@@ -89,7 +89,8 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
         // Map of registered Key/Value states
         Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<>();
         // Map of registered priority queue set states
-        Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<>();
+        Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates =
+                new HashMap<>();
         CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
         HeapSnapshotStrategy<K> snapshotStrategy =
                 initSnapshotStrategy(registeredKVStates, registeredPQStates);
@@ -145,7 +146,7 @@ public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBu
 
     private HeapSnapshotStrategy<K> initSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates) {
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates) {
         return new HeapSnapshotStrategy<>(
                 registeredKVStates,
                 registeredPQStates,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
index 666254a..86a7491 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapRestoreOperation.java
@@ -73,7 +73,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
     private final StateSerializerProvider<K> keySerializerProvider;
     private final ClassLoader userCodeClassLoader;
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
     private final CloseableRegistry cancelStreamRegistry;
     private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
     @Nonnull private final KeyGroupRange keyGroupRange;
@@ -86,7 +86,7 @@ public class HeapRestoreOperation<K> implements RestoreOperation<Void> {
             StateSerializerProvider<K> keySerializerProvider,
             ClassLoader userCodeClassLoader,
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             CloseableRegistry cancelStreamRegistry,
             HeapPriorityQueueSetFactory priorityQueueSetFactory,
             @Nonnull KeyGroupRange keyGroupRange,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
index 71687c7..4010422 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSnapshotStrategy.java
@@ -62,7 +62,7 @@ class HeapSnapshotStrategy<K>
         implements SnapshotStrategy<KeyedStateHandle, HeapSnapshotStrategy.HeapSnapshotResources> {
 
     private final Map<String, StateTable<K, ?, ?>> registeredKVStates;
-    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates;
+    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates;
     private final StreamCompressionDecorator keyGroupCompressionDecorator;
     private final LocalRecoveryConfig localRecoveryConfig;
     private final KeyGroupRange keyGroupRange;
@@ -70,7 +70,7 @@ class HeapSnapshotStrategy<K>
 
     HeapSnapshotStrategy(
             Map<String, StateTable<K, ?, ?>> registeredKVStates,
-            Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates,
+            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
             StreamCompressionDecorator keyGroupCompressionDecorator,
             LocalRecoveryConfig localRecoveryConfig,
             KeyGroupRange keyGroupRange,