You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Kien Truong <du...@gmail.com> on 2018/02/01 12:06:11 UTC

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Hi,

I did not actually test this, but I think with Flink 1.4 you can extend 
BucketingSink and overwrite the invoke method to access the watermark

Pseudo code:

invoke(IN value, SinkFunction.Context context) {

    long currentWatermark = context.watermark()

    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()

     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {

        Write _SUCCESS

        lastSuccessWatermark = currentWatermark round down to 1 hour

     }

     invoke(value)

}

|Regards, Kien |

On 1/31/2018 5:54 PM, xiaobin yan wrote:
> Hi:
>
> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>
> Best,
> Ben
>
>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> wrote:
>>
>> it depends on how you partition your file. in my case I write file per hour,
>> so I'm sure that file is ready after that hour period, in processing time.
>> Here, read to be ready means this file contains all the data in that hour
>> period.
>>
>> If the downstream runs in a batch way, you may want to ensure the file is
>> ready.
>> In this case, ready to read can mean all the data before watermark as
>> arrived.
>> You could take the BucketingSink and implement this logic there, maybe wait
>> until watermark
>> reaches
>>
>> Best,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by Fabian Hueske <fh...@gmail.com>.
According to the JavaDocs of BucketingSink, in-progress files are still
being written to.
I don't know what would cause a file to remain in that state.

Another thing to mention, you might want to ensure that only the last task
that renames the file generates the _SUCCESS file.
Otherwise, the data might be consumed too early. You'd have to figure out
how to do that in a race-condition free way.

Best, Fabian

2018-02-06 11:03 GMT+01:00 xiaobin yan <ya...@gmail.com>:

> Hi,
>
> I think it can be judged at the BucketingSink.notifyCheckpointComplete()
> method  in this way, as shown below:
>
> if (!bucketState.isWriterOpen &&
>         bucketState.pendingFiles.isEmpty() &&
>         bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>     boolean flag = true;
>     RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(directory), false);
>     while (files.hasNext()) {
>         LocatedFileStatus file = files.next();
>         String fileName = file.getPath().getName();
>         if (fileName.lastIndexOf(".") != -1) {
>             flag = false;
>             break;
>         }
>     }
>     Path path = new Path(directory + "/_SUCCESS");
>     if (flag && !fs.exists(path)){
>         FSDataOutputStream outputStream = fs.create(path);
>         outputStream.flush();
>         outputStream.close();
>     }
>     // We've dealt with all the pending files and the writer for this bucket is not currently open.
>     // Therefore this bucket is currently inactive and we can remove it from our state.
>     bucketStatesIt.remove();
> }
>
>
> But we find this problem: occasionally, a file is always in the
> in-progress state, and the amount of data processed by each sub task is
> almost the same, and there is no data skew. There are no exceptions in the
> program.
>
> Best,
> Ben
>
>
>
> On 6 Feb 2018, at 5:50 PM, xiaobin yan <ya...@gmail.com>
> wrote:
>
> Hi,
>
> Thanks for your reply! See here :https://github.com/apache/
> flink/blob/master/flink-connectors/flink-connector-
> filesystem/src/main/java/org/apache/flink/streaming/
> connectors/fs/bucketing/BucketingSink.java#L652  After calling this
> method, the file is renamed,and we can't determine which subtask is
> finished at last.
>
> Best,
> Ben
>
> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> The notifyCheckpointComplete() method will be called when all subtasks of
> a job completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
>
> Please note that you need to handle the case where multiple tasks try to
> create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and
> modifications.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/master/flink-
> runtime/src/main/java/org/apache/flink/runtime/state/
> CheckpointListener.java
>
> 2018-02-06 4:14 GMT+01:00 xiaobin yan <ya...@gmail.com>:
>
>> Hi ,
>>
>> You've got a point. I saw that method, but how can I make sure that all
>> the subtasks checkpoint are finished, because I can only write _SUCCESS
>> file at that time.
>>
>> Best,
>> Ben
>>
>>
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> In case of a failure, Flink rolls back the job to the last checkpoint and
>> reprocesses all data since that checkpoint.
>> Also the BucketingSink will truncate a file to the position of the last
>> checkpoint if the file system supports truncate. If not, it writes a file
>> with the valid length and starts a new file.
>>
>> Therefore, all files that the BucketingSink finishes must be treated as
>> volatile until the next checkpoint is completed.
>> Only when a checkpoint is completed a finalized file may be read. The
>> files are renamed on checkpoint to signal that they are final and can be
>> read. This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>
>> Best, Fabian
>>
>>
>>
>>
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <ya...@gmail.com>:
>>
>>> Hi ,
>>>
>>> I have tested it. There are some small problems. When checkpoint is
>>> finished, the name of the file will change, and the success file will be
>>> written before checkpoint.
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <du...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I did not actually test this, but I think with Flink 1.4 you can extend
>>> BucketingSink and overwrite the invoke method to access the watermark
>>>
>>> Pseudo code:
>>>
>>> invoke(IN value, SinkFunction.Context context) {
>>>
>>>    long currentWatermark = context.watermark()
>>>
>>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>
>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>>
>>>        Write _SUCCESS
>>>
>>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>>
>>>     }
>>>
>>>     invoke(value)
>>>
>>> }
>>>
>>>
>>> Regards,
>>> Kien
>>>
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>
>>> Hi:
>>>
>>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>>>
>>> Best,
>>> Ben
>>>
>>>
>>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <un...@gmail.com> wrote:
>>>
>>> it depends on how you partition your file. in my case I write file per hour,
>>> so I'm sure that file is ready after that hour period, in processing time.
>>> Here, read to be ready means this file contains all the data in that hour
>>> period.
>>>
>>> If the downstream runs in a batch way, you may want to ensure the file is
>>> ready.
>>> In this case, ready to read can mean all the data before watermark as
>>> arrived.
>>> You could take the BucketingSink and implement this logic there, maybe wait
>>> until watermark
>>> reaches
>>>
>>> Best,
>>>
>>> Sendoh
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>>
>>>
>>
>>
>
>
>

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by xiaobin yan <ya...@gmail.com>.
Hi, 

