You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ha...@apache.org on 2024/01/15 02:15:04 UTC
(flink) 17/32: [FLINK-30613][serializer] Migrate ScalaEnumSerializerSnapshot to implement new method of resolving schema compatibility
This is an automated email from the ASF dual-hosted git repository.
hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2946bbeb2799ba0ed4a4417738d3000ab4f0b6ea
Author: Hangxiang Yu <ma...@gmail.com>
AuthorDate: Tue Jan 24 10:39:00 2023 +0800
[FLINK-30613][serializer] Migrate ScalaEnumSerializerSnapshot to implement new method of resolving schema compatibility
---
.../typeutils/ScalaEnumSerializerSnapshot.scala | 33 +++++++++++-----------
.../EnumValueSerializerCompatibilityTest.scala | 2 +-
.../scala/typeutils/EnumValueSerializerTest.scala | 6 +++-
3 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
index f3742a0c841..2cac0129598 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaEnumSerializerSnapshot.scala
@@ -26,30 +26,30 @@ import scala.collection.mutable.ListBuffer
class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapshot[E#Value] {
var enumClass: Class[E] = _
- var previousEnumConstants: List[(String, Int)] = _
+ var enumConstants: List[(String, Int)] = _
def this(enum: E) = {
this()
this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
- this.previousEnumConstants = enum.values.toList.map(x => (x.toString, x.id))
+ this.enumConstants = enum.values.toList.map(x => (x.toString, x.id))
}
def this(enumClass: Class[E], previousEnumConstants: List[(String, Int)]) = {
this()
this.enumClass = Preconditions.checkNotNull(enumClass)
- this.previousEnumConstants = Preconditions.checkNotNull(previousEnumConstants)
+ this.enumConstants = Preconditions.checkNotNull(previousEnumConstants)
}
override def getCurrentVersion: Int = ScalaEnumSerializerSnapshot.VERSION
override def writeSnapshot(out: DataOutputView): Unit = {
Preconditions.checkState(enumClass != null)
- Preconditions.checkState(previousEnumConstants != null)
+ Preconditions.checkState(enumConstants != null)
out.writeUTF(enumClass.getName)
- out.writeInt(previousEnumConstants.length)
- for ((name, idx) <- previousEnumConstants) {
+ out.writeInt(enumConstants.length)
+ for ((name, idx) <- enumConstants) {
out.writeUTF(name)
out.writeInt(idx)
}
@@ -71,7 +71,7 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh
listBuffer += ((name, idx))
}
- previousEnumConstants = listBuffer.toList
+ enumConstants = listBuffer.toList
}
override def restoreSerializer(): TypeSerializer[E#Value] = {
@@ -80,25 +80,26 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration] extends TypeSerializerSnapsh
new EnumValueSerializer(enumObject)
}
- override def resolveSchemaCompatibility(
- newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = {
+ override def resolveSchemaCompatibility(oldSerializerSnapshot: TypeSerializerSnapshot[E#Value])
+ : TypeSerializerSchemaCompatibility[E#Value] = {
Preconditions.checkState(enumClass != null)
- Preconditions.checkState(previousEnumConstants != null)
+ Preconditions.checkState(enumConstants != null)
- if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) {
+ if (!oldSerializerSnapshot.isInstanceOf[ScalaEnumSerializerSnapshot[E]]) {
return TypeSerializerSchemaCompatibility.incompatible()
}
- val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]]
- if (!enumClass.equals(newEnumSerializer.enum.getClass)) {
+ val oldEnumSerializerSnapshot =
+ oldSerializerSnapshot.asInstanceOf[ScalaEnumSerializerSnapshot[E]]
+ if (!enumClass.equals(oldEnumSerializerSnapshot.enumClass)) {
return TypeSerializerSchemaCompatibility.incompatible()
}
- for ((previousEnumName, index) <- previousEnumConstants) {
+ for ((oldEnumName, index) <- oldEnumSerializerSnapshot.enumConstants) {
try {
- val newEnumName = newEnumSerializer.enum(index).toString
- if (previousEnumName != newEnumName) {
+ val enumName = enumConstants(index)._1
+ if (enumName != oldEnumName) {
return TypeSerializerSchemaCompatibility.incompatible()
}
} catch {
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
index 228a816368d..c5e4b329735 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
@@ -141,7 +141,7 @@ class EnumValueSerializerCompatibilityTest {
val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
val enumValueSerializer2 = new EnumValueSerializer(enum2)
- snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
+ enumValueSerializer2.snapshotConfiguration().resolveSchemaCompatibility(snapshot2)
}
}
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
index 9a9faf9c369..a564eac50c9 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerTest.scala
@@ -32,7 +32,11 @@ class EnumValueSerializerTest {
val snapshot = enumSerializer.snapshotConfiguration()
- assertThat(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs).isTrue
+ assertThat(
+ enumSerializer
+ .snapshotConfiguration()
+ .resolveSchemaCompatibility(snapshot)
+ .isCompatibleAsIs).isTrue
}
}