You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/28 11:52:08 UTC
[flink] 42/42: [FLINK-13632] Remove TypeSerializerSnapshotMigrationTestBase
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e80bb0bc96095305d04c7177c5aeb243bd892bba
Author: Aljoscha Krettek <al...@apache.org>
AuthorDate: Wed May 27 12:02:11 2020 +0200
[FLINK-13632] Remove TypeSerializerSnapshotMigrationTestBase
All tests have been ported to the new test base.
---
.../TypeSerializerSnapshotMigrationTestBase.java | 529 ---------------------
1 file changed, 529 deletions(-)
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
deleted file mode 100644
index 6aa3481..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshotMigrationTestBase.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.typeutils;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.testutils.migration.MigrationVersion;
-import org.apache.flink.util.TestLogger;
-
-import org.hamcrest.Matcher;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.function.Supplier;
-
-import static org.apache.flink.api.common.typeutils.TypeSerializerMatchers.hasSameCompatibilityAs;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.hamcrest.CoreMatchers.allOf;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * A test base for verifying {@link TypeSerializerSnapshot} migration.
- *
- * @param <ElementT> the element being serialized.
- *
- * @deprecated please use the newer {@link TypeSerializerUpgradeTestBase}
- */
-@Deprecated
-public abstract class TypeSerializerSnapshotMigrationTestBase<ElementT> extends TestLogger {
-
- private final TestSpecification<ElementT> testSpecification;
-
- protected TypeSerializerSnapshotMigrationTestBase(TestSpecification<ElementT> testSpecification) {
- this.testSpecification = checkNotNull(testSpecification);
- }
-
- @Test
- public void serializerSnapshotIsSuccessfullyRead() {
- TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
-
- assertThat(snapshot, allOf(
- notNullValue(),
- instanceOf(TypeSerializerSnapshot.class)
- ));
- }
-
- @Test
- public void specifiedNewSerializerHasExpectedCompatibilityResultsWithSnapshot() {
- TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
-
- TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
-
- assertThat(result, testSpecification.schemaCompatibilityMatcher);
- }
-
- @Test
- public void restoredSerializerIsAbleToDeserializePreviousData() throws IOException {
- TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
- TypeSerializer<ElementT> serializer = snapshot.restoreSerializer();
-
- assertSerializerIsAbleToReadOldData(serializer);
- }
-
- @Test
- public void reconfiguredSerializerIsAbleToDeserializePreviousData() throws IOException {
- TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();
- TypeSerializerSchemaCompatibility<ElementT> compatibility = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
-
- if (!compatibility.isCompatibleWithReconfiguredSerializer()) {
- // this test only applies for reconfigured serializers.
- return;
- }
-
- TypeSerializer<ElementT> serializer = compatibility.getReconfiguredSerializer();
- assertSerializerIsAbleToReadOldData(serializer);
- }
-
- @SuppressWarnings("deprecation")
- @Test
- public void movingForward() throws IOException {
- TypeSerializerSnapshot<ElementT> previousSnapshot = snapshotUnderTest();
- TypeSerializer<ElementT> restoredSerializer = previousSnapshot.restoreSerializer();
-
- TypeSerializerSnapshot<ElementT> nextSnapshot = restoredSerializer.snapshotConfiguration();
- assertThat(nextSnapshot, instanceOf(testSpecification.snapshotClass));
-
- TypeSerializerSnapshot<ElementT> nextSnapshotDeserialized = writeAndThenReadTheSnapshot(restoredSerializer, nextSnapshot);
-
- assertThat(nextSnapshotDeserialized, allOf(
- notNullValue(),
- not(instanceOf(TypeSerializerConfigSnapshot.class))
- ));
- }
-
- @Test
- public void restoreSerializerFromNewSerializerSnapshotIsAbleToDeserializePreviousData() throws IOException {
- TypeSerializer<ElementT> newSerializer = testSpecification.createSerializer();
-
- TypeSerializerSchemaCompatibility<ElementT> compatibility =
- snapshotUnderTest().resolveSchemaCompatibility(newSerializer);
-
- final TypeSerializer<ElementT> nextSerializer;
- if (compatibility.isCompatibleWithReconfiguredSerializer()) {
- nextSerializer = compatibility.getReconfiguredSerializer();
- } else if (compatibility.isCompatibleAsIs()) {
- nextSerializer = newSerializer;
- } else {
- // this test does not apply.
- return;
- }
-
- TypeSerializerSnapshot<ElementT> nextSnapshot = nextSerializer.snapshotConfiguration();
-
- assertSerializerIsAbleToReadOldData(nextSnapshot.restoreSerializer());
- }
-
- // --------------------------------------------------------------------------------------------------------------
- // Test Helpers
- // --------------------------------------------------------------------------------------------------------------
-
- private TypeSerializerSnapshot<ElementT> writeAndThenReadTheSnapshot(
- TypeSerializer<ElementT> serializer,
- TypeSerializerSnapshot<ElementT> newSnapshot) throws IOException {
-
- DataOutputSerializer out = new DataOutputSerializer(128);
- TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer);
-
- DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
- return readSnapshot(in);
- }
-
- private TypeSerializerSnapshot<ElementT> snapshotUnderTest() {
- DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation());
- try {
- if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) {
- return readPre17SnapshotFormat(input);
- } else {
- return readSnapshot(input);
- }
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(), e);
- }
- }
-
- @SuppressWarnings({"unchecked", "deprecation"})
- private TypeSerializerSnapshot<ElementT> readPre17SnapshotFormat(DataInputView input) throws IOException {
- final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-
- List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> serializers =
- TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(input, cl);
-
- return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1;
- }
-
- private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException {
- return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
- in, Thread.currentThread().getContextClassLoader(), null);
- }
-
- private DataInputView dataUnderTest() {
- return contentsOf(testSpecification.getTestDataLocation());
- }
-
- private void assertSerializerIsAbleToReadOldData(TypeSerializer<ElementT> serializer) throws IOException {
- DataInputView input = dataUnderTest();
-
- final Matcher<ElementT> matcher = testSpecification.testDataElementMatcher;
- for (int i = 0; i < testSpecification.testDataCount; i++) {
- final ElementT result = serializer.deserialize(input);
- assertThat(result, matcher);
- }
- }
-
- // --------------------------------------------------------------------------------------------------------------
- // Static Helpers
- // --------------------------------------------------------------------------------------------------------------
-
- private static DataInputView contentsOf(Path path) {
- try {
- byte[] bytes = Files.readAllBytes(path);
- return new DataInputDeserializer(bytes);
- }
- catch (IOException e) {
- throw new RuntimeException("Failed to read " + path, e);
- }
- }
-
- private static Path resourcePath(String resourceName) {
- checkNotNull(resourceName, "resource name can not be NULL");
- try {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- URL resource = cl.getResource(resourceName);
- if (resource == null) {
- throw new IllegalArgumentException("unable locate test data " + resourceName);
- }
- return Paths.get(resource.toURI());
- }
- catch (URISyntaxException e) {
- throw new RuntimeException("unable", e);
- }
- }
-
- // --------------------------------------------------------------------------------------------------------------
- // Test Specification
- // --------------------------------------------------------------------------------------------------------------
-
- /**
- * Test Specification.
- */
- @SuppressWarnings("WeakerAccess")
- public static final class TestSpecification<T> {
- private final Class<? extends TypeSerializer<T>> serializerType;
- private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
- private final String name;
- private final MigrationVersion testMigrationVersion;
- private Supplier<? extends TypeSerializer<T>> serializerProvider;
- private Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher;
- private String snapshotDataLocation;
- private String testDataLocation;
- private int testDataCount;
-
- @SuppressWarnings("unchecked")
- private Matcher<T> testDataElementMatcher = (Matcher<T>) notNullValue();
-
- @SuppressWarnings("unchecked")
- public static <T> TestSpecification<T> builder(
- String name,
- Class<? extends TypeSerializer> serializerClass,
- Class<? extends TypeSerializerSnapshot> snapshotClass,
- MigrationVersion testMigrationVersion) {
-
- return new TestSpecification<>(
- name,
- (Class<? extends TypeSerializer<T>>) serializerClass,
- (Class<? extends TypeSerializerSnapshot<T>>) snapshotClass,
- testMigrationVersion);
- }
-
- private TestSpecification(
- String name,
- Class<? extends TypeSerializer<T>> serializerType,
- Class<? extends TypeSerializerSnapshot<T>> snapshotClass,
- MigrationVersion testMigrationVersion) {
-
- this.name = name;
- this.serializerType = serializerType;
- this.snapshotClass = snapshotClass;
- this.testMigrationVersion = testMigrationVersion;
- }
-
- public TestSpecification<T> withNewSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
- return withNewSerializerProvider(serializerProvider, TypeSerializerSchemaCompatibility.compatibleAsIs());
- }
-
- public TestSpecification<T> withNewSerializerProvider(
- Supplier<? extends TypeSerializer<T>> serializerProvider,
- TypeSerializerSchemaCompatibility<T> expectedCompatibilityResult) {
- this.serializerProvider = serializerProvider;
- this.schemaCompatibilityMatcher = hasSameCompatibilityAs(expectedCompatibilityResult);
- return this;
- }
-
- public TestSpecification<T> withSchemaCompatibilityMatcher(
- Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher) {
- this.schemaCompatibilityMatcher = schemaCompatibilityMatcher;
- return this;
- }
-
- public TestSpecification<T> withSnapshotDataLocation(String snapshotDataLocation) {
- this.snapshotDataLocation = snapshotDataLocation;
- return this;
- }
-
- public TestSpecification<T> withTestData(String testDataLocation, int testDataCount) {
- this.testDataLocation = testDataLocation;
- this.testDataCount = testDataCount;
- return this;
- }
-
- public TestSpecification<T> withTestDataMatcher(Matcher<T> matcher) {
- testDataElementMatcher = matcher;
- return this;
- }
-
- public TestSpecification<T> withTestDataCount(int expectedDataItmes) {
- this.testDataCount = expectedDataItmes;
- return this;
- }
-
- private TypeSerializer<T> createSerializer() {
- try {
- return (serializerProvider == null) ? serializerType.newInstance() : serializerProvider.get();
- }
- catch (InstantiationException | IllegalAccessException e) {
- throw new RuntimeException("serializer provider was not set, and creating the serializer reflectively failed.", e);
- }
- }
-
- private Path getTestDataLocation() {
- return resourcePath(this.testDataLocation);
- }
-
- private Path getSnapshotDataLocation() {
- return resourcePath(this.snapshotDataLocation);
- }
-
- private MigrationVersion getTestMigrationVersion() {
- return testMigrationVersion;
- }
-
- @Override
- public String toString() {
- return String.format("%s , %s, %s", name, serializerType.getSimpleName(), snapshotClass.getSimpleName());
- }
-
- }
-
- /**
- * Utility class to help build a collection of {@link TestSpecification} for
- * multiple test migration versions. For each test specification added,
- * an entry will be added for each specified migration version.
- */
- public static final class TestSpecifications {
-
- private static final int DEFAULT_TEST_DATA_COUNT = 10;
- private static final String DEFAULT_SNAPSHOT_FILENAME_FORMAT = "flink-%s-%s-snapshot";
- private static final String DEFAULT_TEST_DATA_FILENAME_FORMAT = "flink-%s-%s-data";
-
- private final Collection<TestSpecification<?>> testSpecifications = new LinkedList<>();
- private final MigrationVersion[] testVersions;
-
- public TestSpecifications(MigrationVersion... testVersions) {
- checkArgument(
- testVersions.length > 0,
- "At least one test migration version should be specified.");
- this.testVersions = testVersions;
- }
-
- /**
- * Adds a test specification to be tested for all specified test versions.
- *
- * <p>This method adds the specification with pre-defined snapshot and data filenames,
- * with the format "flink-<testVersion>-<specName>-<data/snapshot>",
- * and each specification's test data count is assumed to always be 10.
- *
- * @param name test specification name.
- * @param serializerClass class of the current serializer.
- * @param snapshotClass class of the current serializer snapshot class.
- * @param serializerProvider provider for an instance of the current serializer.
- *
- * @param <T> type of the test data.
- */
- public <T> void add(
- String name,
- Class<? extends TypeSerializer> serializerClass,
- Class<? extends TypeSerializerSnapshot> snapshotClass,
- Supplier<? extends TypeSerializer<T>> serializerProvider) {
- for (MigrationVersion testVersion : testVersions) {
- testSpecifications.add(
- TestSpecification.<T>builder(
- getSpecNameForVersion(name, testVersion),
- serializerClass,
- snapshotClass,
- testVersion)
- .withNewSerializerProvider(serializerProvider)
- .withSnapshotDataLocation(
- String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
- .withTestData(
- String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
- DEFAULT_TEST_DATA_COUNT)
- );
- }
- }
-
- /**
- * Adds a test specification to be tested for all specified test versions.
- *
- * <p>This method adds the specification with pre-defined snapshot and data filenames,
- * with the format "flink-<testVersion>-<specName>-<data/snapshot>",
- * and each specification's test data count is assumed to always be 10.
- *
- * @param <T> type of the test data.
- */
- public <T> void addWithCompatibilityMatcher(
- String name,
- Class<? extends TypeSerializer> serializerClass,
- Class<? extends TypeSerializerSnapshot> snapshotClass,
- Supplier<? extends TypeSerializer<T>> serializerProvider,
- Matcher<TypeSerializerSchemaCompatibility<T>> schemaCompatibilityMatcher) {
- for (MigrationVersion testVersion : testVersions) {
- testSpecifications.add(
- TestSpecification.<T>builder(
- getSpecNameForVersion(name, testVersion),
- serializerClass,
- snapshotClass,
- testVersion)
- .withNewSerializerProvider(serializerProvider)
- .withSchemaCompatibilityMatcher(schemaCompatibilityMatcher)
- .withSnapshotDataLocation(
- String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
- .withTestData(
- String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
- DEFAULT_TEST_DATA_COUNT)
- );
- }
- }
-
- /**
- * Adds a test specification to be tested for all specified test versions.
- *
- * <p>This method adds the specification with pre-defined snapshot and data filenames,
- * with the format "flink-<testVersion>-<specName>-<data/snapshot>",
- * and each specification's test data count is assumed to always be 10.
- *
- * @param name test specification name.
- * @param serializerClass class of the current serializer.
- * @param snapshotClass class of the current serializer snapshot class.
- * @param serializerProvider provider for an instance of the current serializer.
- * @param elementMatcher an {@code hamcrest} matcher to match test data.
- *
- * @param <T> type of the test data.
- */
- public <T> void add(
- String name,
- Class<? extends TypeSerializer> serializerClass,
- Class<? extends TypeSerializerSnapshot> snapshotClass,
- Supplier<? extends TypeSerializer<T>> serializerProvider,
- Matcher<T> elementMatcher) {
- for (MigrationVersion testVersion : testVersions) {
- testSpecifications.add(
- TestSpecification.<T>builder(
- getSpecNameForVersion(name, testVersion),
- serializerClass,
- snapshotClass,
- testVersion)
- .withNewSerializerProvider(serializerProvider)
- .withSnapshotDataLocation(
- String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
- .withTestData(
- String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
- DEFAULT_TEST_DATA_COUNT)
- .withTestDataMatcher(elementMatcher)
- );
- }
- }
-
- /**
- * Adds a test specification to be tested for all specified test versions.
- *
- * @param name test specification name.
- * @param serializerClass class of the current serializer.
- * @param snapshotClass class of the current serializer snapshot class.
- * @param serializerProvider provider for an instance of the current serializer.
- * @param testSnapshotFilenameProvider provider for the filename of the test snapshot.
- * @param testDataFilenameProvider provider for the filename of the test data.
- * @param testDataCount expected number of records to be read in the test data files.
- *
- * @param <T> type of the test data.
- */
- public <T> void add(
- String name,
- Class<? extends TypeSerializer> serializerClass,
- Class<? extends TypeSerializerSnapshot> snapshotClass,
- Supplier<? extends TypeSerializer<T>> serializerProvider,
- TestResourceFilenameSupplier testSnapshotFilenameProvider,
- TestResourceFilenameSupplier testDataFilenameProvider,
- int testDataCount) {
- for (MigrationVersion testVersion : testVersions) {
- testSpecifications.add(
- TestSpecification.<T>builder(
- getSpecNameForVersion(name, testVersion),
- serializerClass,
- snapshotClass,
- testVersion)
- .withNewSerializerProvider(serializerProvider)
- .withSnapshotDataLocation(testSnapshotFilenameProvider.get(testVersion))
- .withTestData(testDataFilenameProvider.get(testVersion), testDataCount)
- );
- }
- }
-
- public Collection<TestSpecification<?>> get() {
- return Collections.unmodifiableCollection(testSpecifications);
- }
-
- private static String getSpecNameForVersion(String baseName, MigrationVersion testVersion) {
- return testVersion + "-" + baseName;
- }
- }
-
- /**
- * Supplier of paths based on {@link MigrationVersion}.
- */
- protected interface TestResourceFilenameSupplier {
- String get(MigrationVersion testVersion);
- }
-}