You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/28 11:52:06 UTC

[flink] 40/42: [FLINK-13632] Port EnumValueSerializer upgrade test to TypeSerializerUpgradeTestBase

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit da1a402e3d8b7c696e034e466f70a20d5335cc13
Author: klion26 <qc...@gmail.com>
AuthorDate: Wed Apr 15 19:49:58 2020 +0800

    [FLINK-13632] Port EnumValueSerializer upgrade test to TypeSerializerUpgradeTestBase
---
 .../resources/flink-1.6-scala-enum-serializer-data | Bin 40 -> 0 bytes
 .../flink-1.6-scala-enum-serializer-snapshot       | Bin 1865 -> 0 bytes
 .../resources/flink-1.7-scala-enum-serializer-data | Bin 40 -> 0 bytes
 .../flink-1.7-scala-enum-serializer-snapshot       | Bin 1853 -> 0 bytes
 ... => EnumValueSerializerCompatibilityTest.scala} |   6 +-
 .../EnumValueSerializerSnapshotMigrationTest.scala |  61 ------
 .../typeutils/EnumValueSerializerUpgradeTest.scala | 231 +++++----------------
 7 files changed, 57 insertions(+), 241 deletions(-)

diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data
deleted file mode 100644
index 6459c29..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot
deleted file mode 100644
index 3cf1738..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-scala-enum-serializer-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data
deleted file mode 100644
index 6459c29..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot b/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot
deleted file mode 100644
index 99ea254..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-scala-enum-serializer-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
similarity index 97%
copy from flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
copy to flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
index 97c2b05..c3cdda4 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerCompatibilityTest.scala
@@ -33,7 +33,7 @@ import scala.reflect.NameTransformer
 import scala.tools.nsc.reporters.ConsoleReporter
 import scala.tools.nsc.{GenericRunnerSettings, Global}
 
-class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
+class EnumValueSerializerCompatibilityTest extends TestLogger with JUnitSuiteLike {
 
   private val _tempFolder = new TemporaryFolder()
 
@@ -123,7 +123,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
 
   def checkCompatibility(enumSourceA: String, enumSourceB: String)
     : TypeSerializerSchemaCompatibility[Enumeration#Value] = {
-    import EnumValueSerializerUpgradeTest._
+    import EnumValueSerializerCompatibilityTest._
 
     val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
 
@@ -156,7 +156,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
   }
 }
 
