You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:22 UTC
[flink] 09/12: [FLINK-11073] [core] Replace
GenericArraySerializerConfigSnapshot with new
GenericArraySerializerSnapshot
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e791b1a29b675af4f290be9b68ae7f03b2de43c5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Dec 6 19:07:16 2018 +0800
[FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
---
.../typeutils/base/GenericArraySerializer.java | 4 +-
.../base/GenericArraySerializerConfigSnapshot.java | 20 +++---
.../base/GenericArraySerializerSnapshot.java | 81 ++++++++++++++++++++++
...mpositeTypeSerializerSnapshotMigrationTest.java | 4 +-
4 files changed, 94 insertions(+), 15 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab..a4949fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
// --------------------------------------------------------------------------------------------
@Override
- public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
- return new GenericArraySerializerConfigSnapshot<>(this);
+ public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+ return new GenericArraySerializerSnapshot<>(this); // write-path constructor: it reads this serializer's component class
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa241..8cbe76c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState;
* Point-in-time configuration of a {@link GenericArraySerializer}.
*
* @param <C> The component type.
+ *
+ * @deprecated No longer used by the {@link GenericArraySerializer};
+ * it has been replaced by {@link GenericArraySerializerSnapshot}.
*/
@Internal
+@Deprecated
public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
private static final int CURRENT_VERSION = 2;
@@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
@Override
public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
- checkState(componentClass != null && nestedSnapshot != null);
-
if (newSerializer instanceof GenericArraySerializer) {
- GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
- TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
- TypeSerializerSchemaCompatibility.compatibleAsIs() :
- TypeSerializerSchemaCompatibility.incompatible();
-
- return nestedSnapshot.resolveCompatibilityWithNested(
- compat, serializer.getComponentSerializer());
- }
- else {
+ // delegate to the new snapshot class; NOTE(review): the delegate is built from newSerializer, not from this snapshot's componentClass/nestedSnapshot
+ GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
+ GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
+ return newSnapshot.resolveSchemaCompatibility(castedNewSerializer); // TODO confirm: this compares newSerializer against a snapshot of itself
+ } else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 0000000..3f54dee
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Snapshot of a {@link GenericArraySerializer}: the array component class is the outer snapshot, the component serializer the single nested serializer.
+ *
+ * @param <C> The component type of the serialized arrays.
+ */
+public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> { // NOTE(review): serializer type argument is the raw GenericArraySerializer
+
+ private static final int CURRENT_VERSION = 1; // returned by getCurrentOuterSnapshotVersion()
+
+ private Class<C> componentClass; // array component class; set by the write constructor or by readOuterSnapshot
+
+ /**
+ * Constructor to be used for read instantiation; {@code componentClass} stays unset until {@code readOuterSnapshot} assigns it.
+ */
+ public GenericArraySerializerSnapshot() {
+ super(GenericArraySerializer.class);
+ }
+
+ /**
+ * Constructor to be used for writing the snapshot; captures the component class of the given serializer.
+ */
+ public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
+ super(genericArraySerializer);
+ this.componentClass = genericArraySerializer.getComponentClass();
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+ out.writeUTF(componentClass.getName()); // the outer snapshot payload is only the component class name
+ }
+
+ @Override
+ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+ this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader); // readOuterSnapshotVersion is not consulted; presumably reads back the name written by writeOuterSnapshot - TODO confirm
+ }
+
+ @Override
+ protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+ @SuppressWarnings("unchecked") // assumes nestedSerializers[0] is the component serializer, as laid out by getNestedSerializers - TODO confirm
+ TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
+ return new GenericArraySerializer<>(componentClass, componentSerializer);
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
+ return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() }; // exactly one nested serializer: the component serializer
+ }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a..c6b49a4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.api.common.typeutils;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
@@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
// GenericArray<String>
- final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+ final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
.withTestData("flink-1.6-array-type-serializer-data", 10);