You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ha...@apache.org on 2024/01/15 02:14:58 UTC
(flink) 11/32: [FLINK-30613][serializer] Migrate DecimalDataSerializer to implement new method of resolving schema compatibility
This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9e6a8b47cfcf583928b7811e47912af0e3299cd8
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jan 24 10:17:55 2023 +0800
[FLINK-30613][serializer] Migrate DecimalDataSerializer to implement new method of resolving schema compatibility
---
.../serializers/python/DecimalDataSerializer.java | 29 +++++++++++-----------
.../runtime/typeutils/DecimalDataSerializer.java | 29 +++++++++++-----------
2 files changed, 30 insertions(+), 28 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
index 46c15023d95..e1043e97741 100644
--- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
+++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/serializers/python/DecimalDataSerializer.java
@@ -135,8 +135,8 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
private static final int CURRENT_VERSION = 1;
- private int previousPrecision;
- private int previousScale;
+ private int precision;
+ private int scale;
@SuppressWarnings("unused")
public DecimalSerializerSnapshot() {
@@ -144,8 +144,8 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
}
DecimalSerializerSnapshot(int precision, int scale) {
- this.previousPrecision = precision;
- this.previousScale = scale;
+ this.precision = precision;
+ this.scale = scale;
}
@Override
@@ -155,32 +155,33 @@ public class DecimalDataSerializer extends TypeSerializer<DecimalData> {
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
- out.writeInt(previousPrecision);
- out.writeInt(previousScale);
+ out.writeInt(precision);
+ out.writeInt(scale);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
- this.previousPrecision = in.readInt();
- this.previousScale = in.readInt();
+ this.precision = in.readInt();
+ this.scale = in.readInt();
}
@Override
public TypeSerializer<DecimalData> restoreSerializer() {
- return new DecimalDataSerializer(previousPrecision, previousScale);
+ return new DecimalDataSerializer(precision, scale);
}
@Override
public TypeSerializerSchemaCompatibility<DecimalData> resolveSchemaCompatibility(
- TypeSerializer<DecimalData> newSerializer) {
- if (!(newSerializer instanceof DecimalDataSerializer)) {
+ TypeSerializerSnapshot<DecimalData> oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof DecimalSerializerSnapshot)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
- DecimalDataSerializer newDecimalDataSerializer = (DecimalDataSerializer) newSerializer;
- if (previousPrecision != newDecimalDataSerializer.precision
- || previousScale != newDecimalDataSerializer.scale) {
+ DecimalSerializerSnapshot oldDecimalSerializerSnapshot =
+ (DecimalSerializerSnapshot) oldSerializerSnapshot;
+ if (precision != oldDecimalSerializerSnapshot.precision
+ || scale != oldDecimalSerializerSnapshot.scale) {
return TypeSerializerSchemaCompatibility.incompatible();
} else {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
index 21e6f29778c..05f2757008b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/typeutils/DecimalDataSerializer.java
@@ -145,8 +145,8 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
private static final int CURRENT_VERSION = 3;
- private int previousPrecision;
- private int previousScale;
+ private int precision;
+ private int scale;
@SuppressWarnings("unused")
public DecimalSerializerSnapshot() {
@@ -154,8 +154,8 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
}
DecimalSerializerSnapshot(int precision, int scale) {
- this.previousPrecision = precision;
- this.previousScale = scale;
+ this.precision = precision;
+ this.scale = scale;
}
@Override
@@ -165,32 +165,33 @@ public final class DecimalDataSerializer extends TypeSerializer<DecimalData> {
@Override
public void writeSnapshot(DataOutputView out) throws IOException {
- out.writeInt(previousPrecision);
- out.writeInt(previousScale);
+ out.writeInt(precision);
+ out.writeInt(scale);
}
@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
- this.previousPrecision = in.readInt();
- this.previousScale = in.readInt();
+ this.precision = in.readInt();
+ this.scale = in.readInt();
}
@Override
public TypeSerializer<DecimalData> restoreSerializer() {
- return new DecimalDataSerializer(previousPrecision, previousScale);
+ return new DecimalDataSerializer(precision, scale);
}
@Override
public TypeSerializerSchemaCompatibility<DecimalData> resolveSchemaCompatibility(
- TypeSerializer<DecimalData> newSerializer) {
- if (!(newSerializer instanceof DecimalDataSerializer)) {
+ TypeSerializerSnapshot<DecimalData> oldSerializerSnapshot) {
+ if (!(oldSerializerSnapshot instanceof DecimalSerializerSnapshot)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
- DecimalDataSerializer newDecimalDataSerializer = (DecimalDataSerializer) newSerializer;
- if (previousPrecision != newDecimalDataSerializer.precision
- || previousScale != newDecimalDataSerializer.scale) {
+ DecimalSerializerSnapshot oldDecimalSerializerSnapshot =
+ (DecimalSerializerSnapshot) oldSerializerSnapshot;
+ if (precision != oldDecimalSerializerSnapshot.precision
+ || scale != oldDecimalSerializerSnapshot.scale) {
return TypeSerializerSchemaCompatibility.incompatible();
} else {
return TypeSerializerSchemaCompatibility.compatibleAsIs();