I think it can be judged at the BucketingSink.notifyCheckpointComplete() method  in this way, as shown below:
if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {
    boolean flag = true;
    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(directory), false);
    while (files.hasNext()) {
        LocatedFileStatus file = files.next();
        String fileName = file.getPath().getName();
        if (fileName.lastIndexOf(".") != -1) {
            flag = false;
            break;
        }
    }
    Path path = new Path(directory + "/_SUCCESS");
    if (flag && !fs.exists(path)){
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.flush();
        outputStream.close();
    }
    // We've dealt with all the pending files and the writer for this bucket is not currently open.
    // Therefore this bucket is currently inactive and we can remove it from our state.
    bucketStatesIt.remove();
}

But we find this problem: occasionally, a file is always in the in-progress state, and the amount of data processed by each sub task is almost the same, and there is no data skew. There are no exceptions in the program.

Best,
Ben



> On 6 Feb 2018, at 5:50 PM, xiaobin yan <ya...@gmail.com> wrote:
> 
> Hi, 
> 
> Thanks for your reply! See here :https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652 <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652>  After calling this method, the file is renamed,and we can't determine which subtask is finished at last.
> 
> Best,
> Ben
> 
>> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fhueske@gmail.com <ma...@gmail.com>> wrote:
>> 
>> The notifyCheckpointComplete() method will be called when all subtasks of a job completed their checkpoints.
>> Check the JavaDocs of the CheckpointListener class [1].
>> 
>> Please note that you need to handle the case where multiple tasks try to create the _SUCCESS file concurrently.
>> So, there is a good chance of race conditions between file checks and modifications.
>> 
>> Best, Fabian
>> 
>> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java>
>> 
>> 2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.mail@gmail.com <ma...@gmail.com>>:
>> Hi ,
>> 
>> 	You've got a point. I saw that method, but how can I make sure that all the subtasks checkpoint are finished, because I can only write _SUCCESS file at that time.
>> 
>> Best,
>> Ben
>> 
>> 
>>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhueske@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> In case of a failure, Flink rolls back the job to the last checkpoint and reprocesses all data since that checkpoint. 
>>> Also the BucketingSink will truncate a file to the position of the last checkpoint if the file system supports truncate. If not, it writes a file with the valid length and starts a new file.
>>> 
>>> Therefore, all files that the BucketingSink finishes must be treated as volatile until the next checkpoint is completed. 
>>> Only when a checkpoint is completed a finalized file may be read. The files are renamed on checkpoint to signal that they are final and can be read. This would also be the right time to generate a _SUCCESS file.
>>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>>> 
>>> Best, Fabian
>>> 
>>> 
>>> 
>>> 
>>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.mail@gmail.com <ma...@gmail.com>>:
>>> Hi ,
>>> 
>>> 	I have tested it. There are some small problems. When checkpoint is finished, the name of the file will change, and the success file will be written before checkpoint.
>>> 
>>> Best,
>>> Ben
>>> 
>>> 
>>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientruong@gmail.com <ma...@gmail.com>> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I did not actually test this, but I think with Flink 1.4 you can extend BucketingSink and overwrite the invoke method to access the watermark
>>>> Pseudo code:
>>>> invoke(IN value, SinkFunction.Context context) {
>>>>    long currentWatermark = context.watermark()
>>>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>>>        Write _SUCCESS
>>>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>>>     }
>>>>     invoke(value)
>>>> }
>>>> 
>>>> Regards,
>>>> Kien
>>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>>> Hi:
>>>>> 
>>>>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>>>>> 
>>>>> Best,
>>>>> Ben
>>>>> 
>>>>>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <ma...@gmail.com> wrote:
>>>>>> 
>>>>>> it depends on how you partition your file. in my case I write file per hour,
>>>>>> so I'm sure that file is ready after that hour period, in processing time.
>>>>>> Here, read to be ready means this file contains all the data in that hour
>>>>>> period.
>>>>>> 
>>>>>> If the downstream runs in a batch way, you may want to ensure the file is
>>>>>> ready.
>>>>>> In this case, ready to read can mean all the data before watermark as
>>>>>> arrived.
>>>>>> You could take the BucketingSink and implement this logic there, maybe wait
>>>>>> until watermark
>>>>>> reaches
>>>>>> 
>>>>>> Best,
>>>>>> 
>>>>>> Sendoh
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>>> 
>>> 
>> 
>> 
> 


Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by xiaobin yan <ya...@gmail.com>.
Hi, 

