You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:21 UTC

[02/15] flink git commit: [hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

[hotfix] [scala] Fix instantiation of Scala serializers' config snapshot classes

Prior to this commit, the configuration snapshot classes of Scala
serializers did not have the proper default empty constructor that is
used for deserializing the configuration snapshot.

Since some Scala serializers' config snapshots extend the Java
CompositeTypeSerializerConfigSnapshot, their config snapshot classes are
also changed to be implemented in Java since in Scala we can only call a
single base class constructor from subclasses.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6edb72d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6edb72d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6edb72d4

Branch: refs/heads/master
Commit: 6edb72d42f95978cdb426b0ca1d8d4c50a3c700a
Parents: 6abbba2
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Thu Jun 8 08:52:04 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 06:26:48 2017 +0200

----------------------------------------------------------------------
 .../ScalaOptionSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../ScalaTrySerializerConfigSnapshot.java       | 50 ++++++++++++++
 .../TraversableSerializerConfigSnapshot.java    | 47 +++++++++++++
 .../scala/typeutils/EnumValueSerializer.scala   | 14 ++--
 .../api/scala/typeutils/OptionSerializer.scala  | 62 ++++++++++-------
 .../scala/typeutils/TraversableSerializer.scala | 25 ++-----
 .../api/scala/typeutils/TrySerializer.scala     | 72 +++++++++++---------
 7 files changed, 233 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
new file mode 100644
index 0000000..03eef12
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaOptionSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link OptionSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public final class ScalaOptionSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaOptionSerializerConfigSnapshot() {}
+
+	public ScalaOptionSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
new file mode 100644
index 0000000..6abb3ea
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/ScalaTrySerializerConfigSnapshot.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TrySerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class ScalaTrySerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public ScalaTrySerializerConfigSnapshot() {}
+
+	public ScalaTrySerializerConfigSnapshot(
+			TypeSerializer<E> elementSerializer,
+			TypeSerializer<Throwable> throwableSerializer) {
+
+		super(elementSerializer, throwableSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
new file mode 100644
index 0000000..9a39421
--- /dev/null
+++ b/flink-scala/src/main/java/org/apache/flink/api/scala/typeutils/TraversableSerializerConfigSnapshot.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.typeutils;
+
+import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+
+/**
+ * A {@link TypeSerializerConfigSnapshot} for the Scala {@link TraversableSerializer}.
+ *
+ * <p>This configuration snapshot class is implemented in Java because Scala does not
+ * allow calling different base class constructors from subclasses, while we need that
+ * for the default empty constructor.
+ */
+public class TraversableSerializerConfigSnapshot<E> extends CompositeTypeSerializerConfigSnapshot {
+
+	private static final int VERSION = 1;
+
+	/** This empty nullary constructor is required for deserializing the configuration. */
+	public TraversableSerializerConfigSnapshot() {}
+
+	public TraversableSerializerConfigSnapshot(TypeSerializer<E> elementSerializer) {
+		super(elementSerializer);
+	}
+
+	@Override
+	public int getVersion() {
+		return VERSION;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 843079a..d549623 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerialize
 import org.apache.flink.api.common.typeutils.base.IntSerializer
 import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
 import org.apache.flink.core.memory.{DataInputView, DataOutputView}
-import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.{InstantiationUtil, Preconditions}
 
 /**
  * Serializer for [[Enumeration]] values.
@@ -111,13 +111,17 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
 
 object EnumValueSerializer {
 
-  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration](private var enumClass: Class[E])
+  class ScalaEnumSerializerConfigSnapshot[E <: Enumeration]
       extends TypeSerializerConfigSnapshot {
 
-    var enumConstants: Array[E] = enumClass.getEnumConstants
+    var enumClass: Class[E] = _
+    var enumConstants: Array[E] = _
 
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+    def this(enumClass: Class[E]) = {
+      this()
+      this.enumClass = Preconditions.checkNotNull(enumClass)
+      this.enumConstants = enumClass.getEnumConstants
+    }
 
     override def write(out: DataOutputView): Unit = {
       super.write(out)

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
index 8adfb5c..810c91c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala
@@ -100,46 +100,56 @@ class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): OptionSerializer.OptionSerializerConfigSnapshot[A] = {
-    new OptionSerializer.OptionSerializerConfigSnapshot(elemSerializer)
+  override def snapshotConfiguration(): ScalaOptionSerializerConfigSnapshot[A] = {
+    new ScalaOptionSerializerConfigSnapshot[A](elemSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Option[A]] = {
+
     configSnapshot match {
       case optionSerializerConfigSnapshot
+          : ScalaOptionSerializerConfigSnapshot[A] =>
+        ensureCompatibility(optionSerializerConfigSnapshot)
+      case legacyOptionSerializerConfigSnapshot
           : OptionSerializer.OptionSerializerConfigSnapshot[A] =>
-        val compatResult = CompatibilityUtil.resolveCompatibilityResult(
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          optionSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
-          elemSerializer)
-
-        if (compatResult.isRequiresMigration) {
-          if (compatResult.getConvertDeserializer != null) {
-            CompatibilityResult.requiresMigration(
-              new OptionSerializer[A](
-                new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
-          } else {
-            CompatibilityResult.requiresMigration()
-          }
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+        ensureCompatibility(legacyOptionSerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val compatResult = CompatibilityUtil.resolveCompatibilityResult(
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      compositeConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
+      elemSerializer)
+
+    if (compatResult.isRequiresMigration) {
+      if (compatResult.getConvertDeserializer != null) {
+        CompatibilityResult.requiresMigration(
+          new OptionSerializer[A](
+            new TypeDeserializerAdapter(compatResult.getConvertDeserializer)))
+      } else {
+        CompatibilityResult.requiresMigration()
+      }
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object OptionSerializer {
 
-  class OptionSerializerConfigSnapshot[A](
-      private val elemSerializer: TypeSerializer[A])
-    extends CompositeTypeSerializerConfigSnapshot(elemSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class OptionSerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot {
 
     override def getVersion: Int = OptionSerializerConfigSnapshot.VERSION
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
index 6299a24..5963987 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala
@@ -151,16 +151,16 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     obj.isInstanceOf[TraversableSerializer[_, _]]
   }
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
+  override def snapshotConfiguration(): TraversableSerializerConfigSnapshot[E] = {
+    new TraversableSerializerConfigSnapshot[E](elementSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
 
     configSnapshot match {
-      case traversableSerializerConfigSnapshot:
-          TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>
+      case traversableSerializerConfigSnapshot
+          : TraversableSerializerConfigSnapshot[E] =>
 
         val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
           traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
@@ -178,20 +178,3 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
     }
   }
 }
-
-object TraversableSerializer {
-
-  class TraversableSerializerConfigSnapshot[E](
-      private var elementSerializer: TypeSerializer[E])
-    extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null)
-
-    override def getVersion = TraversableSerializerConfigSnapshot.VERSION
-  }
-
-  object TraversableSerializerConfigSnapshot {
-    val VERSION = 1
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6edb72d4/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
index 641caa1..a88cce7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala
@@ -103,51 +103,59 @@ class TrySerializer[A](
   // Serializer configuration snapshotting & compatibility
   // --------------------------------------------------------------------------------------------
 
-  override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
-    new TrySerializer.TrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
+  override def snapshotConfiguration(): ScalaTrySerializerConfigSnapshot[A] = {
+    new ScalaTrySerializerConfigSnapshot[A](elemSerializer, throwableSerializer)
   }
 
   override def ensureCompatibility(
       configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[Try[A]] = {
 
     configSnapshot match {
-      case trySerializerConfigSnapshot: TrySerializer.TrySerializerConfigSnapshot[A] =>
-        val previousSerializersAndConfigs =
-          trySerializerConfigSnapshot.getNestedSerializersAndConfigs
-
-        val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(0).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(0).f1,
-          elemSerializer)
-
-        val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
-          previousSerializersAndConfigs.get(1).f0,
-          classOf[UnloadableDummyTypeSerializer[_]],
-          previousSerializersAndConfigs.get(1).f1,
-          throwableSerializer)
-
-        if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
-          CompatibilityResult.requiresMigration()
-        } else {
-          CompatibilityResult.compatible()
-        }
-
+      case trySerializerConfigSnapshot
+          : ScalaTrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(trySerializerConfigSnapshot)
+      case legacyTrySerializerConfigSnapshot
+          : TrySerializer.TrySerializerConfigSnapshot[A] =>
+        ensureCompatibility(legacyTrySerializerConfigSnapshot)
       case _ => CompatibilityResult.requiresMigration()
     }
   }
+
+  private def ensureCompatibility(
+      compositeConfigSnapshot: CompositeTypeSerializerConfigSnapshot)
+      : CompatibilityResult[Option[A]] = {
+
+    val previousSerializersAndConfigs =
+      compositeConfigSnapshot.getNestedSerializersAndConfigs
+
+    val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(0).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(0).f1,
+      elemSerializer)
+
+    val throwableCompatRes = CompatibilityUtil.resolveCompatibilityResult(
+      previousSerializersAndConfigs.get(1).f0,
+      classOf[UnloadableDummyTypeSerializer[_]],
+      previousSerializersAndConfigs.get(1).f1,
+      throwableSerializer)
+
+    if (elemCompatRes.isRequiresMigration || throwableCompatRes.isRequiresMigration) {
+      CompatibilityResult.requiresMigration()
+    } else {
+      CompatibilityResult.compatible()
+    }
+  }
 }
 
 object TrySerializer {
 
-  class TrySerializerConfigSnapshot[A](
-      private var elemSerializer: TypeSerializer[A],
-      private var throwableSerializer: TypeSerializer[Throwable])
-    extends CompositeTypeSerializerConfigSnapshot(
-      elemSerializer, throwableSerializer) {
-
-    /** This empty nullary constructor is required for deserializing the configuration. */
-    def this() = this(null, null)
+  /**
+    * We need to keep this to be compatible with snapshots taken in Flink 1.3.0.
+    * Once Flink 1.3.x is no longer supported, this can be removed.
+    */
+  class TrySerializerConfigSnapshot[A]()
+      extends CompositeTypeSerializerConfigSnapshot() {
 
     override def getVersion: Int = TrySerializerConfigSnapshot.VERSION
   }