Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2016/06/28 11:37:55 UTC

[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2174

    [FLINK-4075] ContinuousFileProcessingCheckpointITCase failed on Travis

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink hotfix_file_src

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2174.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2174
    
----
commit 3dbc2dc1d63514d1b28d92b9477f8278327f8c0f
Author: kl0u <kk...@gmail.com>
Date:   2016-06-24T13:01:44Z

    Fixes unstable ContinuousFileProcessingCheckpointITCase test.

commit d4f99ee240bac3917454ee76950f3dd27c91e804
Author: kl0u <kk...@gmail.com>
Date:   2016-06-27T14:47:45Z

    Fixes unstable JMXReportedTest.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITCase fai...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2174
  
    I think it is no longer necessary to keep `splitsToFwdOrderedAscByModTime` and `currentSplitsToFwd` as checkpointed fields, since `monitorDirAndForwardSplits()` is called while holding the lock and both fields are always set to `null` at the end of the method.
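
The reasoning in the comment above can be illustrated with a minimal sketch. It is not the Flink monitoring function: the class `MonitorSketch`, its methods, and the use of plain strings as splits are hypothetical, and it assumes that the snapshot is taken under the same lock as the monitoring method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a monitoring function; not the actual Flink class. */
class MonitorSketch {
    private final Object checkpointLock = new Object();

    // Scratch field that is only populated while monitorDirAndForwardSplits() runs.
    private List<String> currentSplitsToFwd;

    private final List<String> forwarded = new ArrayList<>();

    void monitorDirAndForwardSplits(List<String> discovered) {
        synchronized (checkpointLock) {
            currentSplitsToFwd = new ArrayList<>(discovered);
            forwarded.addAll(currentSplitsToFwd);
            // Always reset before the lock is released.
            currentSplitsToFwd = null;
        }
    }

    /** Assumed to take the same lock, so it cannot observe a populated scratch field. */
    boolean snapshotSeesScratchState() {
        synchronized (checkpointLock) {
            return currentSplitsToFwd != null;
        }
    }

    List<String> forwardedSplits() {
        synchronized (checkpointLock) {
            return new ArrayList<>(forwarded);
        }
    }
}
```

Under these assumptions a snapshot would always record `null` for the scratch field, which is why checkpointing it adds nothing.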
    




[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2174



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r68744213
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -405,6 +414,9 @@ public void restoreState(StreamTaskState state, long recoveryTimestamp) throws E
     		S formatState = (S) ois.readObject();
     
     		// set the whole reader state for the open() to find.
    +		Preconditions.checkArgument(this.readerState == null,
    --- End diff --
    
    This should also be `Preconditions.checkState`



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r69100533
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -334,9 +342,11 @@ public void run() {
     				this.pendingSplits.remove();
     			}
     
    -			if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
    -				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
    -				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
    +			if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
    +				S formatState = this.isSplitOpen ?
    +					(S) ((CheckpointableInputFormat) format).getCurrentState() :
    +					restoredFormatState;
    +				return new Tuple3<>(snapshot, currentSplit, formatState);
     			} else {
     				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
    --- End diff --
    
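    I think this log message is wrong now that the condition checks `this.currentSplit != null`: the `else` branch is also taken when there is no current split, even if the format is checkpointable. This pollutes the log files.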



[GitHub] flink issue #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITCase fai...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2174
  
    Thanks for fixing this! I had some comments but they should be easy to address.



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by rmetzger <gi...@git.apache.org>.
GitHub user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r69102339
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -334,9 +342,11 @@ public void run() {
     				this.pendingSplits.remove();
     			}
     
    -			if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
    -				S formatState = (S) ((CheckpointableInputFormat) format).getCurrentState();
    -				return new Tuple3<>(snapshot, currentSplit, currentSplit == null ? null : formatState);
    +			if (this.format instanceof CheckpointableInputFormat && this.currentSplit != null) {
    +				S formatState = this.isSplitOpen ?
    +					(S) ((CheckpointableInputFormat) format).getCurrentState() :
    +					restoredFormatState;
    +				return new Tuple3<>(snapshot, currentSplit, formatState);
     			} else {
     				LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
    --- End diff --
    
    Monitoring an empty directory looks like this:
    
    ```
    11:36:42,704 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1467279402703
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,709 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,710 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:42,823 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 (in 120 ms)
    11:36:44,703 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1467279404703
    11:36:44,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,706 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,706 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,706 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,706 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,706 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,707 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,710 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:44,745 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 (in 18 ms)
    11:36:46,703 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 3 @ 1467279406703
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:46,723 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 3 (in 8 ms)
    11:36:48,703 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 4 @ 1467279408703
    11:36:48,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,704 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,705 INFO  org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator  - The format used is not checkpointable. The current input split will be restarted upon recovery.
    11:36:48,729 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 4 (in 10 ms)
    ```
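
One possible way to avoid the repeated message is to separate the two reasons the `else` branch can be reached. The sketch below is a hypothetical model of that decision and not the change that was merged: the class `SnapshotDecisionSketch`, the `Outcome` enum, and the boolean parameters are illustrative names that do not appear in the pull request.

```java
/** Hypothetical model of the reader's snapshot decision; all names are illustrative. */
class SnapshotDecisionSketch {

    enum Outcome {
        SNAPSHOT_WITH_FORMAT_STATE,
        SNAPSHOT_WITHOUT_SPLIT,
        LOG_NOT_CHECKPOINTABLE
    }

    static Outcome decide(boolean formatIsCheckpointable, boolean hasCurrentSplit) {
        if (formatIsCheckpointable && hasCurrentSplit) {
            // A split is in progress and its position can be stored.
            return Outcome.SNAPSHOT_WITH_FORMAT_STATE;
        }
        if (hasCurrentSplit) {
            // Only here is the "not checkpointable" message accurate:
            // a split is in progress but its position cannot be stored.
            return Outcome.LOG_NOT_CHECKPOINTABLE;
        }
        // No split is being read (for example, an empty directory),
        // so there is nothing to restart and nothing to log.
        return Outcome.SNAPSHOT_WITHOUT_SPLIT;
    }
}
```

With this split, a reader that is idle because the monitored directory is empty would take the last branch and emit no message on each checkpoint.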



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r68743982
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -104,7 +104,13 @@ public void open() throws Exception {
     		this.collector = new TimestampedCollector<>(output);
     		this.checkpointLock = getContainingTask().getCheckpointLock();
     
    +		Preconditions.checkArgument(reader == null, "The reader is already initialized.");
    --- End diff --
    
    I think this should be `Preconditions.checkState()`.
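
The distinction behind this suggestion can be sketched as follows. The helpers below are minimal stand-ins that mirror the usual contract of such utilities (`checkArgument` signals a bad method argument with `IllegalArgumentException`, `checkState` signals an invalid object state with `IllegalStateException`); the class `PreconditionsSketch` and its `open()` method are hypothetical and are not the Flink operator.

```java
/** Minimal stand-ins that mirror the usual checkArgument/checkState contract. */
class PreconditionsSketch {

    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            // The caller passed an invalid argument.
            throw new IllegalArgumentException(message);
        }
    }

    static void checkState(boolean condition, String message) {
        if (!condition) {
            // The object itself is in a state that does not allow this call.
            throw new IllegalStateException(message);
        }
    }

    private Object reader;

    /** open() takes no arguments, so a failed check here describes the object's state. */
    void open() {
        checkState(reader == null, "The reader is already initialized.");
        reader = new Object();
    }
}
```

Because `reader` is a field and not a parameter of `open()`, calling `open()` twice is a state violation, so the exception type from `checkState` describes the failure more accurately.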



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r68744278
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -191,11 +197,11 @@ public void close() throws Exception {
     
     		private volatile boolean isSplitOpen = false;
     
    -		SplitReader(FileInputFormat<OT> format,
    +		private SplitReader(FileInputFormat<OT> format,
     					TypeSerializer<OT> serializer,
     					TimestampedCollector<OT> collector,
     					Object checkpointLock,
    -					Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
    +					Tuple3<List<FileInputSplit>, FileInputSplit, S> recoveredState) {
    --- End diff --
    
    I think the old name was fine. In other parts of Flink it's also called restored state.



[GitHub] flink pull request #2174: [FLINK-4075] ContinuousFileProcessingCheckpointITC...

Posted by aljoscha <gi...@git.apache.org>.
GitHub user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2174#discussion_r68744158
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
    @@ -104,7 +104,13 @@ public void open() throws Exception {
     		this.collector = new TimestampedCollector<>(output);
     		this.checkpointLock = getContainingTask().getCheckpointLock();
     
    +		Preconditions.checkArgument(reader == null, "The reader is already initialized.");
    +
     		this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
    +
    +		// after initializing the reader, set the state to recovered state
    --- End diff --
    
    Could you please clarify this sentence? I think something might be missing.

