You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2017/08/18 08:38:32 UTC

[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4561

     [FLINK-7476][streaming] Continue using previous transaction on failures

    First commit comes from #4557.
    
    Previously when using TwoPhaseCommitSinkFunction, if there was some intermittent failure in "beginTransaction", not only the snapshot that triggered this call failed, but also any subsequent write requests would fail also. This caused such sink unusable until application restart.
    
    This PR changes order of execution of the methods from `PublicEvolving` class that has not been yet released.
    
    PR is covered by existing tests in `TwoPhaseCommitSinkFunctionTest` as well as two additional test cases (`testContinueWorkOnBeginTransactionFailure` would be failing before this PR).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink 2phase-recover

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4561
    
----
commit 813c8121b406b60ea478f4765b33b7d75c221d1e
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-08-14T14:40:45Z

    [hotifx][streaming] Simplify state of TwoPhaseCommitSinkFunction

commit 249b6419af3d15414dc411838aba624d0ee2f3a1
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-08-14T13:09:39Z

    [hotfix][tests] Implement AutoCloseable in TestHarness

commit a0ae6324dcaded581a3352c8ff4bae6e86e01fde
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-08-17T13:46:47Z

    [hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest

commit e3c7dc83ccbce2505be5769fa0827b09dfa54875
Author: Piotr Nowojski <pi...@gmail.com>
Date:   2017-08-17T10:29:16Z

    [FLINK-7476][streaming] Continue using previous transaction on failures
    
    Previuosly when using TwoPhaseCommitSinkFunction, if there was some intermittent failure
    in "beginTransaction", not only the snapshot that triggered this call failed, but also
    any subsequent write requests would fail also. This caused such sink unusable until
    application restart.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4561#discussion_r134771937
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
    @@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction transaction) {
     
     	private static class FileTransaction {
     		private final File tmpFile;
    -		private final transient Writer writer;
    +		private final transient BufferedWriter writer;
     
     		public FileTransaction(File tmpFile) throws IOException {
     			this.tmpFile = tmpFile;
     			this.writer = new BufferedWriter(new FileWriter(tmpFile));
     		}
    +
    +		@Override
    +		public String toString() {
    +			return String.format("FileTransaction[%s]", tmpFile.getName());
    +		}
    +	}
    +
    +	private static class TestContext implements AutoCloseable {
    +		public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
    +		public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
    +
    +		public FileBasedSinkFunction sinkFunction;
    +		public OneInputStreamOperatorTestHarness<String, Object> harness;
    +
    +		private TestContext() throws Exception {
    +			tmpDirectory.deleteOnExit();
    --- End diff --
    
    Yes, you couldn't use it with that (without handing in the rule when initialising the context). I'm just pointing it out but we'll leave it as is. 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4561: [FLINK-7476][streaming] Continue using previous transacti...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4561
  
    I have abandoned last commit. This PR is now pure refactor/hotfix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4561: [FLINK-7476][streaming] Continue using previous transacti...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4561
  
    Thanks :) closed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4561#discussion_r134737971
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
    @@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction transaction) {
     
     	private static class FileTransaction {
     		private final File tmpFile;
    -		private final transient Writer writer;
    +		private final transient BufferedWriter writer;
     
     		public FileTransaction(File tmpFile) throws IOException {
     			this.tmpFile = tmpFile;
     			this.writer = new BufferedWriter(new FileWriter(tmpFile));
     		}
    +
    +		@Override
    +		public String toString() {
    +			return String.format("FileTransaction[%s]", tmpFile.getName());
    +		}
    +	}
    +
    +	private static class TestContext implements AutoCloseable {
    +		public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
    +		public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
    +
    +		public FileBasedSinkFunction sinkFunction;
    +		public OneInputStreamOperatorTestHarness<String, Object> harness;
    +
    +		private TestContext() throws Exception {
    +			tmpDirectory.deleteOnExit();
    --- End diff --
    
    I think you could also use the `TemporaryFolder` rule for this:
    ```
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    ```
    
    but this also seems fine.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4561#discussion_r134759790
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java ---
    @@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction transaction) {
     
     	private static class FileTransaction {
     		private final File tmpFile;
    -		private final transient Writer writer;
    +		private final transient BufferedWriter writer;
     
     		public FileTransaction(File tmpFile) throws IOException {
     			this.tmpFile = tmpFile;
     			this.writer = new BufferedWriter(new FileWriter(tmpFile));
     		}
    +
    +		@Override
    +		public String toString() {
    +			return String.format("FileTransaction[%s]", tmpFile.getName());
    +		}
    +	}
    +
    +	private static class TestContext implements AutoCloseable {
    +		public final File tmpDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_tmp").toFile();
    +		public final File targetDirectory = Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() + "_target").toFile();
    +
    +		public FileBasedSinkFunction sinkFunction;
    +		public OneInputStreamOperatorTestHarness<String, Object> harness;
    +
    +		private TestContext() throws Exception {
    +			tmpDirectory.deleteOnExit();
    --- End diff --
    
    I think I wouldn't be able to use the `@Rule` here for those directories. I would have to write my own rule, that would wrap whole `TestContext`, right? If so, can we leave as it is for know, because currently I'm getting stack  overflow exceptions with number of opened issues that I'm currently working on ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski closed the pull request at:

    https://github.com/apache/flink/pull/4561


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4561: [FLINK-7476][streaming] Continue using previous transacti...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4561
  
    I merged. 👌 Could you please close this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---