You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ha...@apache.org on 2024/01/15 02:15:04 UTC

(flink) 17/32: [FLINK-30613][serializer] Migrate ScalaEnumSerializerSnapshot to implement new method of resolving schema compatibility

This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2946bbeb2799ba0ed4a4417738d3000ab4f0b6ea
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jan 24 10:39:00 2023 +0800

    [FLINK-30613][serializer] Migrate ScalaEnumSerializerSnapshot to implement new method of resolving schema compatibility
---
 .../typeutils/ScalaEnumSerializerSnapshot.scala    | 33 +++++++++++-----------
 .../EnumValueSerializerCompatibilityTest.scala     |  2 +-
 .../scala/typeutils/EnumValueSerializerTest.scala  |  6 +++-
 3 files changed, 23 insertions(+), 18 deletions(-)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
index f3742a0c841..2cac0129598 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
@@ -26,30 +26,30 @@ import scala.collection.mutable.ListBuffer
 class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapshot[E#Value] {
 
   var enumClass: Class[E] = _
-  var previousEnumConstants: List[(String, Int)] = _
+  var enumConstants: List[(String, Int)] = _
 
   def this(enum: E) = {
     this()
     this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
-    this.previousEnumConstants = enum.values.toList.map(x => (x.toString, x.id))
+    this.enumConstants = enum.values.toList.map(x => (x.toString, x.id))
   }
 
   def this(enumClass: Class[E], previousEnumConstants: List[(String, Int)]) = {
     this()
     this.enumClass = Preconditions.checkNotNull(enumClass)
-    this.previousEnumConstants = Preconditions.checkNotNull(previousEnumConstants)
+    this.enumConstants = Preconditions.checkNotNull(previousEnumConstants)
   }
 
   override def getCurrentVersion: Int = ScalaEnumSerializerSnapshot.VERSION
 
   override def writeSnapshot(out: DataOutputView): Unit = {
     Preconditions.checkState(enumClass != null)
-    Preconditions.checkState(previousEnumConstants != null)
+    Preconditions.checkState(enumConstants != null)
 
     out.writeUTF(enumClass.getName)
 
-    out.writeInt(previousEnumConstants.length)
-    for ((name, idx) <- previousEnumConstants) {
+    out.writeInt(enumConstants.length)
+    for ((name, idx) <- enumConstants) {
       out.writeUTF(name)
       out.writeInt(idx)
     }
@@ -71,7 +71,7 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh
       listBuffer += ((name, idx))
     }
 
-    previousEnumConstants = listBuffer.toList
+    enumConstants = listBuffer.toList
   }
 
   override def restoreSerializer(): TypeSerializer[E#Value] = {
@@ -80,25 +80,26 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh
     new EnumValueSerializer(enumObject)
   }
 
-  override def resolveSchemaCompatibility(
-      newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = {
+  override def resolveSchemaCompatibility(oldSerializerSnapshot: TypeSerializerSnapshot[E#Value])
+      : TypeSerializerSchemaCompatibility[E#Value] = {
 
     Preconditions.checkState(enumClass != null)
-    Preconditions.checkState(previousEnumConstants != null)
+    Preconditions.checkState(enumConstants != null)
 
-    if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) {
+    if (!oldSerializerSnapshot.isInstanceOf[ScalaEnumSerializerSnapshot[E]]) {
       return TypeSerializerSchemaCompatibility.incompatible()
     }
 
-    val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]]
-    if (!enumClass.equals(newEnumSerializer.enum.getClass)) {
+    val oldEnumSerializerSnapshot =
+      oldSerializerSnapshot.asInstanceOf[ScalaEnumSerializerSnapshot[E]]
+    if (!enumClass.equals(oldEnumSerializerSnapshot.enumClass)) {
       return TypeSerializerSchemaCompatibility.incompatible()
     }
 
-    for ((previousEnumName, index) <- previousEnumConstants) {
+    for ((oldEnumName, index) <- oldEnumSerializerSnapshot.enumConstants) {
       try {
-        val newEnumName = newEnumSerializer.enum(index).toString
-        if (previousEnumName != newEnumName) {
+        val enumName = enumConstants(index)._1
+        if (enumName != oldEnumName) {
           return TypeSerializerSchemaCompatibility.incompatible()
         }
       } catch {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
index 228a816368d..c5e4b329735 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
@@ -141,7 +141,7 @@ class EnumValueSerializerCompatibilityTest {
     val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
 
     val enumValueSerializer2 = new EnumValueSerializer(enum2)
-    snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
+    enumValueSerializer2.snapshotConfiguration().resolveSchemaCompatibility(snapshot2)
   }
 }
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
index 9a9faf9c369..a564eac50c9 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
@@ -32,7 +32,11 @@ class EnumValueSerializerTest {
 
     val snapshot = enumSerializer.snapshotConfiguration()
 
-    assertThat(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs).isTrue
+    assertThat(
+      enumSerializer
+        .snapshotConfiguration()
+        .resolveSchemaCompatibility(snapshot)
+        .isCompatibleAsIs).isTrue
   }
 }