You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/01/08 16:49:22 UTC

[flink] 09/12: [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e791b1a29b675af4f290be9b68ae7f03b2de43c5
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Dec 6 19:07:16 2018 +0800

    [FLINK-11073] [core] Replace GenericArraySerializerConfigSnapshot with new GenericArraySerializerSnapshot
---
 .../typeutils/base/GenericArraySerializer.java     |  4 +-
 .../base/GenericArraySerializerConfigSnapshot.java | 20 +++---
 .../base/GenericArraySerializerSnapshot.java       | 81 ++++++++++++++++++++++
 ...mpositeTypeSerializerSnapshotMigrationTest.java |  4 +-
 4 files changed, 94 insertions(+), 15 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
index 55ba8ab..a4949fb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java
@@ -206,7 +206,7 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
-		return new GenericArraySerializerConfigSnapshot<>(this);
+	public GenericArraySerializerSnapshot<C> snapshotConfiguration() {
+		return new GenericArraySerializerSnapshot<>(this);
 	}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
index b0aa241..8cbe76c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerConfigSnapshot.java
@@ -38,8 +38,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  * Point-in-time configuration of a {@link GenericArraySerializer}.
  *
  * @param <C> The component type.
+ *
+ * @deprecated this is deprecated and no longer used by the {@link GenericArraySerializer}.
+ *             It has been replaced by {@link GenericArraySerializerSnapshot}.
  */
 @Internal
+@Deprecated
 public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerializerSnapshot<C[]> {
 
 	private static final int CURRENT_VERSION = 2;
@@ -118,18 +122,12 @@ public final class GenericArraySerializerConfigSnapshot<C> implements TypeSerial
 
 	@Override
 	public TypeSerializerSchemaCompatibility<C[]> resolveSchemaCompatibility(TypeSerializer<C[]> newSerializer) {
-		checkState(componentClass != null && nestedSnapshot != null);
-
 		if (newSerializer instanceof GenericArraySerializer) {
-			GenericArraySerializer<C> serializer = (GenericArraySerializer<C>) newSerializer;
-			TypeSerializerSchemaCompatibility<C> compat = serializer.getComponentClass() == componentClass ?
-					TypeSerializerSchemaCompatibility.compatibleAsIs() :
-					TypeSerializerSchemaCompatibility.incompatible();
-
-			return nestedSnapshot.resolveCompatibilityWithNested(
-					compat, serializer.getComponentSerializer());
-		}
-		else {
+			// delegate to the new snapshot class
+			GenericArraySerializer<C> castedNewSerializer = (GenericArraySerializer<C>) newSerializer;
+			GenericArraySerializerSnapshot<C> newSnapshot = new GenericArraySerializerSnapshot<>(castedNewSerializer);
+			return newSnapshot.resolveSchemaCompatibility(castedNewSerializer);
+		} else {
 			return TypeSerializerSchemaCompatibility.incompatible();
 		}
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
new file mode 100644
index 0000000..3f54dee
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializerSnapshot.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+
+/**
+ * Point-in-time configuration of a {@link GenericArraySerializer}.
+ *
+ * @param <C> The component type.
+ */
+public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	private Class<C> componentClass;
+
+	/**
+	 * Constructor to be used for read instantiation.
+	 */
+	public GenericArraySerializerSnapshot() {
+		super(GenericArraySerializer.class);
+	}
+
+	/**
+	 * Constructor to be used for writing the snapshot.
+	 */
+	public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
+		super(genericArraySerializer);
+		this.componentClass = genericArraySerializer.getComponentClass();
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected void writeOuterSnapshot(DataOutputView out) throws IOException {
+		out.writeUTF(componentClass.getName());
+	}
+
+	@Override
+	protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
+		this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
+	}
+
+	@Override
+	protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
+		return new GenericArraySerializer<>(componentClass, componentSerializer);
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
+		return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
+	}
+}
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
index c7b002a..c6b49a4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerSnapshotMigrationTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.api.common.typeutils;
 
 import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
-import org.apache.flink.api.common.typeutils.base.GenericArraySerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.GenericArraySerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
@@ -55,7 +55,7 @@ public class CompositeTypeSerializerSnapshotMigrationTest extends TypeSerializer
 
 		// GenericArray<String>
 
-		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerConfigSnapshot.class)
+		final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
 			.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
 			.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
 			.withTestData("flink-1.6-array-type-serializer-data", 10);