You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ha...@apache.org on 2024/01/15 02:14:58 UTC

(flink) 11/32: [FLINK-30613][serializer] Migrate DecimalDataSerializer to implement new method of resolving schema compatibility

This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9e6a8b47cfcf583928b7811e47912af0e3299cd8
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jan 24 10:17:55 2023 +0800

    [FLINK-30613][serializer] Migrate DecimalDataSerializer to implement new method of resolving schema compatibility
---
 .../serializers/python/DecimalDataSerializer.java  | 29 +++++++++++-----------
 .../runtime/typeutils/DecimalDataSerializer.java   | 29 +++++++++++-----------
 2 files changed, 30 insertions(+), 28 deletions(-)

diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
index 46c15023d95..e1043e97741 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
@@ -135,8 +135,8 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
 
         private static final int CURRENT_VERSION = 1;
 
-        private int previousPrecision;
-        private int previousScale;
+        private int precision;
+        private int scale;
 
         @SuppressWarnings("unused")
         public DecimalSerializerSnapshot() {
@@ -144,8 +144,8 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
         }
 
         DecimalSerializerSnapshot(int precision, int scale) {
-            this.previousPrecision = precision;
-            this.previousScale = scale;
+            this.precision = precision;
+            this.scale = scale;
         }
 
         @Override
@@ -155,32 +155,33 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
 
         @Override
         public void writeSnapshot(DataOutputView out) throws IOException {
-            out.writeInt(previousPrecision);
-            out.writeInt(previousScale);
+            out.writeInt(precision);
+            out.writeInt(scale);
         }
 
         @Override
         public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
                 throws IOException {
-            this.previousPrecision = in.readInt();
-            this.previousScale = in.readInt();
+            this.precision = in.readInt();
+            this.scale = in.readInt();
         }
 
         @Override
         public TypeSerializer<DecimalData> restoreSerializer() {
-            return new DecimalDataSerializer(previousPrecision, previousScale);
+            return new DecimalDataSerializer(precision, scale);
         }
 
         @Override
         public TypeSerializerSchemaCompatibility<DecimalData> resolveSchemaCompatibility(
-                TypeSerializer<DecimalData> newSerializer) {
-            if (!(newSerializer instanceof DecimalDataSerializer)) {
+                TypeSerializerSnapshot<DecimalData> oldSerializerSnapshot) {
+            if (!(oldSerializerSnapshot instanceof DecimalSerializerSnapshot)) {
                 return TypeSerializerSchemaCompatibility.incompatible();
             }
 
-            DecimalDataSerializer newDecimalDataSerializer = (DecimalDataSerializer) newSerializer;
-            if (previousPrecision != newDecimalDataSerializer.precision
-                    || previousScale != newDecimalDataSerializer.scale) {
+            DecimalSerializerSnapshot oldDecimalSerializerSnapshot =
+                    (DecimalSerializerSnapshot) oldSerializerSnapshot;
+            if (precision != oldDecimalSerializerSnapshot.precision
+                    || scale != oldDecimalSerializerSnapshot.scale) {
                 return TypeSerializerSchemaCompatibility.incompatible();
             } else {
                 return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
index 21e6f29778c..05f2757008b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
@@ -145,8 +145,8 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
 
         private static final int CURRENT_VERSION = 3;
 
-        private int previousPrecision;
-        private int previousScale;
+        private int precision;
+        private int scale;
 
         @SuppressWarnings("unused")
         public DecimalSerializerSnapshot() {
@@ -154,8 +154,8 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
         }
 
         DecimalSerializerSnapshot(int precision, int scale) {
-            this.previousPrecision = precision;
-            this.previousScale = scale;
+            this.precision = precision;
+            this.scale = scale;
         }
 
         @Override
@@ -165,32 +165,33 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
 
         @Override
         public void writeSnapshot(DataOutputView out) throws IOException {
-            out.writeInt(previousPrecision);
-            out.writeInt(previousScale);
+            out.writeInt(precision);
+            out.writeInt(scale);
         }
 
         @Override
         public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
                 throws IOException {
-            this.previousPrecision = in.readInt();
-            this.previousScale = in.readInt();
+            this.precision = in.readInt();
+            this.scale = in.readInt();
         }
 
         @Override
         public TypeSerializer<DecimalData> restoreSerializer() {
-            return new DecimalDataSerializer(previousPrecision, previousScale);
+            return new DecimalDataSerializer(precision, scale);
         }
 
         @Override
         public TypeSerializerSchemaCompatibility<DecimalData> resolveSchemaCompatibility(
-                TypeSerializer<DecimalData> newSerializer) {
-            if (!(newSerializer instanceof DecimalDataSerializer)) {
+                TypeSerializerSnapshot<DecimalData> oldSerializerSnapshot) {
+            if (!(oldSerializerSnapshot instanceof DecimalSerializerSnapshot)) {
                 return TypeSerializerSchemaCompatibility.incompatible();
             }
 
-            DecimalDataSerializer newDecimalDataSerializer = (DecimalDataSerializer) newSerializer;
-            if (previousPrecision != newDecimalDataSerializer.precision
-                    || previousScale != newDecimalDataSerializer.scale) {
+            DecimalSerializerSnapshot oldDecimalSerializerSnapshot =
+                    (DecimalSerializerSnapshot) oldSerializerSnapshot;
+            if (precision != oldDecimalSerializerSnapshot.precision
+                    || scale != oldDecimalSerializerSnapshot.scale) {
                 return TypeSerializerSchemaCompatibility.incompatible();
             } else {
                 return TypeSerializerSchemaCompatibility.compatibleAsIs();