Thanks for your reply! See here :https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652 <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L652>  After calling this method, the file is renamed,and we can't determine which subtask is finished at last.

Best,
Ben

> On 6 Feb 2018, at 5:35 PM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> The notifyCheckpointComplete() method will be called when all subtasks of a job completed their checkpoints.
> Check the JavaDocs of the CheckpointListener class [1].
> 
> Please note that you need to handle the case where multiple tasks try to create the _SUCCESS file concurrently.
> So, there is a good chance of race conditions between file checks and modifications.
> 
> Best, Fabian
> 
> [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java>
> 
> 2018-02-06 4:14 GMT+01:00 xiaobin yan <yan.xiao.bin.mail@gmail.com <ma...@gmail.com>>:
> Hi ,
> 
> 	You've got a point. I saw that method, but how can I make sure that all the subtasks checkpoint are finished, because I can only write _SUCCESS file at that time.
> 
> Best,
> Ben
> 
> 
>> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fhueske@gmail.com <ma...@gmail.com>> wrote:
>> 
>> In case of a failure, Flink rolls back the job to the last checkpoint and reprocesses all data since that checkpoint. 
>> Also the BucketingSink will truncate a file to the position of the last checkpoint if the file system supports truncate. If not, it writes a file with the valid length and starts a new file.
>> 
>> Therefore, all files that the BucketingSink finishes must be treated as volatile until the next checkpoint is completed. 
>> Only when a checkpoint is completed a finalized file may be read. The files are renamed on checkpoint to signal that they are final and can be read. This would also be the right time to generate a _SUCCESS file.
>> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>> 
>> Best, Fabian
>> 
>> 
>> 
>> 
>> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.mail@gmail.com <ma...@gmail.com>>:
>> Hi ,
>> 
>> 	I have tested it. There are some small problems. When checkpoint is finished, the name of the file will change, and the success file will be written before checkpoint.
>> 
>> Best,
>> Ben
>> 
>> 
>>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientruong@gmail.com <ma...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> I did not actually test this, but I think with Flink 1.4 you can extend BucketingSink and overwrite the invoke method to access the watermark
>>> Pseudo code:
>>> invoke(IN value, SinkFunction.Context context) {
>>>    long currentWatermark = context.watermark()
>>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>>        Write _SUCCESS
>>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>>     }
>>>     invoke(value)
>>> }
>>> 
>>> Regards,
>>> Kien
>>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>>> Hi:
>>>> 
>>>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>>>> 
>>>> Best,
>>>> Ben
>>>> 
>>>>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <ma...@gmail.com> wrote:
>>>>> 
>>>>> it depends on how you partition your file. in my case I write file per hour,
>>>>> so I'm sure that file is ready after that hour period, in processing time.
>>>>> Here, read to be ready means this file contains all the data in that hour
>>>>> period.
>>>>> 
>>>>> If the downstream runs in a batch way, you may want to ensure the file is
>>>>> ready.
>>>>> In this case, ready to read can mean all the data before watermark as
>>>>> arrived.
>>>>> You could take the BucketingSink and implement this logic there, maybe wait
>>>>> until watermark
>>>>> reaches
>>>>> 
>>>>> Best,
>>>>> 
>>>>> Sendoh
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
>> 
>> 
> 
> 


Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by Fabian Hueske <fh...@gmail.com>.
The notifyCheckpointComplete() method will be called when all subtasks of a
job completed their checkpoints.
Check the JavaDocs of the CheckpointListener class [1].