-object EnumValueSerializerUpgradeTest {
+object EnumValueSerializerCompatibilityTest {
   def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = {
     val file = writeSourceFile(root, filename, source)
 
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala
deleted file mode 100644
index daff774..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerSnapshotMigrationTest.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase}
-import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase.{TestSpecification, TestSpecifications}
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.testutils.migration.MigrationVersion
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-/**
-  * Migration tests for the [[EnumValueSerializer]].
- */
-@RunWith(classOf[Parameterized])
-class EnumValueSerializerSnapshotMigrationTest(
-  spec: TestSpecification[Letters.Value])
-extends TypeSerializerSnapshotMigrationTestBase[Letters.Value](spec) {}
-
-object EnumValueSerializerSnapshotMigrationTest {
-
-  private val supplier =
-    new util.function.Supplier[EnumValueSerializer[Letters.type]] {
-      override def get(): EnumValueSerializer[Letters.type] =
-        new EnumValueSerializer(Letters)
-    }
-
-  @Parameterized.Parameters(name = "Test Specification = {0}")
-  def testSpecifications(): util.Collection[TestSpecification[_]] = {
-    val spec =
-      new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7)
-
-    spec.add(
-      "scala-enum-serializer",
-      classOf[EnumValueSerializer[Letters.Value]],
-      classOf[ScalaEnumSerializerSnapshot[Letters.Value]],
-      supplier
-    )
-
-    spec.get()
-  }
-}
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
index 97c2b05..9c96df6 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializerUpgradeTest.scala
@@ -18,191 +18,68 @@
 
 package org.apache.flink.api.scala.typeutils
 
-import java.io._
-import java.net.{URL, URLClassLoader}
-
-import org.apache.flink.api.common.typeutils.{TypeSerializerSchemaCompatibility, TypeSerializerSnapshotSerializationUtil}
-import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
-import org.apache.flink.util.TestLogger
-import org.junit.rules.TemporaryFolder
-import org.junit.{Rule, Test}
-import org.junit.Assert._
-import org.scalatest.junit.JUnitSuiteLike
-
-import scala.reflect.NameTransformer
-import scala.tools.nsc.reporters.ConsoleReporter
-import scala.tools.nsc.{GenericRunnerSettings, Global}
-
-class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
-
-  private val _tempFolder = new TemporaryFolder()
-
-  @Rule
-  def tempFolder = _tempFolder
-
-  val enumName = "EnumValueSerializerUpgradeTestEnum"
-
-  val enumA =
-    s"""
-      |object $enumName extends Enumeration {
-      |  val A, B, C = Value
-      |}
-    """.stripMargin
-
-  val enumB =
-    s"""
-       |object $enumName extends Enumeration {
-       |  val A, B, C, D = Value
-       |}
-    """.stripMargin
-
-  val enumC =
-    s"""
-       |object $enumName extends Enumeration {
-       |  val A, C = Value
-       |}
-    """.stripMargin
-
-  val enumD =
-    s"""
-       |object $enumName extends Enumeration {
-       |  val A, C, B = Value
-       |}
-    """.stripMargin
-
-  val enumE =
-    s"""
-       |object $enumName extends Enumeration {
-       |  val A = Value(42)
-       |  val B = Value(5)
-       |  val C = Value(1337)
-       |}
-    """.stripMargin
-
-  /**
-    * Check that identical enums don't require migration
-    */
-  @Test
-  def checkIdenticalEnums(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumA).isCompatibleAsIs)
-  }
-
-  /**
-    * Check that appending fields to the enum does not require migration
-    */
-  @Test
-  def checkAppendedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumB).isCompatibleAsIs)
-  }
-
-  /**
-    * Check that removing enum fields makes the snapshot incompatible.
-    */
-  @Test
-  def checkRemovedField(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
-  }
-
-  /**
-    * Check that changing the enum field order makes the snapshot incompatible.
-    */
-  @Test
-  def checkDifferentFieldOrder(): Unit = {
-    assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
-  }
-
-  /**
-    * Check that changing the enum ids causes a migration
-    */
-  @Test
-  def checkDifferentIds(): Unit = {
-    assertTrue(
-      "Different ids should be incompatible.",
-      checkCompatibility(enumA, enumE).isIncompatible)
-  }
-
-  def checkCompatibility(enumSourceA: String, enumSourceB: String)
-    : TypeSerializerSchemaCompatibility[Enumeration#Value] = {
-    import EnumValueSerializerUpgradeTest._
-
-    val classLoader = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceA)
-
-    val enum = instantiateEnum[Enumeration](classLoader, enumName)
-
-    val enumValueSerializer = new EnumValueSerializer(enum)
-    val snapshot = enumValueSerializer.snapshotConfiguration()
-
-    val baos = new ByteArrayOutputStream()
-    val output = new DataOutputViewStreamWrapper(baos)
-    TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
-      output, snapshot, enumValueSerializer)
-
-    output.close()
-    baos.close()
-
-    val bais = new ByteArrayInputStream(baos.toByteArray)
-    val input=  new DataInputViewStreamWrapper(bais)
-
-    val classLoader2 = compileAndLoadEnum(tempFolder.newFolder(), s"$enumName.scala", enumSourceB)
-
-    val snapshot2 = TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
-      input,
-      classLoader2,
-      enumValueSerializer)
-    val enum2 = instantiateEnum[Enumeration](classLoader2, enumName)
-
-    val enumValueSerializer2 = new EnumValueSerializer(enum2)
-    snapshot2.resolveSchemaCompatibility(enumValueSerializer2)
-  }
-}
+import java.util
+
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase.TestSpecification
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerMatchers, TypeSerializerSchemaCompatibility, TypeSerializerUpgradeTestBase}
+import org.apache.flink.testutils.migration.MigrationVersion
+import org.hamcrest.Matcher
+import org.hamcrest.Matchers.is
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+/**
+  * A [[TypeSerializerUpgradeTestBase]] for [[EnumValueSerializer]].
+ */
+@RunWith(classOf[Parameterized])
+class EnumValueSerializerUpgradeTest(
+  spec: TestSpecification[Letters.Value, Letters.Value])
+extends TypeSerializerUpgradeTestBase[Letters.Value, Letters.Value](spec) {}
 
 object EnumValueSerializerUpgradeTest {
-  def compileAndLoadEnum(root: File, filename: String, source: String): ClassLoader = {
-    val file = writeSourceFile(root, filename, source)
 
-    compileScalaFile(file)
-
-    new URLClassLoader(
-      Array[URL](root.toURI.toURL),
-      Thread.currentThread().getContextClassLoader)
-  }
-
-  def instantiateEnum[T <: Enumeration](classLoader: ClassLoader, enumName: String): T = {
-    val clazz = classLoader.loadClass(enumName + "$").asInstanceOf[Class[_ <: Enumeration]]
-    val field = clazz.getField(NameTransformer.MODULE_INSTANCE_NAME)
-
-    field.get(null).asInstanceOf[T]
+  private val supplier =
+    new util.function.Supplier[EnumValueSerializer[Letters.type]] {
+      override def get(): EnumValueSerializer[Letters.type] =
+        new EnumValueSerializer(Letters)
+    }
+
+  @Parameterized.Parameters(name = "Test Specification = {0}")
+  def testSpecifications(): util.Collection[TestSpecification[_, _]] = {
+    val testSpecifications =
+      new util.ArrayList[TypeSerializerUpgradeTestBase.TestSpecification[_, _]]
+
+    for (migrationVersion <- TypeSerializerUpgradeTestBase.MIGRATION_VERSIONS) {
+      testSpecifications.add(
+        new TypeSerializerUpgradeTestBase.TestSpecification[Letters.Value, Letters.Value](
+        "scala-enum-serializer",
+          migrationVersion,
+          classOf[EnumValueSerializerSetup],
+          classOf[EnumValueSerializerVerifier]))
+    }
+
+    testSpecifications
   }
 
-  def writeSourceFile(root: File, filename: String, source: String): File = {
-    val file = new File(root, filename)
-    val fileWriter = new FileWriter(file)
-
-    fileWriter.write(source)
-
-    fileWriter.close()
-
-    file
+  /**
+   * This class is only public to work with
+   * [[org.apache.flink.api.common.typeutils.ClassRelocator]].
+   */
+  final class EnumValueSerializerSetup
+      extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Letters.Value] {
+    override def createPriorSerializer: TypeSerializer[Letters.Value] = supplier.get()
+
+    override def createTestData: Letters.Value = Letters.A
   }
 
-  def compileScalaFile(file: File): Unit = {
-    val in = new BufferedReader(new StringReader(""))
-    val out = new PrintWriter(new BufferedWriter(
-      new OutputStreamWriter(System.out)))
-
-    val settings = new GenericRunnerSettings(out.println _)
+  final class EnumValueSerializerVerifier extends
+      TypeSerializerUpgradeTestBase.UpgradeVerifier[Letters.Value] {
+    override def createUpgradedSerializer: TypeSerializer[Letters.Value] = supplier.get()
 
-    // use the java classpath so that scala libraries are available to the compiler
-    settings.usejavacp.value = true
-    settings.outdir.value = file.getParent
+    override def testDataMatcher: Matcher[Letters.Value] = is(Letters.A)
 
-    val reporter = new ConsoleReporter(settings)
-    val global = new Global(settings, reporter)
-    val run = new global.Run
-
-    run.compile(List(file.getAbsolutePath))
-
-    reporter.printSummary()
+    override def schemaCompatibilityMatcher(version: MigrationVersion):
+        Matcher[TypeSerializerSchemaCompatibility[Letters.Value]] =
+      TypeSerializerMatchers.isCompatibleAsIs[Letters.Value]()
   }
 }
-