You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/28 11:52:07 UTC
[flink] 41/42: [FLINK-13632] Port TraversableSerializer upgrade test to TypeSerializerUpgradeTestBase
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4248de26c3500f86596c2d06c91a9c76c576f3cd
Author: klion26 <qc...@gmail.com>
AuthorDate: Thu Apr 30 15:33:02 2020 +0800
[FLINK-13632] Port TraversableSerializer upgrade test to TypeSerializerUpgradeTestBase
---
.../flink-1.6-traversable-serializer-bitset-data | Bin 140 -> 0 bytes
...link-1.6-traversable-serializer-bitset-snapshot | Bin 992 -> 0 bytes
...link-1.6-traversable-serializer-indexedseq-data | Bin 130 -> 0 bytes
...-1.6-traversable-serializer-indexedseq-snapshot | Bin 992 -> 0 bytes
...flink-1.6-traversable-serializer-linearseq-data | Bin 130 -> 0 bytes
...k-1.6-traversable-serializer-linearseq-snapshot | Bin 992 -> 0 bytes
.../flink-1.6-traversable-serializer-map-data | Bin 138 -> 0 bytes
.../flink-1.6-traversable-serializer-map-snapshot | Bin 3033 -> 0 bytes
...nk-1.6-traversable-serializer-mutable-list-data | Bin 160 -> 0 bytes
....6-traversable-serializer-mutable-list-snapshot | Bin 993 -> 0 bytes
.../flink-1.6-traversable-serializer-seq-data | Bin 130 -> 0 bytes
.../flink-1.6-traversable-serializer-seq-snapshot | Bin 992 -> 0 bytes
.../flink-1.6-traversable-serializer-set-data | Bin 135 -> 0 bytes
.../flink-1.6-traversable-serializer-set-snapshot | Bin 992 -> 0 bytes
...1.6-traversable-serializer-with-case-class-data | Bin 145 -> 0 bytes
...traversable-serializer-with-case-class-snapshot | Bin 3036 -> 0 bytes
...flink-1.6-traversable-serializer-with-pojo-data | Bin 253 -> 0 bytes
...k-1.6-traversable-serializer-with-pojo-snapshot | Bin 6854 -> 0 bytes
.../flink-1.7-traversable-serializer-bitset-data | Bin 140 -> 0 bytes
...link-1.7-traversable-serializer-bitset-snapshot | Bin 987 -> 0 bytes
...link-1.7-traversable-serializer-indexedseq-data | Bin 130 -> 0 bytes
...-1.7-traversable-serializer-indexedseq-snapshot | Bin 987 -> 0 bytes
...flink-1.7-traversable-serializer-linearseq-data | Bin 130 -> 0 bytes
...k-1.7-traversable-serializer-linearseq-snapshot | Bin 987 -> 0 bytes
.../flink-1.7-traversable-serializer-map-data | Bin 138 -> 0 bytes
.../flink-1.7-traversable-serializer-map-snapshot | Bin 3905 -> 0 bytes
...nk-1.7-traversable-serializer-mutable-list-data | Bin 160 -> 0 bytes
....7-traversable-serializer-mutable-list-snapshot | Bin 988 -> 0 bytes
.../flink-1.7-traversable-serializer-seq-data | Bin 130 -> 0 bytes
.../flink-1.7-traversable-serializer-seq-snapshot | Bin 987 -> 0 bytes
.../flink-1.7-traversable-serializer-set-data | Bin 135 -> 0 bytes
.../flink-1.7-traversable-serializer-set-snapshot | Bin 987 -> 0 bytes
...1.7-traversable-serializer-with-case-class-data | Bin 145 -> 0 bytes
...traversable-serializer-with-case-class-snapshot | Bin 3909 -> 0 bytes
...flink-1.7-traversable-serializer-with-pojo-data | Bin 253 -> 0 bytes
...k-1.7-traversable-serializer-with-pojo-snapshot | Bin 9607 -> 0 bytes
...raversableSerializerSnapshotMigrationTest.scala | 118 --------
.../TraversableSerializerUpgradeTest.scala | 326 +++++++++++++++++++++
38 files changed, 326 insertions(+), 118 deletions(-)
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data
deleted file mode 100644
index d6c225f..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot
deleted file mode 100644
index fd9e834..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-bitset-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot
deleted file mode 100644
index 9c51467..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-indexedseq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot
deleted file mode 100644
index 46ceea1..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-linearseq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data
deleted file mode 100644
index bf82319..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot
deleted file mode 100644
index d8b191e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-map-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data
deleted file mode 100644
index ca814e2..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot
deleted file mode 100644
index 7e4befd..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-mutable-list-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot
deleted file mode 100644
index db73915..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-seq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data
deleted file mode 100644
index 2380dd3..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot
deleted file mode 100644
index e5f86d1..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-set-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data
deleted file mode 100644
index fc90039..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot
deleted file mode 100644
index ac3573e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-case-class-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data
deleted file mode 100644
index 63003a8..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot b/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot
deleted file mode 100644
index 085ec94..0000000
Binary files a/flink-scala/src/test/resources/flink-1.6-traversable-serializer-with-pojo-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data
deleted file mode 100644
index d6c225f..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot
deleted file mode 100644
index 90c34f5..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-bitset-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot
deleted file mode 100644
index 72ed196..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-indexedseq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot
deleted file mode 100644
index b9732f5..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-linearseq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data
deleted file mode 100644
index bf82319..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot
deleted file mode 100644
index 598afc5..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-map-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data
deleted file mode 100644
index ca814e2..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot
deleted file mode 100644
index bf8c7d3..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-mutable-list-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data
deleted file mode 100644
index 328992e..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot
deleted file mode 100644
index e7cb734..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-seq-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data
deleted file mode 100644
index 2380dd3..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot
deleted file mode 100644
index 463fe1f..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-set-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data
deleted file mode 100644
index fc90039..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot
deleted file mode 100644
index fb4085f..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-case-class-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data
deleted file mode 100644
index 63003a8..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-data and /dev/null differ
diff --git a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot b/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot
deleted file mode 100644
index 4334c76..0000000
Binary files a/flink-scala/src/test/resources/flink-1.7-traversable-serializer-with-pojo-snapshot and /dev/null differ
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala
deleted file mode 100644
index 0352e6d..0000000
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerSnapshotMigrationTest.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils
-
-import java.util
-import java.util.function.Supplier
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase}
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.testutils.migration.MigrationVersion
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.{BitSet, LinearSeq, mutable}
-
-/**
- * [[TraversableSerializer]] migration test.
- */
-@RunWith(classOf[Parameterized])
-class TraversableSerializerSnapshotMigrationTest(
- testSpecification: TypeSerializerSnapshotMigrationTestBase.TestSpecification[
- TraversableOnce[_]
- ]) extends TypeSerializerSnapshotMigrationTestBase[TraversableOnce[_]](
- testSpecification
- )
-
-object TraversableSerializerSnapshotMigrationTest {
-
- object Types {
-
- class Pojo(var name: String, var count: Int) {
- def this() = this("", -1)
-
- override def equals(other: Any): Boolean = {
- other match {
- case oP: Pojo => name == oP.name && count == oP.count
- case _ => false
- }
- }
- }
-
- val seqTypeInfo = implicitly[TypeInformation[Seq[Int]]]
- val indexedSeqTypeInfo =
- implicitly[TypeInformation[IndexedSeq[Int]]]
- val linearSeqTypeInfo = implicitly[TypeInformation[LinearSeq[Int]]]
- val mapTypeInfo = implicitly[TypeInformation[Map[String, Int]]]
- val setTypeInfo = implicitly[TypeInformation[Set[Int]]]
- val bitsetTypeInfo = implicitly[TypeInformation[BitSet]]
- val mutableListTypeInfo =
- implicitly[TypeInformation[mutable.MutableList[Int]]]
- val seqTupleTypeInfo = implicitly[TypeInformation[Seq[(Int, String)]]]
- val seqPojoTypeInfo = implicitly[TypeInformation[Seq[Pojo]]]
- }
-
- import Types._
-
- @SuppressWarnings(Array("unchecked"))
- @Parameterized.Parameters(name = "Test Specification = {0}")
- def testSpecifications: util.Collection[
- TypeSerializerSnapshotMigrationTestBase.TestSpecification[_]] = {
-
- val testSpecifications: TypeSerializerSnapshotMigrationTestBase.TestSpecifications =
- new TypeSerializerSnapshotMigrationTestBase.TestSpecifications(
- MigrationVersion.v1_6,
- MigrationVersion.v1_7)
-
- val serializerSpecs = Seq(
- ("bitset", bitsetTypeInfo),
- ("indexedseq", indexedSeqTypeInfo),
- ("linearseq", linearSeqTypeInfo),
- ("map", mapTypeInfo),
- ("mutable-list", mutableListTypeInfo),
- ("seq", seqTypeInfo),
- ("set", setTypeInfo),
- ("with-case-class", seqTupleTypeInfo),
- ("with-pojo", seqPojoTypeInfo)
- )
-
- serializerSpecs foreach {
- case (data, typeInfo) =>
- testSpecifications.add(
- s"traversable-serializer-$data",
- classOf[TraversableSerializer[_, _]],
- classOf[TraversableSerializerSnapshot[_, _]],
- new TypeSerializerSupplier(typeInfo)
- )
- }
-
- testSpecifications.get
- }
-
- private class TypeSerializerSupplier[T](typeInfo: TypeInformation[T])
- extends Supplier[TypeSerializer[T]] {
- override def get(): TypeSerializer[T] = {
- typeInfo
- .createSerializer(new ExecutionConfig)
- .asInstanceOf[TypeSerializer[T]]
- }
- }
-}
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala
new file mode 100644
index 0000000..0d503a9
--- /dev/null
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/typeutils/TraversableSerializerUpgradeTest.scala
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils
+
+import java.util
+import java.util.function.Supplier
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializerUpgradeTestBase.TestSpecification
+import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerMatchers, TypeSerializerSchemaCompatibility, TypeSerializerUpgradeTestBase}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.testutils.migration.MigrationVersion
+import org.hamcrest.Matcher
+import org.hamcrest.Matchers.is
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.{BitSet, LinearSeq, mutable}
+
+/**
+ * A [[TypeSerializerUpgradeTestBase]] for [[TraversableSerializer]].
+ */
+@RunWith(classOf[Parameterized])
+class TraversableSerializerUpgradeTest(
+ testSpecification: TypeSerializerUpgradeTestBase.TestSpecification[
+TraversableOnce[_], TraversableOnce[_]])
+ extends TypeSerializerUpgradeTestBase[TraversableOnce[_], TraversableOnce[_]](testSpecification)
+
+object TraversableSerializerUpgradeTest {
+
+ object Types {
+
+ class Pojo(var name: String, var count: Int) {
+ def this() = this("", -1)
+
+ override def equals(other: Any): Boolean = {
+ other match {
+ case oP: Pojo => name == oP.name && count == oP.count
+ case _ => false
+ }
+ }
+ }
+
+ val seqTypeInfo = implicitly[TypeInformation[Seq[Int]]]
+ val indexedSeqTypeInfo =
+ implicitly[TypeInformation[IndexedSeq[Int]]]
+ val linearSeqTypeInfo = implicitly[TypeInformation[LinearSeq[Int]]]
+ val mapTypeInfo = implicitly[TypeInformation[Map[String, Int]]]
+ val setTypeInfo = implicitly[TypeInformation[Set[Int]]]
+ val bitsetTypeInfo = implicitly[TypeInformation[BitSet]]
+ val mutableListTypeInfo =
+ implicitly[TypeInformation[mutable.MutableList[Int]]]
+ val seqTupleTypeInfo = implicitly[TypeInformation[Seq[(Int, String)]]]
+ val seqPojoTypeInfo = implicitly[TypeInformation[Seq[Pojo]]]
+ }
+
+ import Types._
+
+ @Parameterized.Parameters(name = "Test Specification = {0}")
+ def testSpecifications: util.Collection[TestSpecification[_, _]] = {
+
+ val testSpecifications =
+ new util.ArrayList[TypeSerializerUpgradeTestBase.TestSpecification[_, _]]
+ for (migrationVersion <- TypeSerializerUpgradeTestBase.MIGRATION_VERSIONS) {
+ testSpecifications.add(
+ new TestSpecification[BitSet, BitSet](
+ "traversable-serializer-bitset",
+ migrationVersion,
+ classOf[BitsetSerializerSetup],
+ classOf[BitsetSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[IndexedSeq[Int], IndexedSeq[Int]](
+ "traversable-serializer-indexedseq",
+ migrationVersion,
+ classOf[IndexedSeqSerializerSetup],
+ classOf[IndexedSeqSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[LinearSeq[Int], LinearSeq[Int]](
+ "traversable-serializer-linearseq",
+ migrationVersion,
+ classOf[LinearSeqSerializerSetup],
+ classOf[LinearSeqSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[Map[String, Int], Map[String, Int]](
+ "traversable-serializer-map",
+ migrationVersion,
+ classOf[MapSerializerSetup],
+ classOf[MapSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[mutable.MutableList[Int], mutable.MutableList[Int]](
+ "traversable-serializer-mutable-list",
+ migrationVersion,
+ classOf[MutableListSerializerSetup],
+ classOf[MutableListSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[Seq[Int], Seq[Int]](
+ "traversable-serializer-seq",
+ migrationVersion,
+ classOf[SeqSerializerSetup],
+ classOf[SeqSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[Set[Int], Set[Int]](
+ "traversable-serializer-set",
+ migrationVersion,
+ classOf[SetSerializerSetup],
+ classOf[SetSerializerVerifier]))
+ testSpecifications.add(
+ new TestSpecification[Seq[(Int, String)], Seq[(Int, String)]](
+ "traversable-serializer-with-case-class",
+ migrationVersion,
+ classOf[SeqWithCaseClassSetup],
+ classOf[SeqWithCaseClassVerifier]))
+ testSpecifications.add(
+ new TestSpecification[Seq[Pojo], Seq[Pojo]](
+ "traversable-serializer-with-pojo",
+ migrationVersion,
+ classOf[SeqWithPojoSetup],
+ classOf[SeqWithPojoVerifier]))
+ }
+ testSpecifications
+ }
+
+ final class BitsetSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[BitSet] {
+ override def createPriorSerializer: TypeSerializer[BitSet] =
+ new TypeSerializerSupplier(bitsetTypeInfo).get()
+
+ override def createTestData: BitSet = BitSet(3, 2, 0)
+ }
+
+ final class BitsetSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[BitSet] {
+ override def createUpgradedSerializer: TypeSerializer[BitSet] =
+ new TypeSerializerSupplier(bitsetTypeInfo).get()
+
+ override def testDataMatcher: Matcher[BitSet] = is(BitSet(3, 2, 0))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[BitSet]] =
+ TypeSerializerMatchers.isCompatibleAsIs[BitSet]()
+ }
+
+ final class IndexedSeqSerializerSetup extends
+ TypeSerializerUpgradeTestBase.PreUpgradeSetup[IndexedSeq[Int]] {
+ override def createPriorSerializer: TypeSerializer[IndexedSeq[Int]] =
+ new TypeSerializerSupplier(indexedSeqTypeInfo).get()
+
+ override def createTestData: IndexedSeq[Int] = IndexedSeq(1, 2, 3)
+ }
+
+ final class IndexedSeqSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[IndexedSeq[Int]] {
+ override def createUpgradedSerializer: TypeSerializer[IndexedSeq[Int]] =
+ new TypeSerializerSupplier(indexedSeqTypeInfo).get()
+
+ override def testDataMatcher: Matcher[IndexedSeq[Int]] = is(IndexedSeq(1, 2, 3))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[IndexedSeq[Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[IndexedSeq[Int]]()
+ }
+
+ final class LinearSeqSerializerSetup extends
+ TypeSerializerUpgradeTestBase.PreUpgradeSetup[LinearSeq[Int]] {
+ override def createPriorSerializer: TypeSerializer[LinearSeq[Int]] =
+ new TypeSerializerSupplier(linearSeqTypeInfo).get()
+
+ override def createTestData: LinearSeq[Int] = LinearSeq(2, 3, 4)
+ }
+
+ final class LinearSeqSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[LinearSeq[Int]] {
+ override def createUpgradedSerializer: TypeSerializer[LinearSeq[Int]] =
+ new TypeSerializerSupplier(linearSeqTypeInfo).get()
+
+ override def testDataMatcher: Matcher[LinearSeq[Int]] = is(LinearSeq(2, 3, 4))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[LinearSeq[Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[LinearSeq[Int]]()
+ }
+
+ final class MapSerializerSetup extends
+ TypeSerializerUpgradeTestBase.PreUpgradeSetup[Map[String, Int]] {
+ override def createPriorSerializer: TypeSerializer[Map[String, Int]] =
+ new TypeSerializerSupplier(mapTypeInfo).get()
+
+ override def createTestData: Map[String, Int] = Map("Apache" -> 0, "Flink" -> 1)
+ }
+
+ final class MapSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[Map[String, Int]] {
+ override def createUpgradedSerializer: TypeSerializer[Map[String, Int]] =
+ new TypeSerializerSupplier(mapTypeInfo).get()
+
+ override def testDataMatcher: Matcher[Map[String, Int]] = is(Map("Apache" -> 0, "Flink" -> 1))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Map[String, Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Map[String, Int]]()
+ }
+
+ final class MutableListSerializerSetup extends
+ TypeSerializerUpgradeTestBase.PreUpgradeSetup[mutable.MutableList[Int]] {
+ override def createPriorSerializer: TypeSerializer[mutable.MutableList[Int]] =
+ new TypeSerializerSupplier(mutableListTypeInfo).get()
+
+ override def createTestData: mutable.MutableList[Int] = mutable.MutableList(1, 2, 3)
+ }
+
+ final class MutableListSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[mutable.MutableList[Int]] {
+ override def createUpgradedSerializer: TypeSerializer[mutable.MutableList[Int]] =
+ new TypeSerializerSupplier(mutableListTypeInfo).get()
+
+ override def testDataMatcher: Matcher[mutable.MutableList[Int]] =
+ is(mutable.MutableList(1, 2, 3))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[mutable.MutableList[Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[mutable.MutableList[Int]]()
+ }
+
+ final class SeqSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[Int]] {
+ override def createPriorSerializer: TypeSerializer[Seq[Int]] =
+ new TypeSerializerSupplier(seqTypeInfo).get()
+
+ override def createTestData: Seq[Int] = Seq(1, 2, 3)
+ }
+
+ final class SeqSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[Int]] {
+ override def createUpgradedSerializer: TypeSerializer[Seq[Int]] =
+ new TypeSerializerSupplier(seqTypeInfo).get()
+
+ override def testDataMatcher: Matcher[Seq[Int]] = is(Seq(1, 2, 3))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Seq[Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Seq[Int]]()
+ }
+
+ final class SetSerializerSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Set[Int]] {
+ override def createPriorSerializer: TypeSerializer[Set[Int]] =
+ new TypeSerializerSupplier(setTypeInfo).get()
+
+ override def createTestData: Set[Int] = Set(2, 3, 4)
+ }
+
+ final class SetSerializerVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[Set[Int]] {
+ override def createUpgradedSerializer: TypeSerializer[Set[Int]] =
+ new TypeSerializerSupplier(setTypeInfo).get()
+
+ override def testDataMatcher: Matcher[Set[Int]] = is(Set(2, 3, 4))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Set[Int]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Set[Int]]()
+ }
+
+ final class SeqWithCaseClassSetup extends
+ TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[(Int, String)]] {
+ override def createPriorSerializer: TypeSerializer[Seq[(Int, String)]] =
+ new TypeSerializerSupplier(seqTupleTypeInfo).get()
+
+ override def createTestData: Seq[(Int, String)] = Seq((0, "Apache"), (1, "Flink"))
+ }
+
+ final class SeqWithCaseClassVerifier extends
+ TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[(Int, String)]] {
+ override def createUpgradedSerializer: TypeSerializer[Seq[(Int, String)]] =
+ new TypeSerializerSupplier(seqTupleTypeInfo).get()
+
+ override def testDataMatcher: Matcher[Seq[(Int, String)]] = is(Seq((0, "Apache"), (1, "Flink")))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Seq[(Int, String)]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Seq[(Int, String)]]()
+ }
+
+ final class SeqWithPojoSetup extends TypeSerializerUpgradeTestBase.PreUpgradeSetup[Seq[Pojo]] {
+ override def createPriorSerializer: TypeSerializer[Seq[Pojo]] =
+ new TypeSerializerSupplier(seqPojoTypeInfo).get()
+
+ override def createTestData: Seq[Pojo] = Seq(new Pojo("Apache", 0), new Pojo("Flink", 1))
+ }
+
+ final class SeqWithPojoVerifier extends TypeSerializerUpgradeTestBase.UpgradeVerifier[Seq[Pojo]] {
+ override def createUpgradedSerializer: TypeSerializer[Seq[Pojo]] =
+ new TypeSerializerSupplier(seqPojoTypeInfo).get()
+
+ override def testDataMatcher: Matcher[Seq[Pojo]] =
+ is(Seq(new Pojo("Apache", 0), new Pojo("Flink", 1)))
+
+ override def schemaCompatibilityMatcher(version: MigrationVersion):
+ Matcher[TypeSerializerSchemaCompatibility[Seq[Pojo]]] =
+ TypeSerializerMatchers.isCompatibleAsIs[Seq[Pojo]]()
+ }
+
+ private class TypeSerializerSupplier[T](typeInfo: TypeInformation[T])
+ extends Supplier[TypeSerializer[T]] {
+ override def get(): TypeSerializer[T] = {
+ typeInfo
+ .createSerializer(new ExecutionConfig)
+ .asInstanceOf[TypeSerializer[T]]
+ }
+ }
+
+}