You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ha...@apache.org on 2024/01/15 02:15:08 UTC

(flink) 21/32: [FLINK-30613][serializer] Migrate SortedMapSerializerSnapshot to implement new method of resolving schema compatibility

This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac16648281c014c33b2d6fe50688944035ec2240
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jan 24 10:41:45 2023 +0800

    [FLINK-30613][serializer] Migrate SortedMapSerializerSnapshot to implement new method of resolving schema compatibility
---
 .../table/runtime/typeutils/SortedMapSerializerSnapshot.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SortedMapSerializerSnapshot.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SortedMapSerializerSnapshot.java
index fbcbd35a915..4ebf48e4ce7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SortedMapSerializerSnapshot.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/SortedMapSerializerSnapshot.java
@@ -100,12 +100,13 @@ public class SortedMapSerializerSnapshot<K, V> implements TypeSerializerSnapshot
 
     @Override
     public TypeSerializerSchemaCompatibility<SortedMap<K, V>> resolveSchemaCompatibility(
-            TypeSerializer<SortedMap<K, V>> newSerializer) {
-        if (!(newSerializer instanceof SortedMapSerializer)) {
+            TypeSerializerSnapshot<SortedMap<K, V>> oldSerializerSnapshot) {
+        if (!(oldSerializerSnapshot instanceof SortedMapSerializerSnapshot)) {
             return TypeSerializerSchemaCompatibility.incompatible();
         }
-        SortedMapSerializer newSortedMapSerializer = (SortedMapSerializer) newSerializer;
-        if (!comparator.equals(newSortedMapSerializer.getComparator())) {
+        SortedMapSerializerSnapshot<K, V> oldSortedMapSerializerSnapshot =
+                (SortedMapSerializerSnapshot<K, V>) oldSerializerSnapshot;
+        if (!comparator.equals(oldSortedMapSerializerSnapshot.comparator)) {
             return TypeSerializerSchemaCompatibility.incompatible();
         } else {
             return TypeSerializerSchemaCompatibility.compatibleAsIs();