Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/01/25 13:53:15 UTC

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5362

    [FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore

    ## What is the purpose of the change
    
    Previously, key and namespace serializers of the `HeapInternalTimerService` were not reconfigured on restore.
    
    In Flink 1.4.0, we removed the Avro dependency. On restore, if Avro is not present, a `DummyAvroKryoSerializerClass` is registered with Kryo as a placeholder, which alters the base Kryo registrations in the `KryoSerializer`. This change requires serializer reconfiguration for restores to work, which is what allowed the issue in the `HeapInternalTimerService` to surface.
    
    This PR fixes this by also writing the `TypeSerializerConfigSnapshot`s of the key and namespace serializers of the `HeapInternalTimerService` into savepoints, and using them to reconfigure the new serializers on restore.
    Since this changes the binary format of the written timer services, this PR also takes the opportunity to make the format properly versioned.
    
    More details of the change are explained below.
    
    ## Brief change log
    
    - 1bc3cd0: A preliminary migration test that took a savepoint of a `WindowOperator` with keys that required serialization using the `KryoSerializer`. Savepoints were taken for Flink versions 1.2 and 1.3. Restoring from these savepoints in Flink 1.4 fails without the following commits.
    
    - b9a1695: Always use the `FailureTolerantObjectInputStream` to read objects in the `InstantiationUtil.deserializeObject(...)` methods. That special stream avoids restore failures with `ClassNotFoundException` if Avro is not present, but some code paths in the restore process did not use it.
    
    - ff2e6b7 and 8bd955d: Introduced `ByteArrayPrependedInputStream` and `PostVersionedIOReadableWritable`. These are utility classes that were required to migrate the serialization format of the timer services from non-versioned to versioned.
    
    - bcdc1f1: The main change, which adds key / namespace serializer config snapshots and uses them for serializer reconfiguration on restore. This commit also makes the format versioned.
    
    ## Verifying this change
    
    - The migration test added in 1bc3cd0 will not pass without all fixes.
    - Unit tests are added for the new `ByteArrayPrependedInputStream` and `PostVersionedIOReadableWritable` classes.
    - The `testSnapshotAndRestore` and `testSnapshotAndRebalancedRestore` tests in `HeapInternalTimerServiceTest` are adapted to test both versioned and previous non-versioned formats.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**YES** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
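
As a rough illustration of why changed Kryo registrations call for serializer reconfiguration, the toy model below treats a registry as an ordered list in which each class's ID is its position. It is not Kryo's or Flink's actual API: `ToyRegistry`, its methods, and the class names are hypothetical, and the assumption that the placeholder shifts later IDs is only one way the base registrations could differ. The snapshot stands in for the role the PR gives to `TypeSerializerConfigSnapshot`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of an order-dependent class registry; NOT Kryo's or Flink's real API. */
final class ToyRegistry {
    private final List<String> names = new ArrayList<>();

    ToyRegistry(List<String> registrationOrder) {
        names.addAll(registrationOrder);
    }

    /** The ID written into the stream is simply the position in the registration order. */
    int idOf(String className) {
        int id = names.indexOf(className);
        if (id < 0) {
            throw new IllegalArgumentException("not registered: " + className);
        }
        return id;
    }

    /** Resolves an ID read from the stream back to a class name. */
    String nameOf(int id) {
        return names.get(id);
    }

    /** Copy of the registration order, playing the role of a config snapshot. */
    List<String> snapshot() {
        return new ArrayList<>(names);
    }

    /**
     * Builds a registry that keeps the snapshotted order first and appends
     * registrations that are new, so previously written IDs keep their meaning.
     */
    static ToyRegistry reconfigure(List<String> oldSnapshot, List<String> newOrder) {
        List<String> merged = new ArrayList<>(oldSnapshot);
        for (String name : newOrder) {
            if (!merged.contains(name)) {
                merged.add(name);
            }
        }
        return new ToyRegistry(merged);
    }

    public static void main(String[] args) {
        ToyRegistry writer = new ToyRegistry(Arrays.asList("BaseType", "UserKey"));
        int written = writer.idOf("UserKey");

        // Hypothetical newer version: a placeholder is registered ahead of the user type.
        List<String> newOrder = Arrays.asList("BaseType", "Placeholder", "UserKey");

        ToyRegistry naive = new ToyRegistry(newOrder);
        System.out.println(naive.nameOf(written)); // Placeholder

        ToyRegistry fixed = reconfigure(writer.snapshot(), newOrder);
        System.out.println(fixed.nameOf(written)); // UserKey
    }
}
```

With the naive registry, the ID written for `UserKey` resolves to the placeholder; after reconfiguring from the snapshot it resolves to `UserKey` again.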


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8421

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5362
    
----
commit 1bc3cd0214d2d17f19d76a9aa094429730b5ba13
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-24T16:08:13Z

    [FLINK-8421] [DataStream, tests] Add WindowOperator migration test for Kryo-serialized window keys

commit b9a169535a91d5678ae916d8e54b7e60724a7486
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-24T16:15:08Z

    [FLINK-8421] [core] Let InstantiationUtil.deserializeObject() always use FailureTolerantObjectInputStream

commit ff2e6b75f39f0d474ecca451ac1a47c0183e9a6f
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-24T17:07:14Z

    [FLINK-8421] [core] Introduce ByteArrayPrependedInputStream

commit 8bd955d701f9f9278a5e52befea4308f42a60b45
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-24T17:08:52Z

    [FLINK-8421] [core] Introduce PostVersionedIOReadableWritable

commit bcdc1f14d29ef272d07c8e52c46a355ac565d853
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-01-24T17:09:44Z

    [FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore
    
    Previously, the key and namespace serializers for the
    HeapInternalTimerService were not reconfigured on restore to be compatible
    with previously written serializers.
    
    This caused restoring savepoints in Flink 1.4.0 to fail immediately,
    since in Flink 1.4.0 we changed the base registrations in the Kryo
    serializer. That change requires serializer reconfiguration.
    
    This commit fixes this by also writing the serializer configuration
    snapshots of the key and namespace serializers into savepoints, and using
    them to reconfigure the new serializers on restore. This improvement also
    comes along with making the written data for timer service snapshots
    versioned. Backwards compatibility with previous non-versioned formats
    is not broken.

----


---

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5362


---

[GitHub] flink issue #5362: [FLINK-8421] [DataStream] Make timer serializers reconfig...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5362
  
    Thanks for the reviews!
    Will merge after Travis gives green.


---

[GitHub] flink issue #5362: [FLINK-8421] [DataStream] Make timer serializers reconfig...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/5362
  
    I think this is good to go now. 👍 😃 


---

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5362#discussion_r165646464
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java ---
    @@ -52,7 +52,7 @@
     
     	private final ProcessingTimeService processingTimeService;
     
    -	private final Map<String, HeapInternalTimerService<K, N>> timerServices;
    +	public final Map<String, HeapInternalTimerService<K, N>> timerServices;
    --- End diff --
    
    no, this is a mistake that should be reverted.


---

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5362#discussion_r165636384
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java ---
    @@ -52,7 +52,7 @@
     
     	private final ProcessingTimeService processingTimeService;
     
    -	private final Map<String, HeapInternalTimerService<K, N>> timerServices;
    +	public final Map<String, HeapInternalTimerService<K, N>> timerServices;
    --- End diff --
    
    Is this only public for testing?


---

[GitHub] flink issue #5362: [FLINK-8421] [DataStream] Make timer serializers reconfig...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5362
  
    @aljoscha As discussed offline, I've:
    - replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream`
    - switched to negative values in `VERSIONED_IDENTIFIER` to be extra safe
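
The probe-and-push-back approach mentioned above can be sketched with plain `java.io` classes. This is a minimal sketch, not the PR's code: `MagicPrefixProbe` and its methods are hypothetical names, only the identifier bytes are taken from the diff, and the legacy layout (a leading non-negative int) is an assumption made for the demo. Under that assumption the negative identifier bytes cannot be mistaken for legacy data, because a big-endian int whose first byte has the high bit set is negative. The sketch also loops on `read`, since a single `InputStream.read(byte[])` call may return fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical helper showing magic-prefix detection with PushbackInputStream. */
final class MagicPrefixProbe {
    // Same values as VERSIONED_IDENTIFIER in the diff; everything else is illustrative.
    static final byte[] MAGIC = {-15, -51, -123, -97};

    /** Reads up to buf.length bytes, looping because read() may return fewer. */
    static int readUpTo(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n < 0) {
                break; // end of stream
            }
            total += n;
        }
        return total;
    }

    /**
     * Returns true if the stream starts with MAGIC (which is then consumed).
     * Otherwise pushes the probed bytes back so the caller sees the stream unchanged.
     */
    static boolean consumeMagicIfPresent(PushbackInputStream in) throws IOException {
        byte[] probe = new byte[MAGIC.length];
        int n = readUpTo(in, probe);
        if (n == MAGIC.length && Arrays.equals(probe, MAGIC)) {
            return true;
        }
        in.unread(probe, 0, n);
        return false;
    }

    /** Demo helper: reports the detected format and the first int after the optional prefix. */
    static String describe(byte[] data) {
        try {
            PushbackInputStream in =
                new PushbackInputStream(new ByteArrayInputStream(data), MAGIC.length);
            boolean versioned = consumeMagicIfPresent(in);
            int first = new DataInputStream(in).readInt();
            return (versioned ? "versioned:" : "legacy:") + first;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(new byte[] {0, 0, 0, 7}));                      // legacy:7
        System.out.println(describe(new byte[] {-15, -51, -123, -97, 0, 0, 0, 2})); // versioned:2
    }
}
```

The pushback buffer only needs to be as large as the identifier, since at most that many bytes are ever returned to the stream.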


---

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5362#discussion_r165573730
  
    --- Diff: flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.io;
    +
    +import org.apache.flink.annotation.Internal;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.InputStreamViewWrapper;
    +
    +import java.io.IOException;
    +import java.io.PushbackInputStream;
    +import java.util.Arrays;
    +
    +/**
    + * A {@link VersionedIOReadableWritable} which allows to differentiate whether the previous
    + * data was versioned with a {@link VersionedIOReadableWritable}. This can be used if previously
    + * written data was not versioned, and is to be migrated to a versioned format.
    + */
    +@Internal
    +public abstract class PostVersionedIOReadableWritable extends VersionedIOReadableWritable {
    +
    +	/** NOTE: CANNOT CHANGE! */
    +	private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, -51, -123, -97};
    +
    +	/**
    +	 * Read from the provided {@link DataInputView in}. A flag {@code wasVersioned} can be
    +	 * used to determine whether or not the data to read was previously written
    +	 * by a {@link VersionedIOReadableWritable}.
    +	 */
    +	protected abstract void read(DataInputView in, boolean wasVersioned) throws IOException;
    +
    +	@Override
    +	public void write(DataOutputView out) throws IOException {
    +		out.write(VERSIONED_IDENTIFIER);
    +		super.write(out);
    +	}
    +
    +	/**
    +	 * This read attempts to first identify if the input view contains the special
    +	 * {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few bytes.
    +	 * If identified to be versioned, the usual version resolution read path
    +	 * in {@link VersionedIOReadableWritable#read(DataInputView)} is invoked.
    +	 * Otherwise, we "reset" the input view by pushing back the read buffered bytes
    +	 * into the stream.
    +	 */
    +	@Override
    +	public final void read(DataInputView in) throws IOException {
    +		PushbackInputStream stream = new PushbackInputStream(new InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);
    +
    +		byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
    +		stream.read(tmp);
    +
    +		if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
    +			super.read(in);
    +			read(in, true);
    +		} else {
    +			stream.unread(tmp);
    +			read(new DataInputViewStreamWrapper(stream), false);
    +		}
    --- End diff --
    
    The not-so-nice things about the current implementation are:
    1) it requires several layers of transforming back and forth between a `DataInputView` and `InputStream`, and 
    2) it uses a separate `read(DataInputView, boolean)` method in order to wrap a "reset" `DataInputView` for the remaining reads.
    
    I think the implementation would be much more elegant if `DataInputView` had an `unread(byte[])` method, though I'm not sure how non-trivial it is to support this across all subclasses.
    Maybe food for thought for the future.
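
To make the `unread(byte[])` idea concrete, here is a small sketch of what such an input could look like outside Flink. `UnreadableDataInput` is a hypothetical class built on `java.io.DataInputStream` and `PushbackInputStream`; it does not implement `DataInputView`, and it says nothing about how hard the change would be across Flink's real subclasses.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a data input that supports unread; NOT a Flink class. */
final class UnreadableDataInput extends DataInputStream {
    private final PushbackInputStream pushback;

    private UnreadableDataInput(PushbackInputStream pushback) {
        super(pushback);
        this.pushback = pushback;
    }

    /** Wraps a source stream; maxUnread bounds how many bytes can be pushed back at once. */
    static UnreadableDataInput wrap(InputStream source, int maxUnread) {
        return new UnreadableDataInput(new PushbackInputStream(source, maxUnread));
    }

    /** Pushes bytes back so that the next reads return them again, in order. */
    void unread(byte[] bytes) throws IOException {
        pushback.unread(bytes);
    }

    /** Demo: peek at a 4-byte header, push it back, then read it again as an int. */
    static int peekThenReadInt(byte[] data) {
        try {
            UnreadableDataInput in = wrap(new ByteArrayInputStream(data), 4);
            byte[] header = new byte[4];
            in.readFully(header); // consume the header to inspect it
            in.unread(header);    // "reset" on the same object, no re-wrapping
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(peekThenReadInt(new byte[] {0, 0, 1, 0})); // 256
    }
}
```

The caller inspects the header and rewinds on the same object, so no second wrapper and no separate `read(DataInputView, boolean)` hook would be needed.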


---