Posted to issues@flink.apache.org by tillrohrmann <gi...@git.apache.org> on 2017/01/20 13:07:10 UTC

[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3178

    [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation

    Adds exception handling for the snapshotState method to the stream operators. A failing
    snapshot operation triggers the cleanup of all state resources generated so far. This
    prevents resources (e.g. files) from being left behind when a snapshot operation fails.
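The cleanup-on-failure idea described above can be sketched as follows. This is an illustration only, not the code from this pull request: `StateResource`, `SnapshotStep` and `FailingSnapshotCleanup` are hypothetical names. Every resource produced by a snapshot step is remembered, and if a later step throws, all remembered resources are discarded before the original exception is rethrown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a piece of state written during a snapshot (e.g. a file).
interface StateResource {
    void discard() throws Exception;
}

final class FailingSnapshotCleanup {

    // Hypothetical snapshot step that produces one state resource.
    interface SnapshotStep {
        StateResource run() throws Exception;
    }

    // Runs all steps. If any step fails, every resource produced so far is
    // discarded and the original exception is rethrown; failures while
    // discarding are attached to it as suppressed exceptions.
    static List<StateResource> snapshot(List<SnapshotStep> steps) throws Exception {
        List<StateResource> produced = new ArrayList<>();
        try {
            for (SnapshotStep step : steps) {
                produced.add(step.run());
            }
            return produced;
        } catch (Exception e) {
            for (StateResource resource : produced) {
                try {
                    resource.discard();
                } catch (Exception discardException) {
                    e.addSuppressed(discardException);
                }
            }
            throw e;
        }
    }
}
```

Attaching discard failures as suppressed exceptions keeps the root cause of the failed snapshot as the primary error while still reporting that the cleanup itself was incomplete.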

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink taskOperatorStateCleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3178
    
----
commit c1a597320dbe3c7f4514297d5ad7f2b8f416e287
Author: Till Rohrmann <tr...@apache.org>
Date:   2016-12-01T12:25:05Z

    [FLINK-5214] Clean up checkpoint data in case of a failing checkpoint operation
    
    Adds exception handling for the snapshotState method to the stream operators. A failing
    snapshot operation triggers the cleanup of all state resources generated so far. This
    prevents resources (e.g. files) from being left behind when a snapshot operation fails.
    
    Add test case for OperatorSnapshotResult
    
    Add StateSnapshotContextSynchronousImplTest
    
    Add AbstractStreamOperator failing snapshot tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97282223
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -342,21 +343,50 @@ public final OperatorSnapshotResult snapshotState(
     		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
     				checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());
     
    -		snapshotState(snapshotContext);
    +		try {
    --- End diff --
    
    Here we could use try-with-resources if `SnapshotContext` implements `AutoCloseable`.
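A minimal sketch of what the suggested try-with-resources structure buys, assuming a hypothetical context class (`SketchSnapshotContext`, `TryWithResourcesDemo` and the stream name are illustrative, not Flink's API). The context closes any stream it handed out, and Java guarantees that the resource's `close()` runs before the `catch` block executes, so cleanup happens on both the failure and the success path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical context: remembers the streams it handed out and closes
// any that are still open when the context itself is closed.
final class SketchSnapshotContext implements AutoCloseable {
    private final List<String> log;
    private final List<AutoCloseable> openStreams = new ArrayList<>();

    SketchSnapshotContext(List<String> log) {
        this.log = log;
    }

    AutoCloseable openStream(String name) {
        AutoCloseable stream = () -> log.add("closed " + name);
        openStreams.add(stream);
        return stream;
    }

    @Override
    public void close() throws Exception {
        for (AutoCloseable stream : openStreams) {
            stream.close();
        }
        openStreams.clear();
    }
}

final class TryWithResourcesDemo {
    // Hypothetical user snapshot logic that may fail after opening a stream.
    static void snapshotState(SketchSnapshotContext ctx, boolean fail) {
        ctx.openStream("raw-keyed-state");
        if (fail) {
            throw new IllegalStateException("snapshot failed");
        }
    }

    static void snapshot(List<String> log, boolean fail) throws Exception {
        try (SketchSnapshotContext ctx = new SketchSnapshotContext(log)) {
            snapshotState(ctx, fail);
            log.add("snapshot done");
        } catch (Exception e) {
            // ctx.close() has already run by the time control reaches this block.
            log.add("caught " + e.getMessage());
            throw e;
        }
    }
}
```

Because `close()` also runs when the snapshot succeeds, in this sketch any stream the operator left open is closed at the end of the `try` block either way.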


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97279062
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java ---
    @@ -21,8 +21,9 @@
     import org.apache.flink.annotation.PublicEvolving;
     
     /**
    - * This interface provides a context in which operators that use managed (i.e. state that is managed by state
    - * backends) or raw (i.e. the operator can write it's state streams) state can perform a snapshot.
    + * This interface provides a context in which operators that use managed (i.e. state that is managed
    --- End diff --
    
    All changes in this file are unrelated/reformatting.


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97279484
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
    @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return new DoneFuture<>(stream.closeAndGetHandle());
     	}
     
    -}
    \ No newline at end of file
    +	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
    +		Preconditions.checkNotNull(stream);
    +
    +		closableRegistry.unregisterClosable(stream.getDelegate());
    +		stream.getDelegate().close();
    +	}
    +
    +	public void close() throws IOException {
    --- End diff --
    
    Since we are introducing a close method anyway, we could also make this class implement `AutoCloseable` and use it in a try-with-resources statement.
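The `closeAndUnregisterStream` helper in the diff first removes the stream from a registry of closeables and then closes it. The sketch below illustrates why that order matters, using hypothetical classes (`SketchCloseableRegistry`, `CountingStream`, `StreamOwner`) rather than Flink's own registry: once a stream is unregistered, a later close of the whole registry (e.g. on task cancellation) no longer touches it.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical registry of closeables that a task closes on cancellation.
final class SketchCloseableRegistry implements Closeable {
    private final Set<Closeable> registered = new LinkedHashSet<>();

    synchronized void register(Closeable c) { registered.add(c); }

    synchronized boolean unregister(Closeable c) { return registered.remove(c); }

    synchronized int size() { return registered.size(); }

    @Override
    public synchronized void close() throws IOException {
        for (Closeable c : registered) {
            c.close();
        }
        registered.clear();
    }
}

// Hypothetical stream that counts how often close() was called.
final class CountingStream implements Closeable {
    int closeCalls;

    @Override
    public void close() {
        closeCalls++;
    }
}

final class StreamOwner {
    private final SketchCloseableRegistry registry;

    StreamOwner(SketchCloseableRegistry registry) {
        this.registry = registry;
    }

    CountingStream open() {
        CountingStream stream = new CountingStream();
        registry.register(stream);
        return stream;
    }

    // Same order as in the diff: unregister first, then close, so that a later
    // registry.close() does not close this stream a second time.
    void closeAndUnregister(CountingStream stream) throws IOException {
        registry.unregister(stream);
        stream.close();
    }
}
```

Without the `unregister` call, closing the registry afterwards would close the stream again (`closeCalls` would reach 2) and the registry would keep a reference to a stream that is already finished.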


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97313842
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -342,21 +343,50 @@ public final OperatorSnapshotResult snapshotState(
     		StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
     				checkpointId, timestamp, streamFactory, keyGroupRange, getContainingTask().getCancelables());
     
    -		snapshotState(snapshotContext);
    +		try {
    --- End diff --
    
    Yes, you're right. Will refactor the code that way :-)


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97323947
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
    @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return new DoneFuture<>(stream.closeAndGetHandle());
     	}
     
    -}
    \ No newline at end of file
    +	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
    +		Preconditions.checkNotNull(stream);
    +
    +		closableRegistry.unregisterClosable(stream.getDelegate());
    +		stream.getDelegate().close();
    +	}
    +
    +	public void close() throws IOException {
    --- End diff --
    
    But isn't it better to be as specific as possible? With `Closeable` we wouldn't unnecessarily widen the thrown exception from `IOException` to `Exception`.


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97324700
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
    @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return new DoneFuture<>(stream.closeAndGetHandle());
     	}
     
    -}
    \ No newline at end of file
    +	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
    +		Preconditions.checkNotNull(stream);
    +
    +		closableRegistry.unregisterClosable(stream.getDelegate());
    +		stream.getDelegate().close();
    +	}
    +
    +	public void close() throws IOException {
    --- End diff --
    
    Of course, but you can, and should, always declare a more specific exception in the signature of your implementation. If `IOException` or a subclass is appropriate here, you could also go with `Closeable`.
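To illustrate the point about exception specificity: a class can implement `AutoCloseable`, whose `close()` is declared `throws Exception`, and still declare the narrower `throws IOException` on its own `close()`. A minimal sketch with hypothetical class names (`NarrowedContext`, `NarrowingDemo`):

```java
import java.io.IOException;

// Hypothetical context type: implements AutoCloseable (whose close() is
// declared to throw Exception) but narrows the throws clause to IOException.
final class NarrowedContext implements AutoCloseable {
    private final boolean failOnClose;

    NarrowedContext(boolean failOnClose) {
        this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws IOException {
        if (failOnClose) {
            throw new IOException("could not delete partial state");
        }
    }
}

final class NarrowingDemo {
    // Compiles with only "throws IOException": the resource's static type is
    // NarrowedContext, so the compiler uses its narrower close() signature.
    static String use(boolean failOnClose) throws IOException {
        try (NarrowedContext ctx = new NarrowedContext(failOnClose)) {
            return "body finished";
        }
    }
}
```

The narrowing only helps callers that use the concrete type: if the resource variable were declared as `AutoCloseable`, `use` would have to declare `throws Exception` again.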


[GitHub] flink issue #3178: [FLINK-5214] Clean up checkpoint data in case of a failin...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3178
  
    Thanks for the review @StefanRRichter. I'll address your comments and then merge this PR.


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97313258
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
    @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return new DoneFuture<>(stream.closeAndGetHandle());
     	}
     
    -}
    \ No newline at end of file
    +	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
    +		Preconditions.checkNotNull(stream);
    +
    +		closableRegistry.unregisterClosable(stream.getDelegate());
    +		stream.getDelegate().close();
    +	}
    +
    +	public void close() throws IOException {
    --- End diff --
    
    Good idea. I will make it implement the `Closeable` interface.


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3178


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97314933
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
    @@ -127,4 +128,38 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
     		return new DoneFuture<>(stream.closeAndGetHandle());
     	}
     
    -}
    \ No newline at end of file
    +	private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<T> stream) throws IOException {
    +		Preconditions.checkNotNull(stream);
    +
    +		closableRegistry.unregisterClosable(stream.getDelegate());
    +		stream.getDelegate().close();
    +	}
    +
    +	public void close() throws IOException {
    --- End diff --
    
    I would prefer `AutoCloseable` over `Closeable`, because the former is general, while the latter is specific to I/O.
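For reference, `java.io.Closeable` extends `java.lang.AutoCloseable`, so a `Closeable` works anywhere an `AutoCloseable` is expected, including try-with-resources. The differences are that `Closeable.close()` may only throw `IOException` and is specified to be idempotent, whereas `AutoCloseable.close()` may throw any `Exception` and is not required to be idempotent. A small sketch with hypothetical names (`CloseableHierarchyDemo`, `IdempotentResource`, `closeAll`):

```java
import java.io.Closeable;
import java.util.List;

final class CloseableHierarchyDemo {

    // Hypothetical Closeable that honours Closeable's contract:
    // calling close() on an already closed resource has no effect.
    static final class IdempotentResource implements Closeable {
        private boolean closed;
        int effectiveCloses;

        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            effectiveCloses++;
        }
    }

    // Accepts any AutoCloseable; Closeable instances qualify because
    // java.io.Closeable extends java.lang.AutoCloseable.
    // Returns the number of close() calls that threw.
    static int closeAll(List<? extends AutoCloseable> resources) {
        int failures = 0;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                failures++;
            }
        }
        return failures;
    }
}
```

A non-I/O failure such as an `IllegalStateException` thrown from an `AutoCloseable` lambda is handled by the same `closeAll` helper, which is the generality the comment above refers to.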


[GitHub] flink pull request #3178: [FLINK-5214] Clean up checkpoint data in case of a...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3178#discussion_r97312291
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java ---
    @@ -21,8 +21,9 @@
     import org.apache.flink.annotation.PublicEvolving;
     
     /**
    - * This interface provides a context in which operators that use managed (i.e. state that is managed by state
    - * backends) or raw (i.e. the operator can write it's state streams) state can perform a snapshot.
    + * This interface provides a context in which operators that use managed (i.e. state that is managed
    --- End diff --
    
    True. Will revert them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---