Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2018/02/23 09:17:51 UTC

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5567

    [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

    ## What is the purpose of the change
    
    Scala tuple classes (and thus also case classes) are serialized together with their serializers. The `serialVersionUID` of these classes changed from Scala 2.10 to 2.11. We need to ignore those changes when reading a serializer config snapshot.
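The purpose statement above can be illustrated with plain Java serialization. The following is a minimal, hypothetical sketch, not Flink's actual `FailureTolerantObjectInputStream`. It overrides `ObjectInputStream.readClassDescriptor()` and, for an allow-list of class names, substitutes the locally loaded class descriptor whenever the `serialVersionUID` recorded in the stream differs. `UidTolerantObjectInputStream`, `Payload` and `UidDemo` are illustrative names. `UidDemo` fakes a UID change by patching the serialized bytes, rather than by using two Scala versions.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

// Hypothetical: relaxes the serialVersionUID check for an allow-list of class names.
class UidTolerantObjectInputStream extends ObjectInputStream {
    private final Set<String> toleratedClassNames;
    private final ClassLoader classLoader;

    UidTolerantObjectInputStream(InputStream in, Set<String> toleratedClassNames,
            ClassLoader classLoader) throws IOException {
        super(in);
        this.toleratedClassNames = toleratedClassNames;
        this.classLoader = classLoader;
    }

    @Override
    protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException {
        // Descriptor as written into the stream, carrying the writer's serialVersionUID.
        ObjectStreamClass streamDesc = super.readClassDescriptor();
        if (!toleratedClassNames.contains(streamDesc.getName())) {
            return streamDesc; // default behavior: a UID mismatch still fails
        }
        Class<?> localClass = Class.forName(streamDesc.getName(), false, classLoader);
        ObjectStreamClass localDesc = ObjectStreamClass.lookup(localClass);
        if (localDesc != null
                && localDesc.getSerialVersionUID() != streamDesc.getSerialVersionUID()) {
            // Hand back the local descriptor so the UID check passes.
            // Assumes the serialized fields are otherwise identical.
            return localDesc;
        }
        return streamDesc;
    }
}

// Hypothetical stand-in for a Scala tuple class.
class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    final int value;

    Payload(int value) {
        this.value = value;
    }
}

// Hypothetical helpers that simulate a snapshot written with another serialVersionUID.
final class UidDemo {
    private UidDemo() {}

    static byte[] serializeWithFakeUid(Serializable obj, long fakeUid) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            byte[] bytes = bos.toByteArray();
            byte[] name = obj.getClass().getName().getBytes(StandardCharsets.UTF_8);
            // In the stream format, the 8-byte UID directly follows the class name.
            int uidPos = indexOf(bytes, name) + name.length;
            for (int i = 0; i < 8; i++) {
                bytes[uidPos + i] = (byte) (fakeUid >>> (56 - 8 * i)); // big-endian
            }
            return bytes;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the restored object, or null if the UID check rejected the stream.
    static Object read(byte[] bytes, boolean tolerant) {
        InputStream raw = new ByteArrayInputStream(bytes);
        try (ObjectInputStream in = tolerant
                ? new UidTolerantObjectInputStream(raw,
                        Collections.singleton(Payload.class.getName()), Payload.class.getClassLoader())
                : new ObjectInputStream(raw)) {
            return in.readObject();
        } catch (InvalidClassException e) {
            return null;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private static int indexOf(byte[] haystack, byte[] needle) {
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            int j = 0;
            while (j < needle.length && haystack[i + j] == needle[j]) {
                j++;
            }
            if (j == needle.length) {
                return i;
            }
        }
        throw new IllegalArgumentException("class name not found in stream");
    }
}
```

Substituting the descriptor only bypasses the UID comparison. A genuine change to the fields between versions would still need separate handling.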
    
    
    ## Brief change log
    
    - Make deserialization more failure tolerant
    
    ## Verifying this change
    
    - See `org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTest`
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: yes
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / no)
      - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8451

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5567
    
----
commit 3855837fe3d79c473c59ad892daabbb45f711c47
Author: Timo Walther <tw...@...>
Date:   2018-02-22T16:22:54Z

    [FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant

----


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5567#discussion_r171212166
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -113,18 +113,61 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     	 *
     	 * <p>This can be removed once 1.2 is no longer supported.
     	 */
    -	private static Set<String> scalaSerializerClassnames = new HashSet<>();
    +	private final static Set<String> scalaSerializerClassnames = new HashSet<>();
     	static {
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
    -		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
    --- End diff --
    
    Why is this removed?


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    A few side notes:
    1) Maybe we should have written just the arity of tuples in the config snapshots. That would have avoided this issue.
    2) Are there any potential cases like this one for other Scala types besides tuples?


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5567#discussion_r171214451
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -113,18 +113,61 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     	 *
     	 * <p>This can be removed once 1.2 is no longer supported.
     	 */
    -	private static Set<String> scalaSerializerClassnames = new HashSet<>();
    +	private final static Set<String> scalaSerializerClassnames = new HashSet<>();
     	static {
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
    -		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
    --- End diff --
    
    nvm, @twalthr pointed out that it was in there twice


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    @tzulitai Exactly, I think tuples/case classes are just a special case. @aljoscha Are we going to merge this now (once the checkstyle error is fixed)?


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5567


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    @tzulitai I think just writing out the arity is not enough; it really depends on the type of tuple. Maybe it would have been better to write out only the name of the class, but since we support tuple POJOs this might be a bit risky. The tuple serializer handles general subclasses of both Java and Scala tuples, as well as case classes.
    
    I tested case classes, tuples, `Either` and `Unit`. What else do you think we should test? The other serializers do not include classes that need to be snapshotted.
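Writing only the class name, as floated in this reply, could look roughly like the hypothetical helper below. `ClassNameSnapshot` is an illustrative name, not a Flink API. The fully qualified name is stored with `DataOutput.writeUTF` and resolved with `Class.forName` on restore, so no `serialVersionUID` is ever compared. The trade-off is that restoring fails if the class has been renamed or moved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper: stores a class by its fully qualified name instead of Java-serializing it.
final class ClassNameSnapshot {
    private ClassNameSnapshot() {}

    static void writeClass(DataOutput out, Class<?> clazz) throws IOException {
        // Only the name is written, so no serialVersionUID ends up in the snapshot.
        out.writeUTF(clazz.getName());
    }

    static Class<?> readClass(DataInput in, ClassLoader loader) throws IOException {
        String name = in.readUTF();
        try {
            // Fails if the class was renamed or moved since the snapshot was taken.
            return Class.forName(name, false, loader);
        } catch (ClassNotFoundException e) {
            throw new IOException("Cannot restore class " + name, e);
        }
    }

    // Writes the class name to an in-memory buffer and reads it back.
    static Class<?> roundTrip(Class<?> clazz) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeClass(new DataOutputStream(buffer), clazz);
            DataInput in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
            // For bootstrap classes getClassLoader() is null, and Class.forName
            // then falls back to the bootstrap class loader.
            return readClass(in, clazz.getClassLoader());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```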


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    @twalthr I see. Then for other Scala types whose serializer config snapshot does not require writing classes, we probably don't need to include them in the `FailureTolerantObjectInputStream`.


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5567#discussion_r170231102
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -121,10 +121,53 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
    -		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
     	}
     
    +	/**
    +	 * The serialVersionUID might change between Scala versions and since those classes are
    +	 * part of the tuple serializer config snapshots we need to ignore them.
    +	 *
    +	 * See also FLINK-8451.
    +	 */
    +	private static Set<String> scalaTypes = new HashSet<>();
    +	static {
    +		scalaTypes.add("scala.Tuple1");
    +		scalaTypes.add("scala.Tuple2");
    +		scalaTypes.add("scala.Tuple3");
    +		scalaTypes.add("scala.Tuple4");
    +		scalaTypes.add("scala.Tuple5");
    +		scalaTypes.add("scala.Tuple6");
    +		scalaTypes.add("scala.Tuple7");
    +		scalaTypes.add("scala.Tuple8");
    +		scalaTypes.add("scala.Tuple9");
    +		scalaTypes.add("scala.Tuple10");
    +		scalaTypes.add("scala.Tuple11");
    +		scalaTypes.add("scala.Tuple12");
    +		scalaTypes.add("scala.Tuple13");
    +		scalaTypes.add("scala.Tuple14");
    +		scalaTypes.add("scala.Tuple15");
    +		scalaTypes.add("scala.Tuple16");
    +		scalaTypes.add("scala.Tuple17");
    +		scalaTypes.add("scala.Tuple18");
    +		scalaTypes.add("scala.Tuple19");
    +		scalaTypes.add("scala.Tuple20");
    +		scalaTypes.add("scala.Tuple21");
    +		scalaTypes.add("scala.Tuple22");
    +		scalaTypes.add("scala.Tuple1$mcJ$sp");
    +		scalaTypes.add("scala.Tuple1$mcI$sp");
    --- End diff --
    
    Could you briefly explain what these are?


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    CC @tzulitai @aljoscha 


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    +1 to what Gordon suggested.
    
    Let's move towards avoiding writing classes altogether in any config snapshot. Tuple arity is the right thing for tuples, plus ideally a format version number.
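A config snapshot along the lines suggested here, a format version followed by the tuple arity, might be sketched as follows. `TupleAritySnapshot`, its binary layout and the version constant are assumptions for illustration, not Flink's serializer snapshot API. As noted elsewhere in this thread, arity alone cannot tell apart the case classes and POJOs handled by the same serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical config snapshot that records a format version and the tuple arity, but no classes.
final class TupleAritySnapshot {
    static final int CURRENT_VERSION = 1; // hypothetical format version

    final int version;
    final int arity;

    TupleAritySnapshot(int version, int arity) {
        this.version = version;
        this.arity = arity;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(version); // version first, so later formats can branch on it
        out.writeInt(arity);
    }

    static TupleAritySnapshot read(DataInput in) throws IOException {
        int version = in.readInt();
        if (version < 1 || version > CURRENT_VERSION) {
            throw new IOException("Unsupported snapshot version " + version);
        }
        return new TupleAritySnapshot(version, in.readInt());
    }

    // Treated as compatible only if the new serializer has the same number of fields.
    boolean isCompatibleWith(int newArity) {
        return arity == newArity;
    }

    // Writes the snapshot to an in-memory buffer and reads it back.
    static TupleAritySnapshot roundTrip(TupleAritySnapshot snapshot) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            snapshot.write(new DataOutputStream(buffer));
            return read(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Putting the version number first means a reader can reject or migrate an unknown layout before interpreting any further bytes.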


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5567#discussion_r170247807
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -121,10 +121,53 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
    -		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
     	}
     
    +	/**
    +	 * The serialVersionUID might change between Scala versions and since those classes are
    +	 * part of the tuple serializer config snapshots we need to ignore them.
    +	 *
    +	 * See also FLINK-8451.
    +	 */
    +	private static Set<String> scalaTypes = new HashSet<>();
    +	static {
    +		scalaTypes.add("scala.Tuple1");
    +		scalaTypes.add("scala.Tuple2");
    +		scalaTypes.add("scala.Tuple3");
    +		scalaTypes.add("scala.Tuple4");
    +		scalaTypes.add("scala.Tuple5");
    +		scalaTypes.add("scala.Tuple6");
    +		scalaTypes.add("scala.Tuple7");
    +		scalaTypes.add("scala.Tuple8");
    +		scalaTypes.add("scala.Tuple9");
    +		scalaTypes.add("scala.Tuple10");
    +		scalaTypes.add("scala.Tuple11");
    +		scalaTypes.add("scala.Tuple12");
    +		scalaTypes.add("scala.Tuple13");
    +		scalaTypes.add("scala.Tuple14");
    +		scalaTypes.add("scala.Tuple15");
    +		scalaTypes.add("scala.Tuple16");
    +		scalaTypes.add("scala.Tuple17");
    +		scalaTypes.add("scala.Tuple18");
    +		scalaTypes.add("scala.Tuple19");
    +		scalaTypes.add("scala.Tuple20");
    +		scalaTypes.add("scala.Tuple21");
    +		scalaTypes.add("scala.Tuple22");
    +		scalaTypes.add("scala.Tuple1$mcJ$sp");
    +		scalaTypes.add("scala.Tuple1$mcI$sp");
    --- End diff --
    
    These are specialized subclasses used for type-specific tuples, e.g. `II` for an `(Int, Int)`. I don't know exactly when the compiler inserts them, but I added them just to be sure.
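The explicit list in the diff has to name every specialized variant, and the reply above admits uncertainty about which ones the compiler emits. As a hypothetical alternative (the class `ScalaTupleNames` and its pattern are illustrative, not part of the PR), the allow-list could be expressed as a name pattern held in a `private static final` constant. This sketch assumes the specialized classes follow the `scala.TupleN$mc<codes>$sp` naming seen in `scala.Tuple1$mcJ$sp`, and it deliberately accepts more suffix combinations than scalac generates.

```java
import java.util.regex.Pattern;

// Hypothetical alternative to enumerating every Scala tuple class name by hand.
final class ScalaTupleNames {
    // scala.Tuple1 to scala.Tuple22, optionally followed by a specialization suffix such as
    // $mcJ$sp (Tuple1 specialized for Long) or $mcII$sp (Tuple2 specialized for Int, Int).
    private static final Pattern TUPLE_CLASS =
            Pattern.compile("scala\\.Tuple([1-9]|1[0-9]|2[0-2])(\\$mc[A-Z]+\\$sp)?");

    private ScalaTupleNames() {}

    static boolean isScalaTupleClass(String className) {
        return TUPLE_CLASS.matcher(className).matches();
    }
}
```

Matching names that no Scala version actually produces should be harmless here, since such classes never appear in a snapshot. The cost is that the allow-list becomes less explicit than the enumerated set.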


---

[GitHub] flink pull request #5567: [FLINK-8451] [serializers] Make Scala tuple serial...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5567#discussion_r170215340
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -121,10 +121,53 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
    -		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
     		scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
     	}
     
    +	/**
    +	 * The serialVersionUID might change between Scala versions and since those classes are
    +	 * part of the tuple serializer config snapshots we need to ignore them.
    +	 *
    +	 * See also FLINK-8451.
    +	 */
    +	private static Set<String> scalaTypes = new HashSet<>();
    --- End diff --
    
    make final?


---

[GitHub] flink issue #5567: [FLINK-8451] [serializers] Make Scala tuple serializer de...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5567
  
    @StephanEwen A tuple arity would be enough if the tuple serializer only serialized tuples, but it also serializes case classes and POJOs. Wouldn't the fully qualified name be better? But yes, writing classes should be avoided.


---