Please note that you need to handle the case where multiple tasks try to
create the _SUCCESS file concurrently.
So, there is a good chance of race conditions between file checks and
modifications.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointListener.java

2018-02-06 4:14 GMT+01:00 xiaobin yan <ya...@gmail.com>:

> Hi ,
>
> You've got a point. I saw that method, but how can I make sure that all
> the subtasks checkpoint are finished, because I can only write _SUCCESS
> file at that time.
>
> Best,
> Ben
>
>
> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fh...@gmail.com> wrote:
>
> In case of a failure, Flink rolls back the job to the last checkpoint and
> reprocesses all data since that checkpoint.
> Also the BucketingSink will truncate a file to the position of the last
> checkpoint if the file system supports truncate. If not, it writes a file
> with the valid length and starts a new file.
>
> Therefore, all files that the BucketingSink finishes must be treated as
> volatile until the next checkpoint is completed.
> Only when a checkpoint is completed a finalized file may be read. The
> files are renamed on checkpoint to signal that they are final and can be
> read. This would also be the right time to generate a _SUCCESS file.
> Have a look at the BucketingSink.notifyCheckpointComplete() method.
>
> Best, Fabian
>
>
>
>
> 2018-02-05 6:43 GMT+01:00 xiaobin yan <ya...@gmail.com>:
>
>> Hi ,
>>
>> I have tested it. There are some small problems. When checkpoint is
>> finished, the name of the file will change, and the success file will be
>> written before checkpoint.
>>
>> Best,
>> Ben
>>
>>
>> On 1 Feb 2018, at 8:06 PM, Kien Truong <du...@gmail.com> wrote:
>>
>> Hi,
>>
>> I did not actually test this, but I think with Flink 1.4 you can extend
>> BucketingSink and overwrite the invoke method to access the watermark
>>
>> Pseudo code:
>>
>> invoke(IN value, SinkFunction.Context context) {
>>
>>    long currentWatermark = context.watermark()
>>
>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>
>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>
>>        Write _SUCCESS
>>
>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>
>>     }
>>
>>     invoke(value)
>>
>> }
>>
>>
>> Regards,
>> Kien
>>
>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>
>> Hi:
>>
>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>>
>> Best,
>> Ben
>>
>>
>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <un...@gmail.com> wrote:
>>
>> it depends on how you partition your file. in my case I write file per hour,
>> so I'm sure that file is ready after that hour period, in processing time.
>> Here, read to be ready means this file contains all the data in that hour
>> period.
>>
>> If the downstream runs in a batch way, you may want to ensure the file is
>> ready.
>> In this case, ready to read can mean all the data before watermark as
>> arrived.
>> You could take the BucketingSink and implement this logic there, maybe wait
>> until watermark
>> reaches
>>
>> Best,
>>
>> Sendoh
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>>
>>
>
>

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by xiaobin yan <ya...@gmail.com>.
Hi ,

	You've got a point. I saw that method, but how can I make sure that all the subtasks checkpoint are finished, because I can only write _SUCCESS file at that time.

