You are viewing a plain-text version of this content. (The canonical link was a hyperlink in the original page and is not preserved in this text version.)
Posted to commits@flink.apache.org by dw...@apache.org on 2021/02/24 16:20:57 UTC

[flink] 02/09: [hotfix] Cleanup raw types around PriorityQueueSetFactory

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7f3aa390892bed6e00ab254e311f6a46c623a1d5
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Feb 2 17:37:54 2021 +0100

    [hotfix] Cleanup raw types around PriorityQueueSetFactory
---
 .../org/apache/flink/runtime/state/PriorityQueueSetFactory.java   | 2 +-
 .../apache/flink/runtime/state/heap/HeapKeyedStateBackend.java    | 8 ++++----
 .../flink/runtime/state/heap/HeapPriorityQueueSetFactory.java     | 2 +-
 .../flink/runtime/state/ttl/mock/MockKeyedStateBackend.java       | 2 +-
 .../flink/contrib/streaming/state/RocksDBKeyedStateBackend.java   | 2 +-
 .../contrib/streaming/state/RocksDBPriorityQueueSetFactory.java   | 2 +-
 .../operators/sorted/state/BatchExecutionKeyedStateBackend.java   | 4 ++--
 7 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
index baeb591..96ce98b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
@@ -36,7 +36,7 @@ public interface PriorityQueueSetFactory {
      * @return the queue with the specified unique name.
      */
     @Nonnull
-    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 0b42a32..8e6c356 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -157,13 +157,13 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     @SuppressWarnings("unchecked")
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
 
-        final HeapPriorityQueueSnapshotRestoreWrapper existingState =
-                registeredPQStates.get(stateName);
+        final HeapPriorityQueueSnapshotRestoreWrapper<T> existingState =
+                (HeapPriorityQueueSnapshotRestoreWrapper<T>) registeredPQStates.get(stateName);
 
         if (existingState != null) {
             // TODO we implement the simple way of supporting the current functionality, mimicking
@@ -197,7 +197,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
     }
 
     @Nonnull
-    private <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    private <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> createInternal(
                     RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
index 8074c1a..6646d5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
@@ -50,7 +50,7 @@ public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             HeapPriorityQueueSet<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index d3d3757..c946365 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -278,7 +278,7 @@ public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index d6c7cff..8aab8b8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -455,7 +455,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 717c2b8..fb063a7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -98,7 +98,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
 
     @Nonnull
     @Override
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
index 0bf5a5a..128abb6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.java
@@ -244,8 +244,8 @@ class BatchExecutionKeyedStateBackend<K> implements CheckpointableKeyedStateBack
 
     @Nonnull
     @Override
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed>
+    @SuppressWarnings({"unchecked"})
+    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
             KeyGroupedInternalPriorityQueue<T> create(
                     @Nonnull String stateName,
                     @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {