You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2017/05/08 10:05:35 UTC

[GitHub] flink pull request #3844: [FLINK-5892] Enable 1.2 keyed state test

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/3844

    [FLINK-5892] Enable 1.2 keyed state test

    Backport of #3842 for the 1.3 branch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 13_5892_enable_test

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3844
    
----
commit 7a0fff31d483633f88026650956b97cc451f19df
Author: zentol <ch...@apache.org>
Date:   2017-05-08T09:56:57Z

    [FLINK-5892] Enable 1.2 keyed state test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122718852
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
     			.map(new StatefulStringStoringMap(mode, "first"))
     			.setParallelism(4);
     
    -		// TODO: re-enable this when generating the actual 1.2 savepoint
    -		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -		map.uid("first");
    -		//}
    +		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --
    
    I somehow don't like it that is not explained in the commit message what has actually changed/why was this change required at all. Especially since you have not changed anything else in the code, it is difficult to understand that.  If nothing else has changed, why do we need this `if (...)`? If something has changed, shouldn't it be covered by some test?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by zentol <gi...@git.apache.org>.
Github user zentol closed the pull request at:

    https://github.com/apache/flink/pull/3844


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122897990
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
     			.map(new StatefulStringStoringMap(mode, "first"))
     			.setParallelism(4);
     
    -		// TODO: re-enable this when generating the actual 1.2 savepoint
    -		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -		map.uid("first");
    -		//}
    +		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --
    
    Yes, thank you very much, now I get it :)
    
    I think since this code is already on the master it's a bit too late change a commit message there :(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3844
  
    That's unrelated. The changes made here can only affect the `KeyedComplexChainTest`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/3844
  
    yes, this is present in master, but we should have it in the 1.3 branch as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/3844
  
    Are you sure that errors in travis are intermittent or unrelated to your change? One is already reported here: https://issues.apache.org/jira/browse/FLINK-6843 but second one I'm not sure:
    
    ```
    Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.933 sec <<< FAILURE! - in org.apache.flink.runtime.state.OperatorStateBackendTest
    testSnapshotAsyncCancel(org.apache.flink.runtime.state.OperatorStateBackendTest)  Time elapsed: 0.061 sec  <<< ERROR!
    java.util.concurrent.ExecutionException: java.io.IOException: Stream closed.
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    	at org.apache.flink.runtime.state.OperatorStateBackendTest.testSnapshotAsyncCancel(OperatorStateBackendTest.java:636)
    Caused by: java.io.IOException: Stream closed.
    	at org.apache.flink.runtime.util.BlockerCheckpointStreamFactory$1.write(BlockerCheckpointStreamFactory.java:95)
    	at java.io.DataOutputStream.writeInt(DataOutputStream.java:197)
    	at org.apache.flink.core.io.VersionedIOReadableWritable.write(VersionedIOReadableWritable.java:40)
    	at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:65)
    	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:255)
    	at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:233)
    	at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3844: [FLINK-5892] Enable 1.2 keyed state test

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3844#discussion_r122894136
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java ---
    @@ -98,10 +98,9 @@ public static void main(String[] args) throws Exception {
     			.map(new StatefulStringStoringMap(mode, "first"))
     			.setParallelism(4);
     
    -		// TODO: re-enable this when generating the actual 1.2 savepoint
    -		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    -		map.uid("first");
    -		//}
    +		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
    --- End diff --
    
    You are completely right, the commit message/PR description isn't sufficient to explain what this PR changes. In fact, it took me a bit to remember that as well. I'll adjust the commit message later on.
    
    So this PR is pretty subtle, since the changes to the code aren't the interesting part, but the change to the `complexKeyed-flink1.2/_metadata` file is. This file is supposed to be a 1.2 savepoint to verify the restore behavior from them in 1.3. But this file is not a 1.2 savepoint, because at the time of merging the restoration of keyed 1.2 state was broken, In the meantime we used a 1.3 savepoint instead.
    
    The main thing this PR does is replace this 1.3 savepoint with an actual 1.2 savepoint.
    
    The second change is related to the uid's. In 1.2, it is not possible to assign UIDs to chained operators. As "first" and "second" are both chained to the window function we are not allowed to call `map.uid("...")` when generating the 1.2 savepoint (! (MIGRATE || RESTORE)). However, in 1.3 it is possible and in fact mandatory to assign UIDs.
    
    Does that clear things up?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---