Best,
Ben

> On 5 Feb 2018, at 6:34 PM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> In case of a failure, Flink rolls back the job to the last checkpoint and reprocesses all data since that checkpoint. 
> Also the BucketingSink will truncate a file to the position of the last checkpoint if the file system supports truncate. If not, it writes a file with the valid length and starts a new file.
> 
> Therefore, all files that the BucketingSink finishes must be treated as volatile until the next checkpoint is completed. 
> Only when a checkpoint is completed a finalized file may be read. The files are renamed on checkpoint to signal that they are final and can be read. This would also be the right time to generate a _SUCCESS file.
> Have a look at the BucketingSink.notifyCheckpointComplete() method.
> 
> Best, Fabian
> 
> 
> 
> 
> 2018-02-05 6:43 GMT+01:00 xiaobin yan <yan.xiao.bin.mail@gmail.com <ma...@gmail.com>>:
> Hi ,
> 
> 	I have tested it. There are some small problems. When checkpoint is finished, the name of the file will change, and the success file will be written before checkpoint.
> 
> Best,
> Ben
> 
> 
>> On 1 Feb 2018, at 8:06 PM, Kien Truong <duckientruong@gmail.com <ma...@gmail.com>> wrote:
>> 
>> Hi,
>> 
>> I did not actually test this, but I think with Flink 1.4 you can extend BucketingSink and overwrite the invoke method to access the watermark
>> Pseudo code:
>> invoke(IN value, SinkFunction.Context context) {
>>    long currentWatermark = context.watermark()
>>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>>        Write _SUCCESS
>>        lastSuccessWatermark = currentWatermark round down to 1 hour
>>     }
>>     invoke(value)
>> }
>> 
>> Regards,
>> Kien
>> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>>> Hi:
>>> 
>>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>>> 
>>> Best,
>>> Ben
>>> 
>>>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <ma...@gmail.com> wrote:
>>>> 
>>>> it depends on how you partition your file. in my case I write file per hour,
>>>> so I'm sure that file is ready after that hour period, in processing time.
>>>> Here, read to be ready means this file contains all the data in that hour
>>>> period.
>>>> 
>>>> If the downstream runs in a batch way, you may want to ensure the file is
>>>> ready.
>>>> In this case, ready to read can mean all the data before watermark as
>>>> arrived.
>>>> You could take the BucketingSink and implement this logic there, maybe wait
>>>> until watermark
>>>> reaches
>>>> 
>>>> Best,
>>>> 
>>>> Sendoh
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 
> 


Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by Fabian Hueske <fh...@gmail.com>.
In case of a failure, Flink rolls back the job to the last checkpoint and
reprocesses all data since that checkpoint.
Also the BucketingSink will truncate a file to the position of the last
checkpoint if the file system supports truncate. If not, it writes a file
with the valid length and starts a new file.

