You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/27 17:09:49 UTC

[flink] branch master updated: [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d7ef63d  [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface
d7ef63d is described below

commit d7ef63df15aa78cf84a004da62fb2ba1716688ba
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 22 18:14:16 2019 +0100

    [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface
---
 .../runtime/types/CRowSerializerSnapshot.java      | 64 ++++++++++++++++++++++
 .../flink/table/runtime/types/CRowSerializer.scala | 52 ++++++++----------
 2 files changed, 86 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java
new file mode 100644
index 0000000..4deea9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Row;
+
+/**
+ * Snapshot class for {@link CRowSerializer}.
+ */
+public class CRowSerializerSnapshot extends CompositeTypeSerializerSnapshot<CRow, CRowSerializer> {
+
+	private static final int CURRENT_VERSION = 1;
+
+	/**
+	 * Constructor for read instantiation.
+	 */
+	public CRowSerializerSnapshot() {
+		super(CRowSerializer.class);
+	}
+
+	/**
+	 * Constructor to create the snapshot for writing.
+	 */
+	public CRowSerializerSnapshot(CRowSerializer serializerInstance) {
+		super(serializerInstance);
+	}
+
+	@Override
+	protected int getCurrentOuterSnapshotVersion() {
+		return CURRENT_VERSION;
+	}
+
+	@Override
+	protected TypeSerializer<?>[] getNestedSerializers(CRowSerializer outerSerializer) {
+		return new TypeSerializer[]{outerSerializer.rowSerializer()};
+	}
+
+	@Override
+	protected CRowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Row> rowSerializer = (TypeSerializer<Row>) nestedSerializers[0];
+
+		return new CRowSerializer(rowSerializer);
+	}
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index b3fe508..02e9160 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils._
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
 import org.apache.flink.types.Row
 
+@SerialVersionUID(2L)
 class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {
 
   override def isImmutableType: Boolean = false
@@ -80,41 +81,22 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
-    new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer))
-  }
-
-  override def ensureCompatibility(
-      configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[CRow] = {
-
-    configSnapshot match {
-      case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-          rowSerializer)
-
-        if (compatResult.isRequiresMigration) {
-          if (compatResult.getConvertDeserializer != null) {
-            CompatibilityResult.requiresMigration(
-              new CRowSerializer(
-                new TypeDeserializerAdapter(compatResult.getConvertDeserializer))
-            )
-          } else {
-            CompatibilityResult.requiresMigration()
-          }
-        } else {
-          CompatibilityResult.compatible()
-        }
-
-      case _ => CompatibilityResult.requiresMigration()
-    }
+  override def snapshotConfiguration(): TypeSerializerSnapshot[CRow] = {
+    new CRowSerializerSnapshot(this)
   }
 }
 
 object CRowSerializer {
 
+  /**
+    * [[CRowSerializer]] is not meant to be used for persisting state. Changes introduced in
+    * versions 1.6+ resulted in an incompatibility in Java serialization. Thus one cannot read
+    * state in 1.8+ from a snapshot written with a previous version of Flink.
+    *
+    * Moreover this serializer is meant to be dropped once we migrate to the new planner
+    * implementation.
+    */
+  @deprecated
   class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]])
     extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
 
@@ -123,6 +105,16 @@ object CRowSerializer {
     }
 
     override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
+
+    override def resolveSchemaCompatibility(newSerializer: TypeSerializer[CRow])
+      : TypeSerializerSchemaCompatibility[CRow] = {
+
+      CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+        newSerializer,
+        new CRowSerializerSnapshot(),
+        getSingleNestedSerializerAndConfig.f1
+      )
+    }
   }
 
   object CRowSerializerConfigSnapshot {