You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/27 17:09:49 UTC
[flink] branch master updated: [FLINK-11329][core] Migrate
CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d7ef63d [FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface
d7ef63d is described below
commit d7ef63df15aa78cf84a004da62fb2ba1716688ba
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 22 18:14:16 2019 +0100
[FLINK-11329][core] Migrate CRowSerializerConfigSnapshot to new TypeSerializerSnapshot interface
---
.../runtime/types/CRowSerializerSnapshot.java | 64 ++++++++++++++++++++++
.../flink/table/runtime/types/CRowSerializer.scala | 52 ++++++++----------
2 files changed, 86 insertions(+), 30 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java
new file mode 100644
index 0000000..4deea9b
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/types/CRowSerializerSnapshot.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.types.Row;
+
+/**
+ * Snapshot class for {@link CRowSerializer}.
+ */
+public class CRowSerializerSnapshot extends CompositeTypeSerializerSnapshot<CRow, CRowSerializer> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ /**
+ * Constructor for read instantiation.
+ */
+ public CRowSerializerSnapshot() {
+ super(CRowSerializer.class);
+ }
+
+ /**
+ * Constructor to create the snapshot for writing.
+ */
+ public CRowSerializerSnapshot(CRowSerializer serializerInstance) {
+ super(serializerInstance);
+ }
+
+ @Override
+ protected int getCurrentOuterSnapshotVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ protected TypeSerializer<?>[] getNestedSerializers(CRowSerializer outerSerializer) {
+ return new TypeSerializer[]{outerSerializer.rowSerializer()};
+ }
+
+ @Override
+ protected CRowSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<Row> rowSerializer = (TypeSerializer<Row>) nestedSerializers[0];
+
+ return new CRowSerializer(rowSerializer);
+ }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
index b3fe508..02e9160 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.types.Row
+@SerialVersionUID(2L)
class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] {
override def isImmutableType: Boolean = false
@@ -80,41 +81,22 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------
- override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = {
- new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer))
- }
-
- override def ensureCompatibility(
- configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[CRow] = {
-
- configSnapshot match {
- case crowSerializerConfigSnapshot: CRowSerializer.CRowSerializerConfigSnapshot =>
- val compatResult = CompatibilityUtil.resolveCompatibilityResult(
- crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
- classOf[UnloadableDummyTypeSerializer[_]],
- crowSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
- rowSerializer)
-
- if (compatResult.isRequiresMigration) {
- if (compatResult.getConvertDeserializer != null) {
- CompatibilityResult.requiresMigration(
- new CRowSerializer(
- new TypeDeserializerAdapter(compatResult.getConvertDeserializer))
- )
- } else {
- CompatibilityResult.requiresMigration()
- }
- } else {
- CompatibilityResult.compatible()
- }
-
- case _ => CompatibilityResult.requiresMigration()
- }
+ override def snapshotConfiguration(): TypeSerializerSnapshot[CRow] = {
+ new CRowSerializerSnapshot(this)
}
}
object CRowSerializer {
+ /**
+ * [[CRowSerializer]] is not meant to be used for persisting state. In versions 1.6+ there
+ * were changes introduced that resulted in incompatibility in java serialization. Thus one
+ * cannot read state in 1.8+ from snapshot written with previous versions of Flink.
+ *
+ * Moreover this serializer is meant to be dropped once we migrate to the new planner
+ * implementation.
+ */
+ @deprecated
class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]])
extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) {
@@ -123,6 +105,16 @@ object CRowSerializer {
}
override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION
+
+ override def resolveSchemaCompatibility(newSerializer: TypeSerializer[CRow])
+ : TypeSerializerSchemaCompatibility[CRow] = {
+
+ CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
+ newSerializer,
+ new CRowSerializerSnapshot(),
+ getSingleNestedSerializerAndConfig.f1
+ )
+ }
}
object CRowSerializerConfigSnapshot {