Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2018/05/03 07:42:20 UTC

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/5950

    [FLINK-9169] [state-backend] Allow absence of old serializers when restoring RocksDBStateBackend

    ## What is the purpose of the change
    
    This PR contains 2 commits that together allow old state serializers to be absent when restoring the RocksDB state backend. It also eliminates the possibility of confusing NPEs, which arose because restored serializers could previously be `null`.
    
    Allowing old state serializers to be absent for the RocksDB state backend lets users perform state evolutions that they couldn't before.
    
    ## Brief change log
    
    - 2f9f0d9 Always use dummy serializer instead of null when old state serializer cannot be read
    Previously, the behaviour of the `TypeSerializerSerializationUtil` read methods when serializers cannot be read was inconsistent. For some exceptions (e.g. `ClassNotFoundException`, `InvalidClassException`), a dummy serializer was used as a replacement. In other cases, `null` was used.
    This commit fixes this by always using dummy serializers if a `useDummyPlaceholder` flag is set to true. Otherwise, an `IOException` is thrown. This makes it clear that users should use dummy serializers if they want the deserialization to be tolerant to failures.
    
    Another benefit of this is that there will no longer be `null` serializers after restore; they will either be an actual serializer, or a dummy if the old serializer cannot be restored.
    
    - 95223fc Adds an `isSerializerPresenceRequired` flag to the `KeyedBackendSerializationProxy`.
    If true, restored serializers cannot be the dummy serializer, otherwise an `IOException` will be thrown to fail the restore. Heap backends set this to true, while RocksDB sets this to false.
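
A minimal sketch of the behaviour the two commits describe, assuming hypothetical stand-in types (`Serializer`, `RealSerializer`, `DummySerializer`, `RestoreSketch`) and a `loadable` parameter that simulates whether the old serializer class can be read. None of these names are Flink's actual classes or signatures; only the two flag names come from the description above.

```java
import java.io.IOException;

// Hypothetical stand-ins for illustration; these are not Flink's real classes.
interface Serializer {}

final class RealSerializer implements Serializer {}

// Placeholder that keeps the raw bytes of a serializer that could not be loaded.
final class DummySerializer implements Serializer {
    final byte[] rawBytes;

    DummySerializer(byte[] rawBytes) {
        this.rawBytes = rawBytes;
    }
}

final class RestoreSketch {

    // First commit: never return null. Either substitute a dummy or fail with
    // an IOException, depending on the useDummyPlaceholder flag.
    static Serializer readOrDummy(byte[] rawBytes, boolean loadable, boolean useDummyPlaceholder)
            throws IOException {
        if (loadable) {
            return new RealSerializer();
        }
        if (useDummyPlaceholder) {
            return new DummySerializer(rawBytes);
        }
        throw new IOException("Serializer could not be read.");
    }

    // Second commit: a backend that requires serializer presence rejects
    // dummies, while a backend that does not require it accepts them.
    static Serializer checkPresence(Serializer restored, boolean isSerializerPresenceRequired)
            throws IOException {
        if (isSerializerPresenceRequired && restored instanceof DummySerializer) {
            throw new IOException("Old serializer is required but could not be restored.");
        }
        return restored;
    }
}
```

In this sketch, a heap-style caller would pass `true` to `checkPresence` and fail the restore on a dummy, while a RocksDB-style caller would pass `false` and continue.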
    
    ## Verifying this change
    
    There are two main test classes that already have coverage of this issue:
    - `SerializationProxiesTest`
    - `StateBackendTestBase`
    
    A new test, `StateBackendTestBase#testSerializerPresenceOnRestore`, additionally verifies the restore behaviour of heap / rocksdb state backends when serializers are not present.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-9169-approach2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5950.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5950
    
----
commit 2f9f0d9ab02c0c207e4ac887e958f8de9c057310
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-05-02T05:37:22Z

    [FLINK-9169] [runtime] Always use dummy serializer instead of null when old state serializer cannot be read
    
    Previously, the behaviour of TypeSerializerSerializationUtil read
    methods in the case when serializers cannot be read, is quite mixed up.
    For some exceptions (e.g. ClassNotFoundException,
    InvalidClassException), a dummy serializer will be used as a
    replacement. In other cases, null is used.
    
    This commit fixes this by always using dummy serializers if a
    'useDummyPlaceholder' flag is set to true. Otherwise, an IOException is
    thrown. This makes it clear that users should use dummy serializers if
    they want the deserialization to be tolerant to failures.
    
    Another benefit of this is that there will no longer be 'null'
    serializers after restore; they will either be an actual serializer, or
    a dummy if the old serializer cannot be restored.

commit 95223fc129ed0439b3f14636721cb72bc7560876
Author: Tzu-Li (Gordon) Tai <tz...@...>
Date:   2018-05-03T07:30:55Z

    [FLINK-9169] [runtime] Allow specifying serializer presence requirement in KeyedBackendSerializationProxy
    
    This commit consolidates logic of whether old serializers are required
    to be present at restore time in the KeyedBackendSerializationProxy, via
    an isSerializerPresenceRequired flag.
    
    Heap-based backends typically set this to true, while RocksDB state
    backend will set this to false. If set to true, restored serializers
    cannot be the UnloadableDummyTypeSerializer, otherwise an IOException
    will be thrown to fail the restore.

----


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    Thanks for the review Stefan! Will merge this now ..


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185825767
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
     			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
     
     				bufferWithPos.setPosition(offsets[i * 2]);
    -				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --
    
    This could be the place where we catch an `UnloadableSerializerException`, but if we let the caller do the iteration from 0 to `numSerializersAndConfigSnapshots`, we can push it out even further. Why is it helpful to create a list here? Without it, we could do the exception handling in the caller, and in a more fine-grained way.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r186082804
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -102,17 +99,24 @@
     	 *
     	 * @return the deserialized serializer.
     	 */
    -	public static <T> TypeSerializer<T> tryReadSerializer(DataInputView in, ClassLoader userCodeClassLoader, boolean useDummyPlaceholder) {
    +	public static <T> TypeSerializer<T> tryReadSerializer(
    +			DataInputView in,
    +			ClassLoader userCodeClassLoader,
    +			boolean useDummyPlaceholder) throws IOException {
    +
     		final TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<T> proxy =
    -			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader, useDummyPlaceholder);
    +			new TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<>(userCodeClassLoader);
     
     		try {
     			proxy.read(in);
     			return proxy.getTypeSerializer();
    -		} catch (IOException e) {
    -			LOG.warn("Deserialization of serializer errored; replacing with null.", e);
    -
    -			return null;
    +		} catch (UnloadableTypeSerializerException e) {
    --- End diff --
    
    I would let this bubble up one more level, remove the flag here, and only catch `UnloadableTypeSerializerException` in the case where this method is called with `true`.


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    @StefanRRichter I have updated the PR. Also had to do a rebase due to conflicts.
    
    Regarding the thoughts you brought up:
    - Bubble up `UnloadableTypeSerializerException` approach:
    I introduced the exception, but only use it minimally. I think overall it is definitely an improvement, since we don't have to carry the dummy flag all the way down to the low-level serializer serialization proxy. However, I don't think there really is a need to handle it any level higher than the `TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience` method, since the original intent of that method was to always be fault tolerant when reading a bunch of serializers alongside other things. More details on that in my above comments.
    
    - Whether or not we really need to hand down the flag to `KeyedBackendSerializationProxy`:
    My gut feeling is that, even if in the future we bubble up the `UnloadableTypeSerializerException` to higher level components, the serialization proxy is still where we need to decide whether or not we handle it with a dummy serializer. The reason is that the serialization proxy handles deserialization of _all_ keyed state meta infos (and therefore their serializers); simply bubbling the exception further up without checking does not make sense.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185994454
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
     			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
     
     				bufferWithPos.setPosition(offsets[i * 2]);
    -				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --
    
    Since we are already thinking about not writing serializers anymore in savepoints for 1.6, I'm leaning towards not touching this method now.


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    Overall, I think this looks good to me now 👍


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185995278
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
     			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
     
     				bufferWithPos.setPosition(offsets[i * 2]);
    -				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --
    
    One other thing to keep in mind:
    The original intent of the `readSerializersAndConfigSnapshotsWithResilience` method, and the reason we added the complex indexing of offsets, is so that we can _always_ be fault tolerant when trying to read a bunch of serializers. So, essentially, there is no need to push out the exception further: the result of `readSerializersAndConfigSnapshotsWithResilience` should always be that there is some serializer, even if it is a dummy (hence the naming).


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5950


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    Please also check the travis build, some related tests seem to fail:
    Tests in error: 
      TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236 » IO
      TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109 » IO
      TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149 » IO
      PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185775633
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle()
     		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
     
     			KeyedBackendSerializationProxy<K> serializationProxy =
    -				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
    +				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
    --- End diff --
    
    Maybe a small comment on this line why we can tolerate the absence of the serializer is helpful for future maintenance. And a matching comment for the other option on the corresponding line in the heap backend.


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    @StefanRRichter yes, now that you mention it, the `isSerializerPresenceRequiredFlag` does seem a bit awkward to have in the serialization proxy. Essentially, all it does is serve as a switch to decide whether or not to fail, something that could be done by the caller.
    
    I'll quickly try your suggested approach and see how that turns out.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185771830
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -69,7 +69,7 @@
     	 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
     	 *
     	 * <p>If deserialization fails for any reason (corrupted serializer bytes, serializer class
    -	 * no longer in classpath, serializer class no longer valid, etc.), {@code null} will
    +	 * no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
    --- End diff --
    
    Comment in the next line should be deleted, looks like leftover from the copy-paste.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185828137
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
     			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
     
     				bufferWithPos.setPosition(offsets[i * 2]);
    -				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --
    
    The problem is that this method might mix too many things together. That is also visible in the complex return type, and in the fact that many call sites are only interested in the first element of the list. I wonder if we should break this up into dedicated steps (serializer, config) and let the callers invoke them one by one, so that we can handle exceptions at a higher level and decide there whether we need a serializer.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185994375
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -200,7 +197,7 @@ public static void writeSerializersAndConfigsWithResilience(
     			for (int i = 0; i < numSerializersAndConfigSnapshots; i++) {
     
     				bufferWithPos.setPosition(offsets[i * 2]);
    -				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader);
    +				serializer = tryReadSerializer(bufferWrapper, userCodeClassLoader, true);
    --- End diff --
    
    The reason this method is so complex is that it handles indexing of the serializers' and serializer config snapshots' offsets within the byte stream. It does so to be able to read all serializers and their serializer config snapshots fault tolerantly, and to not leave the stream corrupt when some exception occurs.
    
    I'm not sure we can break this method up. Doing so would just move a lot of duplicate code to the callers (we already have the offset index reading / writing, and even if we removed it we would still need to maintain backwards compatibility).
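
A rough sketch of why an offset index makes the read fault tolerant, under simplifying assumptions: entries are plain UTF-8 strings in one byte array, and the hypothetical `decode` method treats any entry starting with `!` as unloadable. The class and method names here are illustrative and do not reproduce the Flink method.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class OffsetIndexSketch {

    // Decodes one entry. Hypothetical failure rule for illustration only:
    // an entry that starts with '!' cannot be loaded.
    static String decode(byte[] data, int start, int end) throws IOException {
        String entry = new String(data, start, end - start, StandardCharsets.UTF_8);
        if (entry.startsWith("!")) {
            throw new IOException("Cannot load entry: " + entry);
        }
        return entry;
    }

    // Reads every entry by jumping to its recorded offset. A failure in one
    // entry cannot leave the reader at a wrong position for the next entry,
    // because each read starts from the index rather than from where the
    // previous read stopped.
    static List<String> readAllWithResilience(byte[] data, int[] offsets) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < offsets.length; i++) {
            int start = offsets[i];
            int end = (i + 1 < offsets.length) ? offsets[i + 1] : data.length;
            try {
                result.add(decode(data, start, end));
            } catch (IOException e) {
                // Substitute a placeholder and keep going.
                result.add("<dummy>");
            }
        }
        return result;
    }
}
```

Without the index, a failed read in the middle of the stream would leave the position undefined for every entry that follows, which is the stream corruption mentioned above.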


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185772501
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.state.testutils;
    +
    +import java.util.Set;
    +
    +/**
    + * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
    + */
    +public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader {
    --- End diff --
    
    `ArtificialCNFExceptionThrowingClassLoader` might be a better fit
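
For readers without the full diff, a classloader of the kind the Javadoc describes could look roughly like the sketch below. This is an assumption about the general technique, not the class from the PR: the name `CnfSimulatingClassLoader`, its constructor, and the error message are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; not the test utility added in the PR.
final class CnfSimulatingClassLoader extends ClassLoader {

    private final Set<String> blockedClassNames;

    CnfSimulatingClassLoader(ClassLoader parent, Set<String> blockedClassNames) {
        super(parent);
        this.blockedClassNames = new HashSet<>(blockedClassNames);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        // Fail for the configured names before delegating to the parent,
        // which simulates the class being absent from the classpath.
        if (blockedClassNames.contains(name)) {
            throw new ClassNotFoundException("Artificially blocked: " + name);
        }
        return super.loadClass(name, resolve);
    }
}
```

A test can pass such a loader as the user code classloader to exercise the "serializer class no longer in classpath" path without removing anything from the real classpath.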


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    Yes, I think we can only remove the flag further when splitting up `readSerializersAndConfigsWithResilience`, but I guess it is ok to leave it if we change this soon anyways.


---

[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

Posted by tzulitai <gi...@git.apache.org>.
Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5950
  
    I think even with the `UnloadableTypeSerializerException` exception bubbling approach, we actually still need a flag in the serialization proxy to decide how to handle the exception.
    
    The serialization proxy handles deserialization of all meta data of all registered key states, so that would be the highest level where we need to decide whether or not to use the dummy serializer.
    
    If we want to hand out this control to an even higher level (i.e. the backend), we would then need to break up the deserialization logic from the serialization proxy, which IMO isn't appropriate.


---

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5950#discussion_r185821296
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
    @@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException {
     
     				Thread.currentThread().setContextClassLoader(userClassLoader);
     				typeSerializer = (TypeSerializer<T>) ois.readObject();
    -			} catch (ClassNotFoundException | InvalidClassException e) {
    +			} catch (Exception e) {
     				if (useDummyPlaceholder) {
     					// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
     					// a proper typeserializer from the user
    -					typeSerializer =
    -						new UnloadableDummyTypeSerializer<>(buffer);
    -					LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
    +					typeSerializer = new UnloadableDummyTypeSerializer<>(buffer);
    --- End diff --
    
    Some food for thought, even if it is not introduced by this PR: why can we not introduce a special `UnloadableSerializerException extends IOException` that holds a field with the byte array in `buffer` and let it bubble up to a higher level component? If that component wants to introduce dummies, it can do so from the bytes in the caught exception; if not, it forwards the exception. Then we would not have to hand down this flag but could let the higher level component decide. What do you think?
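
The suggestion could be sketched as follows. Only the exception name and the idea of carrying the byte array come from the comment; the constructor, the accessor, the `BubbleUpSketch` class, and the `loadable` parameter (which simulates whether the class can be loaded) are assumptions made for illustration.

```java
import java.io.IOException;
import java.util.Arrays;

// Hypothetical sketch of the proposed exception; not taken from Flink.
final class UnloadableSerializerException extends IOException {

    private final byte[] serializerBytes;

    UnloadableSerializerException(byte[] serializerBytes, Throwable cause) {
        super("Could not load serializer.", cause);
        this.serializerBytes = Arrays.copyOf(serializerBytes, serializerBytes.length);
    }

    byte[] getSerializerBytes() {
        return Arrays.copyOf(serializerBytes, serializerBytes.length);
    }
}

final class BubbleUpSketch {

    // Low-level read without any flag: on failure it reports the raw bytes
    // through the exception instead of deciding what to do with them.
    static String readSerializer(byte[] bytes, boolean loadable)
            throws UnloadableSerializerException {
        if (!loadable) {
            throw new UnloadableSerializerException(bytes, new ClassNotFoundException("simulated"));
        }
        return "serializer";
    }

    // A caller that tolerates missing serializers builds a placeholder from
    // the bytes carried by the exception.
    static String readTolerant(byte[] bytes, boolean loadable) {
        try {
            return readSerializer(bytes, loadable);
        } catch (UnloadableSerializerException e) {
            return "dummy(" + e.getSerializerBytes().length + " bytes)";
        }
    }

    // A caller that requires the serializer simply lets the exception propagate.
    static String readStrict(byte[] bytes, boolean loadable) throws IOException {
        return readSerializer(bytes, loadable);
    }
}
```

The trade-off discussed in this thread is where the `try`/`catch` lives: the further up it moves, the fewer methods need a flag parameter, but each caller then has to handle the exception itself.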


---