Posted to user@flink.apache.org by Chengzhi Zhao <w....@gmail.com> on 2018/02/06 18:15:38 UTC

Fwd: Question about flink checkpoint

Hey, I am new to Flink and have a question; I'd like to see if anyone can
help here.

So we have an S3 path that Flink monitors for newly available files.

// The last argument is the monitoring interval: rescan path every 10000 ms (10 s)
val avroInputStream_activity = env.readFile(format, path,
  FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
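For readers new to PROCESS_CONTINUOUSLY: as far as I understand the Flink 1.4 monitoring source, on every rescan (every 10000 ms in the call above) it lists the path and forwards only files whose modification time is newer than the largest one it has already forwarded, and that single timestamp is what the monitoring source keeps in its checkpoint. The plain-Scala model below illustrates that rule. `FileEntry` and `ContinuousScanModel.scanOnce` are hypothetical names, not Flink API, and the real source additionally cuts each file into input splits before handing them to the reader.

```scala
// Hypothetical, simplified model of one PROCESS_CONTINUOUSLY rescan.
// FileEntry and ContinuousScanModel are illustrative names, not Flink API.
final case class FileEntry(path: String, modificationTime: Long)

object ContinuousScanModel {

  /** One rescan of the monitored path.
    *
    * @param listing      files currently visible under the path
    * @param maxForwarded largest modification time forwarded so far; in this
    *                     model it is the only value the source would checkpoint
    * @return the newly forwarded files (oldest first) and the updated value
    */
  def scanOnce(listing: Seq[FileEntry], maxForwarded: Long): (Seq[FileEntry], Long) = {
    // Files that are not newer than maxForwarded count as already processed.
    val fresh = listing
      .filter(_.modificationTime > maxForwarded)
      .sortBy(_.modificationTime)
    // Advance the watermark to the newest file forwarded in this scan.
    val updated = (maxForwarded +: fresh.map(_.modificationTime)).max
    (fresh, updated)
  }
}
```

In this model, once a file has been forwarded the monitoring side no longer needs to remember its path; any path that survives a restore has to come from the downstream reader's own state.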

I am using both internal and externalized checkpointing. Let's say a bad file
lands in the path and Flink retries several times. I want to take those bad
files out and let the processing continue. However, since the file path
persists in the checkpoint, when I try to resume from the externalized
checkpoint, it throws the following error because the file cannot be found:

java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No
such file or directory: s3a://myfile

Is there a way to skip this bad file and move on?
Thanks in advance.

Best,
Chengzhi Zhao

Re: Question about flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Great, thank you!

Best, Fabian

2018-02-07 23:52 GMT+01:00 Chengzhi Zhao <w....@gmail.com>:

> Thanks, Fabian,
>
> I opened a JIRA ticket and I'd like to work on it if people think this
> would be an improvement:
> https://issues.apache.org/jira/browse/FLINK-8599
>
> Best,
> Chengzhi

Re: Question about flink checkpoint

Posted by Chengzhi Zhao <w....@gmail.com>.
Thanks, Fabian,

I opened a JIRA ticket and I'd like to work on it if people think this
would be an improvement:
https://issues.apache.org/jira/browse/FLINK-8599

Best,
Chengzhi

On Wed, Feb 7, 2018 at 4:17 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Chengzhi Zhao,
>
> I think this is rather an issue with the ContinuousFileReaderOperator than
> with the checkpointing algorithm in general.
> A source can decide which information to store as state and also how to
> handle failures such as file paths that have been put into state but have
> been removed from the file system.
>
> It would be great if you could open a JIRA issue with a feature request to
> improve the failure behavior of the ContinuousFileReaderOperator.
> It could, for example, check whether a path exists before trying to read a
> file, and ignore the input split instead of throwing an exception and
> causing a failure.
> If you want to, you can also work on a fix and contribute it back.
>
> Best, Fabian

Re: Question about flink checkpoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Chengzhi Zhao,

I think this is rather an issue with the ContinuousFileReaderOperator than
with the checkpointing algorithm in general.
A source can decide which information to store as state and also how to
handle failures such as file paths that have been put into state but have
been removed from the file system.

It would be great if you could open a JIRA issue with a feature request to
improve the failure behavior of the ContinuousFileReaderOperator.
It could, for example, check whether a path exists before trying to read a
file, and ignore the input split instead of throwing an exception and
causing a failure.
If you want to, you can also work on a fix and contribute it back.

Best, Fabian
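A minimal sketch of the check suggested above, written as plain Scala so it runs outside Flink. It assumes the reader's restored state can be viewed as a list of (path, start, length) entries; `PendingSplit`, `SkipMissingSplits`, `partitionRestored` and `restoreAndSkip` are hypothetical names, and the real `ContinuousFileReaderOperator` restores Flink's own split type instead. Inside Flink the existence check would presumably go through `org.apache.flink.core.fs.FileSystem#exists`; here it is passed in as a function so the sketch can use `java.nio.file.Files.exists` for local testing.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical stand-in for one checkpointed input split:
// the file to read plus the byte range within it.
final case class PendingSplit(path: String, start: Long, length: Long)

object SkipMissingSplits {

  /** Partitions restored splits into (file still exists, file is gone).
    * `exists` is passed in so the same logic could sit on top of a local,
    * HDFS or S3 existence check.
    */
  def partitionRestored(
      restored: Seq[PendingSplit],
      exists: String => Boolean
  ): (Seq[PendingSplit], Seq[PendingSplit]) =
    restored.partition(split => exists(split.path))

  /** Keeps readable splits in their original order and logs every split
    * whose file has been removed, so the missing ones are never opened.
    */
  def restoreAndSkip(
      restored: Seq[PendingSplit],
      exists: String => Boolean
  ): Seq[PendingSplit] = {
    val (readable, missing) = partitionRestored(restored, exists)
    missing.foreach { s =>
      System.err.println(
        s"Skipping split for missing file ${s.path} (start=${s.start}, length=${s.length})")
    }
    readable
  }

  /** Local-filesystem existence check, for illustration and testing only. */
  def localExists(path: String): Boolean = Files.exists(Paths.get(path))
}
```

Skipping a split means its records are never emitted, so the sketch logs each skipped path rather than dropping it silently; whether skipping should be the default or an opt-in setting is a design question for the feature request.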

2018-02-06 19:15 GMT+01:00 Chengzhi Zhao <w....@gmail.com>:

> Hey, I am new to Flink and have a question; I'd like to see if anyone can
> help here.
>
> So we have an S3 path that Flink monitors for newly available files.
>
> val avroInputStream_activity = env.readFile(format, path,
> FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
>
> I am using both internal and externalized checkpointing. Let's say a bad
> file lands in the path and Flink retries several times. I want to take
> those bad files out and let the processing continue. However, since the
> file path persists in the checkpoint, when I try to resume from the
> externalized checkpoint, it throws the following error because the file
> cannot be found:
>
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]:
> No such file or directory: s3a://myfile
>
> Is there a way to skip this bad file and move on?
> Thanks in advance.
>
> Best,
> Chengzhi Zhao
>
>