Therefore, all files that the BucketingSink finishes must be treated as
volatile until the next checkpoint is completed.
Only when a checkpoint is completed a finalized file may be read. The files
are renamed on checkpoint to signal that they are final and can be read.
This would also be the right time to generate a _SUCCESS file.
Have a look at the BucketingSink.notifyCheckpointComplete() method.

Best, Fabian




2018-02-05 6:43 GMT+01:00 xiaobin yan <ya...@gmail.com>:

> Hi ,
>
> I have tested it. There are some small problems. When checkpoint is
> finished, the name of the file will change, and the success file will be
> written before checkpoint.
>
> Best,
> Ben
>
>
> On 1 Feb 2018, at 8:06 PM, Kien Truong <du...@gmail.com> wrote:
>
> Hi,
>
> I did not actually test this, but I think with Flink 1.4 you can extend
> BucketingSink and overwrite the invoke method to access the watermark
>
> Pseudo code:
>
> invoke(IN value, SinkFunction.Context context) {
>
>    long currentWatermark = context.watermark()
>
>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>
>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>
>        Write _SUCCESS
>
>        lastSuccessWatermark = currentWatermark round down to 1 hour
>
>     }
>
>     invoke(value)
>
> }
>
>
> Regards,
> Kien
>
> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>
> Hi:
>
> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>
> Best,
> Ben
>
>
> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <un...@gmail.com> wrote:
>
> it depends on how you partition your file. in my case I write file per hour,
> so I'm sure that file is ready after that hour period, in processing time.
> Here, read to be ready means this file contains all the data in that hour
> period.
>
> If the downstream runs in a batch way, you may want to ensure the file is
> ready.
> In this case, ready to read can mean all the data before watermark as
> arrived.
> You could take the BucketingSink and implement this logic there, maybe wait
> until watermark
> reaches
>
> Best,
>
> Sendoh
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>

Re: How does BucketingSink generate a SUCCESS file when a directory is finished

Posted by xiaobin yan <ya...@gmail.com>.
Hi ,

	I have tested it. There are some small problems. When checkpoint is finished, the name of the file will change, and the success file will be written before checkpoint.

Best,
Ben

> On 1 Feb 2018, at 8:06 PM, Kien Truong <du...@gmail.com> wrote:
> 
> Hi,
> 
> I did not actually test this, but I think with Flink 1.4 you can extend BucketingSink and overwrite the invoke method to access the watermark
> Pseudo code:
> invoke(IN value, SinkFunction.Context context) {
>    long currentWatermark = context.watermark()
>    long taskIndex = getRuntimeContext().getIndexOfThisSubtask()
>     if (taskIndex == 0 && currentWatermark - lastSuccessWatermark > 1 hour) {
>        Write _SUCCESS
>        lastSuccessWatermark = currentWatermark round down to 1 hour
>     }
>     invoke(value)
> }
> 
> Regards,
> Kien
> On 1/31/2018 5:54 PM, xiaobin yan wrote:
>> Hi:
>> 
>> I think so too! But I have a question that when should I add this logic in BucketingSink! And who does this logic, and ensures that the logic is executed only once, not every parallel instance of the sink that executes this logic!
>> 
>> Best,
>> Ben
>> 
>>> On 31 Jan 2018, at 5:58 PM, Hung <un...@gmail.com> <ma...@gmail.com> wrote:
>>> 
>>> it depends on how you partition your file. in my case I write file per hour,
>>> so I'm sure that file is ready after that hour period, in processing time.
>>> Here, read to be ready means this file contains all the data in that hour
>>> period.
>>> 
>>> If the downstream runs in a batch way, you may want to ensure the file is
>>> ready.
>>> In this case, ready to read can mean all the data before watermark as
>>> arrived.
>>> You could take the BucketingSink and implement this logic there, maybe wait
>>> until watermark
>>> reaches
>>> 
>>> Best,
>>> 
>>> Sendoh
>>> 
>>> 
>>> 
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>