You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/28 11:52:06 UTC
[flink] 40/42: [FLINK-13632] Port EnumValueSerializer upgrade test to TypeSerializerUpgradeTestBase
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit da1a402e3d8b7c696e034e466f70a20d5335cc13
Author: klion26 <qc...@gmail.com>
AuthorDate: Wed Apr 15 19:49:58 2020 +0800
[FLINK-13632] Port EnumValueSerializer upgrade test to TypeSerializerUpgradeTestBase
---
.../resources/flink-1.6-scala-enum-serializer-data | Bin 40 -> 0 bytes
.../flink-1.6-scala-enum-serializer-snapshot | Bin 1865 -> 0 bytes
.../resources/flink-1.7-scala-enum-serializer-data | Bin 40 -> 0 bytes
.../flink-1.7-scala-enum-serializer-snapshot | Bin 1853 -> 0 bytes
... => EnumValueSerializerCompatibilityTest.scala} | 6 +-
.../EnumValueSerializerSnapshotMigrationTest.scala | 61 ------
.../typeutils/EnumValueSerializerUpgradeTest.scala | 231 +++++----------------
7 files changed, 57 insertions(+), 241 deletions(-)
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data
deleted file mode 100644
index 6459c29..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot
deleted file mode 100644
index 3cf1738..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data
deleted file mode 100644
index 6459c29..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot
deleted file mode 100644
index 99ea254..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
similarity index 97%
copy from flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
copy to flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
index 97c2b05..c3cdda4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
@@ -33,7 +33,7 @@ import scala.reflect.NameTransformer
import scala.tools.nsc.reporters.ConsoleReporter
import scala.tools.nsc.{GenericRunnerSettings, Global}
-class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
+class EnumValueSerializerCompatibilityTest extends TestLogger with JUnitSuiteLike {
private val _tempFolder = new TemporaryFolder()
@@ -123,7 +123,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
def checkCompatibility(enumSourceA: String, enumSourceB: String)
: TypeSerializerSchemaCompatibility[Enumeration#Value] = {
- import EnumValueSerializerUpgradeTest._
+ import EnumValueSerializerCompatibilityTest._
val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
@@ -156,7 +156,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
}
}
-object EnumValueSerializerUpgradeTest {
+object EnumValueSerializerCompatibilityTest {
def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = {
val file = writeSourceFile(root, filename, source)
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala
deleted file mode 100644
index daff774..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase}
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase.{TestSpecification, TestSpecifications}
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.testutils.migration.MigrationVersion
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-/**
- * Migration tests for the [[EnumValueSerializer]].
- */
-@RunWith(classOf[Parameterized])
-class EnumValueSerializerSnapshotMigrationTest(
- spec: TestSpecification[Letters.Value])
-extends TypeSerializerSnapshotMigrationTestBase[Letters.Value](spec) {}
-
-object EnumValueSerializerSnapshotMigrationTest {
-
- private val supplier =
- new util.function.Supplier[EnumValueSerializer[Letters.type]] {
- override def get(): EnumValueSerializer[Letters.type] =
- new EnumValueSerializer(Letters)
- }
-
- @Parameterized.Parameters(name = "Test Specification = {0}")
- def testSpecifications(): util.Collection[TestSpecification[_]] = {
- val spec =
- new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7)
-
- spec.add(
- "scala-enum-serializer",
- classOf[EnumValueSerializer[Letters.Value]],
- classOf[ScalaEnumSerializerSnapshot[Letters.Value]],
- supplier
- )
-
- spec.get()
- }
-}
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 97c2b05..9c96df6 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -18,191 +18,68 @@
package org.apache.flink.api.scala.typeutils
-import java.io._
-import java.net.{URL, URLClassLoader}
-
-import org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, TypeSerializerSnapshotSerializationUtil}
-import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
-import org.apache.flink.util.TestLogger
-import org.junit.rules.TemporaryFolder
-import org.junit.{Rule, Test}
-import org.junit.Assert._
-import org.scalatest.junit.JUnitSuiteLike
-
-import scala.reflect.NameTransformer
-import scala.tools.nsc.reporters.ConsoleReporter
-import scala.tools.nsc.{GenericRunnerSettings, Global}
-
-class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
-
- private val _tempFolder = new TemporaryFolder()
-
- @Rule
- def tempFolder = _tempFolder
-
- val enumName = "EnumValueSerializerUpgradeTestEnum"
-
- val enumA =
- s"""
- |object $enumName extends Enumeration {
- | val A, B, C = Value
- |}
- """.stripMargin
-
- val enumB =
- s"""
- |object $enumName extends Enumeration {
- | val A, B, C, D = Value
- |}
- """.stripMargin
-
- val enumC =
- s"""
- |object $enumName extends Enumeration {
- | val A, C = Value
- |}
- """.stripMargin
-
- val enumD =
- s"""
- |object $enumName extends Enumeration {
- | val A, C, B = Value
- |}
- """.stripMargin
-
- val enumE =
- s"""
- |object $enumName extends Enumeration {
- | val A = Value(42)
- | val B = Value(5)
- | val C = Value(1337)
- |}
- """.stripMargin
-
- /**
- * Check that identical enums don't require migration
- */
- @Test
- def checkIdenticalEnums(): Unit = {
- assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
- }
-
- /**
- * Check that appending fields to the enum does not require migration
- */
- @Test
- def checkAppendedField(): Unit = {
- assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
- }
-
- /**
- * Check that removing enum fields makes the snapshot incompatible.
- */
- @Test
- def checkRemovedField(): Unit = {
- assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
- }
-
- /**
- * Check that changing the enum field order makes the snapshot incompatible.
- */
- @Test
- def checkDifferentFieldOrder(): Unit = {
- assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
- }
-
- /**
- * Check that changing the enum ids causes a migration
- */
- @Test
- def checkDifferentIds(): Unit = {
- assertTrue(
- "Different ids should be incompatible.",
- checkCompatibility(enumA, enumE).isIncompatible)
- }
-
- def checkCompatibility(enumSourceA: String, enumSourceB: String)
- : TypeSerializerSchemaCompatibility[Enumeration#Value] = {
- import EnumValueSerializerUpgradeTest._
-
- val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
-
- val enum = instantiateEnum[Enumeration](classLoader, enumName)
-
- val enumValueSerializer = new EnumValueSerializer(enum)
- val snapshot = enumValueSerializer.snapshotConfiguration()
-
- val baos = new ByteArrayOutputStream()
- val output = new DataOutputViewStreamWrapper(baos)
- TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
- output, snapshot, enumValueSerializer)
-
- output.close()
- baos.close()
-
- val bais = new ByteArrayInputStream(baos.toByteArray)
- val input= new DataInputViewStreamWrapper(bais)
-
- val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB)
-
- val snapshot2 = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
- input,
- classLoader2,
- enumValueSerializer)
- val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
-
- val enumValueSerializer2 = new EnumValueSerializer(enum2)
- snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
- }
-}
+import java.util
+
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase.TestSpecification
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerMatchers, TypeSerializerSchemaCompatibility, TypeSerializerUpgradeTestBase}
+import org.apache.flink.testutils.migration.MigrationVersion
+import org.hamcrest.Matcher
+import org.hamcrest.Matchers.is
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+ * A [[TypeSerializerUpgradeTestBase]] for [[EnumValueSerializer]].
+ */
+@RunWith(classOf[Parameterized])
+class EnumValueSerializerUpgradeTest(
+ spec: TestSpecification[Letters.Value, Letters.Value])
+extends TypeSerializerUpgradeTestBase[Letters.Value, Letters.Value](spec) {}
object EnumValueSerializerUpgradeTest {
- def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = {
- val file = writeSourceFile(root, filename, source)
- compileScalaFile(file)
-
- new URLClassLoader(
- Array[URL](root.toURI.toURL),
- Thread.currentThread().getContextClassLoader)
- }
-
- def instantiateEnum[T <: Enumeration](classLoader: ClassLoader, enumName: String): T = {
- val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]]
- val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME)
-
- field.get(null).asInstanceOf[T]
+ private val supplier =
+ new util.function.Supplier[EnumValueSerializer[Letters.type]] {
+ override def get(): EnumValueSerializer[Letters.type] =
+ new EnumValueSerializer(Letters)
+ }
+
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ def testSpecifications(): util.Collection[TestSpecification[_, _]] = {
+ val testSpecifications =
+ new util.ArrayList[TypeSerializerUpgradeTestBase.TestSpecification[_, _]]
+
+ for (migrationVersion <- TypeSerializerUpgradeTestBase.MIGRATION_VERSIONS) {
+ testSpecifications.add(
+ new TypeSerializerUpgradeTestBase.TestSpecification[Letters.Value, Letters.Value](
+ "scala-enum-serializer",
+ migrationVersion,
+ classOf[EnumValueSerializerSetup],
+ classOf[EnumValueSerializerVerifier]))
+ }
+
+ testSpecifications
}
- def writeSourceFile(root: File, filename: String, source: String): File = {
- val file = new File(root, filename)
- val fileWriter = new FileWriter(file)
-
- fileWriter.write(source)
-
- fileWriter.close()
-
- file
+ /**
+ * This class is only public to work with
+ * [[org.apache.flink.api.common.typeutils.ClassRelocator]].
+ */
+ final class EnumValueSerializerSetup
+ extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Letters.Value] {
+ override def createPriorSerializer: TypeSerializer[Letters.Value] = supplier.get()
+
+ override def createTestData: Letters.Value = Letters.A
}
- def compileScalaFile(file: File): Unit = {
- val in = new BufferedReader(new StringReader(""))
- val out = new PrintWriter(new BufferedWriter(
- new OutputStreamWriter(System.out)))
-
- val settings = new GenericRunnerSettings(out.println _)
+ final class EnumValueSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[Letters.Value] {
+ override def createUpgradedSerializer: TypeSerializer[Letters.Value] = supplier.get()
- // use the java classpath so that scala libraries are available to the compiler
- settings.usejavacp.value = true
- settings.outdir.value = file.getParent
+ override def testDataMatcher: Matcher[Letters.Value] = is(Letters.A)
- val reporter = new ConsoleReporter(settings)
- val global = new Global(settings, reporter)
- val run = new global.Run
-
- run.compile(List(file.getAbsolutePath))
-
- reporter.printSummary()
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Letters.Value]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Letters.Value]()
}
}
-