Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/05/18 07:10:22 UTC

[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3937

    [FLINK-6482] [core] Add nested serializers to config snapshots of composite serializers

    This commit also adds the nested serializers themselves to the configuration snapshots of composite serializers. This makes it possible to use the previous nested serializer as the convert deserializer when a nested serializer of the new serializer determines that state migration is required.
    
    Take a Tuple serializer as an example. If any one of the nested field serializers is not compatible, the whole Tuple serializer needs to be migrated. Prior to this PR, this meant that every nested serializer had to either be compatible or provide a convert deserializer. This change relaxes that requirement: the previous serializer of a field can simply be used as the convert deserializer.
    
    Only the composite serializers are affected by this change.
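
The per-field fallback described above can be sketched roughly as follows. This is a minimal illustration, not Flink's implementation: `FieldSerializer`, `SerializerAndConfig`, `readersForOldState`, and the version-number compatibility rule are all hypothetical stand-ins invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NestedSerializerFallbackSketch {

    /** Hypothetical stand-in for a nested field serializer; not Flink's TypeSerializer. */
    static final class FieldSerializer {
        final String name;
        final int formatVersion;

        FieldSerializer(String name, int formatVersion) {
            this.name = name;
            this.formatVersion = formatVersion;
        }

        /** Toy compatibility rule (an assumption): compatible only if the versions match. */
        boolean isCompatibleWith(int previousFormatVersion) {
            return formatVersion == previousFormatVersion;
        }
    }

    /** One snapshot entry: the previous serializer stored together with its configuration. */
    static final class SerializerAndConfig {
        final FieldSerializer previousSerializer;
        final int previousFormatVersion;

        SerializerAndConfig(FieldSerializer previousSerializer, int previousFormatVersion) {
            this.previousSerializer = previousSerializer;
            this.previousFormatVersion = previousFormatVersion;
        }
    }

    /**
     * For each field, picks the serializer that should read the OLD bytes: the new
     * serializer if it is compatible, otherwise the previous one kept in the snapshot.
     */
    static List<FieldSerializer> readersForOldState(
            List<SerializerAndConfig> snapshot, List<FieldSerializer> current) {
        if (snapshot.size() != current.size()) {
            throw new IllegalArgumentException("field count changed");
        }
        List<FieldSerializer> readers = new ArrayList<>(current.size());
        for (int i = 0; i < current.size(); i++) {
            SerializerAndConfig previous = snapshot.get(i);
            FieldSerializer candidate = current.get(i);
            readers.add(candidate.isCompatibleWith(previous.previousFormatVersion)
                    ? candidate
                    : previous.previousSerializer);
        }
        return readers;
    }

    public static void main(String[] args) {
        List<SerializerAndConfig> snapshot = Arrays.asList(
                new SerializerAndConfig(new FieldSerializer("int-v1", 1), 1),
                new SerializerAndConfig(new FieldSerializer("string-v1", 1), 1));
        List<FieldSerializer> current = Arrays.asList(
                new FieldSerializer("int-v1", 1),
                new FieldSerializer("string-v2", 2));
        for (FieldSerializer reader : readersForOldState(snapshot, current)) {
            System.out.println(reader.name);
        }
    }
}
```

In this toy setup the second field has changed format, so its old bytes are read with the serializer that was stored in the snapshot, while the first field keeps using the new serializer.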
    
    This commit also consolidates all TypeSerializer-related serialization
    proxies into a single utility class. A few renamings are included, namely:
    1. `ClassNotFoundDummyTypeSerializer` --> `UnloadableDummyTypeSerializer`
    2. `TypeSerializerUtil` --> `TypeSerializerSerializationUtil` (because that's all it's used for)
    3. `TypeSerializerSerializationProxy` --> functionality moved to a static method of `TypeSerializerSerializationUtil`
    4. `StateMigrationUtil` --> moved to flink-core and renamed to `CompatibilityUtil`.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6482

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3937
    
----
commit a189999f799fe943f2297293a2f74707b9c4188d
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Date:   2017-05-18T06:51:33Z

    [FLINK-6482] [core] Add nested serializers to config snapshots of composite serializers
    
    This commit also adds the nested serializers themselves to the
    configuration snapshots of composite serializers. This opens up the
    opportunity to use the previous nested serializer as the convert
    deserializer in the case that a nested serializer in the new serializer
    determines that state migration is required.
    
    This commit also consolidates all TypeSerializer-related serialization
    proxies into a single utility class.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117696993
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
     		}
     
     		return (obj.getClass().equals(getClass()))
    -				&& Arrays.equals(
    -					nestedSerializerConfigSnapshots,
    -					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
    +				&& nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
    --- End diff --
    
    This is not doing a deep equals on the array.



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117699760
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
     		}
     
     		return (obj.getClass().equals(getClass()))
    -				&& Arrays.equals(
    -					nestedSerializerConfigSnapshots,
    -					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
    +				&& nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
    --- End diff --
    
    I've changed `nestedSerializersWithConfigs` to a list; it's no longer an array. The `equals` implementation should suffice?
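
The distinction behind this exchange can be checked with a small standalone example. It uses plain `String` elements rather than Flink types, and the class and method names are made up for illustration. `List.equals` compares element by element, so it is only as deep as the element type's own `equals`; an array's `equals` is reference identity, which is why the old code needed `Arrays.equals`.

```java
import java.util.Arrays;
import java.util.List;

final class ListVersusArrayEquality {

    /** Arrays inherit Object.equals, so this is a reference comparison. */
    static boolean arraysEqualByReference(String[] a, String[] b) {
        return a.equals(b);
    }

    /** Arrays.equals compares length and then each pair of elements with equals. */
    static boolean arraysEqualByContent(String[] a, String[] b) {
        return Arrays.equals(a, b);
    }

    /** List.equals is specified to compare size and each pair of elements with equals. */
    static boolean listsEqual(List<String> a, List<String> b) {
        return a.equals(b);
    }

    public static void main(String[] args) {
        String[] first = {"a", "b"};
        String[] second = {"a", "b"};
        System.out.println(arraysEqualByReference(first, second));
        System.out.println(arraysEqualByContent(first, second));
        System.out.println(listsEqual(Arrays.asList(first), Arrays.asList(second)));
    }
}
```

For the list in the diff, the comparison therefore relies on the element type and the serializers and snapshots it holds implementing `equals` themselves.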



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    Overall, LGTM. +1



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    Hey,
    what's the status of this PR?
    I see the JIRA is marked as a blocker. When do you think this PR will be ready to be merged?



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    Thanks a lot for the quick review and merge!!



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    Thanks for the review Stefan!
    @rmetzger currently waiting for a Travis run before merging this: https://travis-ci.org/tzulitai/flink/builds/234805397



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117710279
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -19,47 +19,65 @@
     package org.apache.flink.api.common.typeutils;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.java.tuple.Tuple2;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.util.Preconditions;
     
     import java.io.IOException;
    -import java.util.Arrays;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
      * A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers.
    - * The configuration snapshot consists of the configuration snapshots of all nested serializers.
    + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and
    + * also the nested serializers themselves.
    + *
    + * <p>Both the nested serializers and the configuration snapshots are written as configuration of
    + * composite serializers, so that on restore, the previous serializer may be used in case migration
    + * is required.
      */
     @Internal
     public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
     
    -	private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
    +	private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs;
     
     	/** This empty nullary constructor is required for deserializing the configuration. */
     	public CompositeTypeSerializerConfigSnapshot() {}
     
    -	public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
    -		this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
    +	public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
    +		Preconditions.checkNotNull(nestedSerializers);
    +
    +		this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length);
    +		TypeSerializerConfigSnapshot configSnapshot;
    --- End diff --
    
    I think this could go inside the loop.
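
A minimal sketch of what this suggestion amounts to, using `String` stand-ins instead of Flink's serializer and snapshot types (the class and method names are hypothetical): declaring the variable inside the loop limits its scope to a single iteration and allows it to be `final`.

```java
import java.util.ArrayList;
import java.util.List;

final class LoopScopedVariableSketch {

    /** Builds one "snapshot" string per nested serializer name. */
    static List<String> snapshotAll(String... nestedSerializers) {
        List<String> result = new ArrayList<>(nestedSerializers.length);
        for (String nestedSerializer : nestedSerializers) {
            // Declared inside the loop: visible only in this iteration and never reassigned.
            final String configSnapshot = "config-of-" + nestedSerializer;
            result.add(configSnapshot);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(snapshotAll("a", "b"));
    }
}
```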



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3937



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    Thank you for your response.
    This PR is the last item for the next release candidate. It would be good to get this in within the next few hours so that I can finally start the vote for the 1.3 release.



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117697041
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
     		}
     
     		return (obj.getClass().equals(getClass()))
    -				&& Arrays.equals(
    -					nestedSerializerConfigSnapshots,
    -					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
    +				&& nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
     	}
     
     	@Override
     	public int hashCode() {
    -		return Arrays.hashCode(nestedSerializerConfigSnapshots);
    +		return nestedSerializersWithConfigs.hashCode();
    --- End diff --
    
    This is also not giving a deep hashcode of the array.



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117700406
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -73,13 +91,11 @@ public boolean equals(Object obj) {
     		}
     
     		return (obj.getClass().equals(getClass()))
    -				&& Arrays.equals(
    -					nestedSerializerConfigSnapshots,
    -					((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
    +				&& nestedSerializersWithConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
     	}
     
     	@Override
     	public int hashCode() {
    -		return Arrays.hashCode(nestedSerializerConfigSnapshots);
    +		return nestedSerializersWithConfigs.hashCode();
    --- End diff --
    
    Same here; `nestedSerializersWithConfigs` is a `List`, not an array, so the `hashCode` implementation should suffice?
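
The same point for hash codes, again as a standalone example with `String` elements and made-up names: `List.hashCode` is specified to be computed from the elements, using the same formula as `Arrays.hashCode`, whereas calling `hashCode()` directly on an array gives an identity-based value.

```java
import java.util.Arrays;

final class ListVersusArrayHashCode {

    /** Content-based: derived from the hash codes of the elements, in order. */
    static int listHash(String... elements) {
        return Arrays.asList(elements).hashCode();
    }

    /** Content-based equivalent for arrays, as used by the code before the change. */
    static int arrayContentHash(String... elements) {
        return Arrays.hashCode(elements);
    }

    public static void main(String[] args) {
        // Both lines print the same value, because the two formulas are identical.
        System.out.println(listHash("a", "b"));
        System.out.println(arrayContentHash("a", "b"));
        // Identity-based: typically differs between two arrays with the same content.
        System.out.println(new String[] {"a", "b"}.hashCode());
    }
}
```

As with `equals`, the result is only as deep as the `hashCode` implementations of the list's elements.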



[GitHub] flink issue #3937: [FLINK-6482] [core] Add nested serializers to config snap...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3937
  
    @rmetzger it is currently under review by @StefanRRichter. The conflicts are pretty trivial and can be resolved very quickly; I'm just leaving it as is because the review already started last week.
    I'm hoping to merge it by the end of Monday (today).



[GitHub] flink pull request #3937: [FLINK-6482] [core] Add nested serializers to conf...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3937#discussion_r117710523
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeTypeSerializerConfigSnapshot.java ---
    @@ -19,47 +19,65 @@
     package org.apache.flink.api.common.typeutils;
     
     import org.apache.flink.annotation.Internal;
    +import org.apache.flink.api.java.tuple.Tuple2;
     import org.apache.flink.core.memory.DataInputView;
     import org.apache.flink.core.memory.DataOutputView;
     import org.apache.flink.util.Preconditions;
     
     import java.io.IOException;
    -import java.util.Arrays;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
      * A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers.
    - * The configuration snapshot consists of the configuration snapshots of all nested serializers.
    + * The configuration snapshot consists of the configuration snapshots of all nested serializers, and
    + * also the nested serializers themselves.
    + *
    + * <p>Both the nested serializers and the configuration snapshots are written as configuration of
    + * composite serializers, so that on restore, the previous serializer may be used in case migration
    + * is required.
      */
     @Internal
     public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
     
    -	private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
    +	private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersWithConfigs;
     
     	/** This empty nullary constructor is required for deserializing the configuration. */
     	public CompositeTypeSerializerConfigSnapshot() {}
     
    -	public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
    -		this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
    +	public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
    +		Preconditions.checkNotNull(nestedSerializers);
    +
    +		this.nestedSerializersWithConfigs = new ArrayList<>(nestedSerializers.length);
    +		TypeSerializerConfigSnapshot configSnapshot;
    +		for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
    +			configSnapshot = nestedSerializer.snapshotConfiguration();
    +			this.nestedSerializersWithConfigs.add(
    +				new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
    +					nestedSerializer.duplicate(),
    +					Preconditions.checkNotNull(configSnapshot)));
    +		}
     	}
     
     	@Override
     	public void write(DataOutputView out) throws IOException {
     		super.write(out);
    -		TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots);
    +		TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, nestedSerializersWithConfigs);
     	}
     
     	@Override
     	public void read(DataInputView in) throws IOException {
     		super.read(in);
    -		nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
    +		this.nestedSerializersWithConfigs =
    +			TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader());
     	}
     
    -	public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
    -		return nestedSerializerConfigSnapshots;
    +	public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() {
    +		return nestedSerializersWithConfigs;
     	}
     
    -	public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
    -		return nestedSerializerConfigSnapshots[0];
    +	public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
    +		return nestedSerializersWithConfigs.get(0);
    --- End diff --
    
    Method name and variable name are not aligned.

