Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2019/02/10 15:12:52 UTC

Flink 1.7.1 and RollingFileSink

Can StreamingFileSink be used instead of the sink described at
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
It looks like it could.


This code for example


        StreamingFileSink
                .forRowFormat(new Path(PATH),
                        new SimpleStringEncoder<KafkaRecord>())
                .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
                .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
                    // Never roll just because a checkpoint is taken.
                    @Override
                    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
                        return false;
                    }

                    // Roll once the part file exceeds 1 GiB.
                    @Override
                    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
                                                     KafkaRecord element) throws IOException {
                        return partFileState.getSize() > 1024 * 1024 * 1024L;
                    }

                    // Roll after 10 minutes without writes, or 2 hours after creation.
                    @Override
                    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
                        return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
                                || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
                    }
                })
                .build();


A few things I see, and am not sure I follow, about the new
StreamingFileSink vis-a-vis BucketingSink:


1. I never see the in-progress file go to the pending state (that is, get
renamed as pending), as was the case in BucketingSink. I would assume that
it would become pending and then be finalized on checkpoint, for
exactly-once semantics?


2. I see dangling in-progress files at the end of the day. With
withBucketCheckInterval set to 1 minute by default, I would assume that
shouldRollOnProcessingTime should kick in?

3. The in-progress files are named like
.part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is
that additional suffix?
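
For what it's worth, the time-based check in the policy above can be modelled in plain Java without any Flink dependency. This is only a sketch: PartFileTimes and TimeBasedRollSketch are made-up names rather than Flink classes, and the loop merely simulates a bucket check that fires once a minute, which is the interval the post assumes.

```java
// Hypothetical stand-in for the timing fields a part file exposes; not a Flink class.
final class PartFileTimes {
    final long creationTime;
    final long lastUpdateTime;

    PartFileTimes(long creationTime, long lastUpdateTime) {
        this.creationTime = creationTime;
        this.lastUpdateTime = lastUpdateTime;
    }
}

final class TimeBasedRollSketch {
    static final long INACTIVITY_MS = 10 * 60 * 1000L;  // 10 minutes without writes
    static final long ROLLOVER_MS = 120 * 60 * 1000L;   // 2 hours since creation

    // Same predicate as shouldRollOnProcessingTime in the post, on plain longs.
    static boolean shouldRollOnProcessingTime(PartFileTimes file, long currentTime) {
        return currentTime - file.lastUpdateTime > INACTIVITY_MS
                || currentTime - file.creationTime > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        // A file created at t=0 that never receives another write.
        PartFileTimes idle = new PartFileTimes(0L, 0L);
        // Simulate the periodic check firing once per minute.
        for (long now = 0; now <= 12 * 60_000L; now += 60_000L) {
            if (shouldRollOnProcessingTime(idle, now)) {
                System.out.println("roll at minute " + now / 60_000L);
                break;
            }
        }
    }
}
```

Because the comparison is strict, the idle file in this simulation is flagged at the first tick after the ten-minute threshold rather than at the threshold itself.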




I have the following set up on the env

env.enableCheckpointing(10 * 60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new MemoryStateBackend();
env.setStateBackend(stateBackEnd);


Regards.

Re: Flink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
Awesome, thanks!  Will open a new thread. But yes the inprogress file was
helpful.


Re: Flink 1.7.1 and RollingFileSink

Posted by Kostas Kloudas <k....@ververica.com>.
Hi Vishal,

For the StreamingFileSink vs Rolling/BucketingSink:
 - you can use the StreamingFileSink instead of the Rolling/BucketingSink.
You can see the StreamingFileSink as an evolution of the previous two.

In the StreamingFileSink, files in the Pending state are not renamed;
they keep their "in-progress" name. This is why you do not see
.pending files anymore.
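
To make the naming point concrete, here is a rough plain-Java model of the part-file lifecycle as described above. It is a sketch under assumptions, not Flink code: PartFileLifecycleSketch, its State enum and its methods are invented for illustration, the in-progress name omits the extra suffix seen in the original post, and the step where pending files are finalized once a checkpoint completes reflects my reading of the sink's documented behaviour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the part-file lifecycle; none of these types are Flink classes.
final class PartFileLifecycleSketch {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String finalName;
        State state = State.IN_PROGRESS;

        PartFile(String finalName) {
            this.finalName = finalName;
        }

        // Name visible on the file system: only FINISHED files carry the final name,
        // so a PENDING file still looks like an in-progress one.
        String visibleName() {
            return state == State.FINISHED ? finalName : "." + finalName + ".inprogress";
        }
    }

    private final List<PartFile> files = new ArrayList<>();

    PartFile open(String finalName) {
        PartFile file = new PartFile(finalName);
        files.add(file);
        return file;
    }

    // The rolling policy fired: the file is closed for writing but is not renamed.
    void roll(PartFile file) {
        if (file.state == State.IN_PROGRESS) {
            file.state = State.PENDING;
        }
    }

    // A checkpoint completed: pending files are committed under their final name.
    void onCheckpointComplete() {
        for (PartFile file : files) {
            if (file.state == State.PENDING) {
                file.state = State.FINISHED;
            }
        }
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch sink = new PartFileLifecycleSketch();
        PartFile file = sink.open("part-4-45");
        System.out.println(file.state + " -> " + file.visibleName());
        sink.roll(file);
        System.out.println(file.state + " -> " + file.visibleName());
        sink.onCheckpointComplete();
        System.out.println(file.state + " -> " + file.visibleName());
    }
}
```

In this model a rolled file is indistinguishable by name from one still being written, which would also explain in-progress files lingering until the next checkpoint completes.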

What Timothy said for bulk formats is correct. They only support
"onCheckpoint" rolling policy.

Now for the second issue about deployment, I would recommend opening a new
thread so that people can see from the title whether they can help.
In addition, it is good for the community to have the title indicate the
content of the topic. The mailing list is indexed by search engines, so if
someone has a similar question, the title will help them find the relevant
thread.

Cheers,
Kostas




Re: Flink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks Fabian,

More questions.

1. On a k8s standalone job I had
env.getCheckpointConfig().setFailOnCheckpointingErrors(true) // the default.
The job failed on a checkpoint, and I would have imagined that under HA the
job would restore from the last checkpoint, but it did not. (The UI showed
the job had restarted without a restore.) The state was wiped out and the
job was relaunched with no state.

2. I had the inprogress files from that failed instance and that is
consistent with no restored state.

Thus there are a few questions:

1. On k8s with a standalone job cluster, have we tested the scenario of
the container failing (the pod remained intact) and then restoring? In this
case the pod remained up and running, but it was definitely a clean relaunch
of the container the pod was executing.


2. Is any configuration missing, given the setup below?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30 * 60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
StateBackend stateBackEnd = new FsStateBackend(
        new org.apache.flink.core.fs.Path(
                "........"));
env.setStateBackend(stateBackEnd);


3. What is the nature of RollingFileSink? How does it enable exactly-once
semantics (or does it not)?
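
One way to picture the exactly-once question: an earlier message in this thread describes snapshotting the length of the in-progress file at checkpoint time and truncating the file back to that length on restore. The sketch below illustrates only that general idea with java.nio on a local temp file. TruncateOnRestoreSketch and restoreTo are hypothetical names, and this is not a claim about how the sink is actually implemented.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of "truncate to the checkpointed length"; not Flink code.
final class TruncateOnRestoreSketch {

    // Cut the file back to the length recorded with the last successful checkpoint,
    // discarding any bytes written after that checkpoint.
    static void restoreTo(Path file, long checkpointedLength) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            if (channel.size() > checkpointedLength) {
                channel.truncate(checkpointedLength);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("part-", ".inprogress");
        Files.write(file, "a\nb\n".getBytes(StandardCharsets.UTF_8));

        // Length snapshotted at checkpoint time.
        long checkpointedLength = Files.size(file);

        // A record written after the checkpoint, then a simulated failure.
        Files.write(file, "c\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);

        restoreTo(file, checkpointedLength);
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

After restoreTo, the record written after the checkpoint is gone, so replaying the source from the checkpointed offset would not duplicate it in the file.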

Any help will be appreciated.

Regards.










Re: Flink 1.7.1 and RollingFileSink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

Kostas (in CC) should be able to help here.

Best, Fabian

On Mon, Feb 11, 2019 at 00:05, Vishal Santoshi <
vishal.santoshi@gmail.com> wrote:

> Anyone?
>
> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vi...@gmail.com>
> wrote:
>
>> You don't have to. Thank you for the input.
>>
>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vi...@gmail.com> wrote:
>>
>>> My apologies for not seeing your use case properly.   The constraint on
>>> rolling policy is only applicable for bulk formats such as Parquet as
>>> highlighted in the docs.
>>>
>>> As for your questions, I'll have to defer to others more familiar with
>>> it.   I mostly just use bulk formats such as Avro and Parquet.
>>>
>>> Tim
>>>
>>>
>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>>> vishal.santoshi@gmail.com wrote:
>>>
>>>> That said, in the DefaultRollingPolicy it seems the check is on the
>>>> file size (it mimics the check in shouldRollOnEvent()).
>>>>
>>>> I guess the question is:
>>>>
>>>> Is the call to shouldRollOnCheckpoint done by the checkpointing
>>>> thread?
>>>>
>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>> shouldRollOnProcessingTime, done inline on the execution thread?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>>> vishal.santoshi@gmail.com> wrote:
>>>>
>>>>> Thanks for the quick reply.
>>>>>
>>>>> I am confused. If this was a more full featured BucketingSink ,I would
>>>>> imagine that  based on shouldRollOnEvent and shouldRollOnEvent, an in
>>>>> progress file could go into pending phase and on checkpoint the pending
>>>>> part file would be  finalized. For exactly once any files ( in progress
>>>>> file ) will have a length of the file  snapshotted to the checkpoint  and
>>>>> used to truncate the file ( if supported ) or dropped as a part-length file
>>>>> ( if truncate not supported )  if a resume from a checkpoint was to happen,
>>>>> to indicate what part of the the finalized file ( finalized when resumed )
>>>>> was valid . and  I had always assumed ( and there is no doc otherwise )
>>>>> that shouldRollOnCheckpoint would be similar to the other 2 apart
>>>>> from the fact it does the roll and finalize step in a single step on a
>>>>> checkpoint.
>>>>>
>>>>>
>>>>> Am I better off using BucketingSink ?  When to use BucketingSink and
>>>>> when to use RollingSink is not clear at all, even though at the surface it
>>>>> sure looks RollingSink is a better version of .BucketingSink ( or not )
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vi...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think the only rolling policy that can be used is
>>>>>> CheckpointRollingPolicy to ensure exactly once.
>>>>>>
>>>>>> Tim
>>>>>>
>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com wrote:
>>>>>>
>>>>>>> Can StreamingFileSink be used instead of https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html, even though it looks it could.
>>>>>>>
>>>>>>>
>>>>>>> This code for example
>>>>>>>
>>>>>>>
>>>>>>>         StreamingFileSink
>>>>>>>                 .forRowFormat(new Path(PATH),
>>>>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>>>>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>                                        @Override
>>>>>>>                                        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>>>                                            return false;
>>>>>>>                                        }
>>>>>>>
>>>>>>>                                        @Override
>>>>>>>                                        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>>                                                                         KafkaRecord element) throws IOException {
>>>>>>>                                            return partFileState.getSize() > 1024 * 1024 * 1024l;
>>>>>>>                                        }
>>>>>>>
>>>>>>>                                        @Override
>>>>>>>                                        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
>>>>>>>                                            return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000l ||
>>>>>>>                                                    currentTime - partFileState.getCreationTime() > 120 * 60 * 1000l;
>>>>>>>                                        }
>>>>>>>                                    }
>>>>>>>                 )
>>>>>>>                 .build();
>>>>>>>
>>>>>>>
>>>>>>> few things I see and am not sure I follow about the new RollingFileSink  vis a vis BucketingSink
>>>>>>>
>>>>>>>
>>>>>>> 1. I do not ever see the inprogress file go to the pending state, as in renamed as pending, as was the case in Bucketing Sink.  I would assume that it would be pending and then
>>>>>>>
>>>>>>>    finalized on checkpoint for exactly once semantics ?
>>>>>>>
>>>>>>>
>>>>>>> 2. I see dangling inprogress files at the end of the day. I would assume that the withBucketCheckInterval set to 1 minute by default, the shouldRollOnProcessingTime should kick in ?
>>>>>>>
>>>>>>>  3. The inprogress files are  like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14 . What is that additional suffix ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I have the following set up on the env
>>>>>>>
>>>>>>> env.enableCheckpointing(10 * 60000);
>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>

Re: fllink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
Any one ?

On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <vi...@gmail.com>
wrote:

> You don't have to. Thank you for the input.

Re: fllink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
You don't have to. Thank you for the input.

On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <vi...@gmail.com> wrote:

> My apologies for not seeing your use case properly. The constraint on
> rolling policy is only applicable for bulk formats such as Parquet, as
> highlighted in the docs.
>
> As for your questions, I'll have to defer to others more familiar with
> it. I mostly just use bulk formats such as Avro and Parquet.
>
> Tim

Re: fllink 1.7.1 and RollingFileSink

Posted by Timothy Victor <vi...@gmail.com>.
My apologies for not seeing your use case properly. The constraint on
rolling policy is only applicable for bulk formats such as Parquet, as
highlighted in the docs.

As for your questions, I'll have to defer to others more familiar with it.
I mostly just use bulk formats such as Avro and Parquet.

Tim


On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> That said, in the DefaultRollingPolicy it seems the check is on the
> file size (it mimics the check in shouldRollOnEvent()).
>
> I guess the questions are:
>
> Is the call to shouldRollOnCheckpoint done by the checkpointing thread?
>
> Are the calls to the other 2 methods, shouldRollOnEvent and
> shouldRollOnProcessingTime, done on the execution thread, as in inlined?

Re: fllink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
That said, in the DefaultRollingPolicy it seems the check is on the file
size (it mimics the check in shouldRollOnEvent()).

I guess the questions are:

Is the call to shouldRollOnCheckpoint done by the checkpointing thread?

Are the calls to the other 2 methods, shouldRollOnEvent and
shouldRollOnProcessingTime, done on the execution thread, as in inlined?
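
For reference, the three checks discussed in this thread can be modelled in plain Java without any Flink dependency. This is only a sketch: PartInfo is a hypothetical stand-in for Flink's PartFileInfo, the class and constant names are made up for illustration, and the thresholds are the ones from the anonymous RollingPolicy posted at the start of the thread. It says nothing about which thread Flink uses to call each method.

```java
// Plain-Java model of the three roll checks from the posted RollingPolicy.
// PartInfo is a hypothetical stand-in for Flink's PartFileInfo.
final class RollPolicySketch {

    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GiB
    static final long INACTIVITY_MS = 10L * 60L * 1000L;           // 10 minutes
    static final long ROLLOVER_MS = 120L * 60L * 1000L;            // 120 minutes

    /** Hypothetical snapshot of an in-progress part file. */
    static final class PartInfo {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartInfo(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    /** The posted policy never rolls just because a checkpoint happens. */
    static boolean shouldRollOnCheckpoint(PartInfo part) {
        return false;
    }

    /** Size check, evaluated when an element arrives. */
    static boolean shouldRollOnEvent(PartInfo part) {
        return part.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Time checks: roll if the file has been idle too long or is too old. */
    static boolean shouldRollOnProcessingTime(PartInfo part, long nowMs) {
        return nowMs - part.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - part.creationTimeMs > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        PartInfo idle = new PartInfo(10L, 0L, 0L);
        // 11 minutes without a write exceeds the 10 minute inactivity limit.
        System.out.println(shouldRollOnProcessingTime(idle, 11L * 60L * 1000L)); // true
    }
}
```

Note that the time-based check can only fire when something actually evaluates it, so a file that stops receiving data depends on a periodic check rather than on new events.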





On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <vi...@gmail.com>
wrote:

> Thanks for the quick reply.
>
> I am confused. If this were a more full-featured BucketingSink, I would
> imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime,
> an in-progress file could go into the pending phase, and on checkpoint the
> pending part file would be finalized. For exactly-once, any in-progress
> file would have its length snapshotted to the checkpoint. If a resume from
> a checkpoint were to happen, that length would be used to truncate the
> file (if truncate is supported) or written out as a part-length file (if
> truncate is not supported), to indicate what part of the finalized file
> (finalized when resumed) was valid. I had always assumed (and there is no
> doc saying otherwise) that shouldRollOnCheckpoint would be similar to the
> other 2, apart from the fact that it does the roll and the finalize in a
> single step on a checkpoint.
>
>
> Am I better off using BucketingSink? When to use BucketingSink and when
> to use RollingSink is not clear at all, even though on the surface it sure
> looks like RollingSink is a better version of BucketingSink (or not).
>
> Regards.

Re: fllink 1.7.1 and RollingFileSink

Posted by Vishal Santoshi <vi...@gmail.com>.
Thanks for the quick reply.

I am confused. If this were a more full-featured BucketingSink, I would
imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an
in-progress file could go into the pending phase, and on checkpoint the
pending part file would be finalized. For exactly-once, any in-progress
file would have its length snapshotted to the checkpoint. If a resume from
a checkpoint were to happen, that length would be used to truncate the file
(if truncate is supported) or written out as a part-length file (if
truncate is not supported), to indicate what part of the finalized file
(finalized when resumed) was valid. I had always assumed (and there is no
doc saying otherwise) that shouldRollOnCheckpoint would be similar to the
other 2, apart from the fact that it does the roll and the finalize in a
single step on a checkpoint.
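
The lifecycle described above (in-progress, then pending after a roll, then finalized on a checkpoint) can be written down as a small state machine. This is a simplified model of the expectation stated in this message, not Flink's implementation: the class and method names are hypothetical, and it deliberately ignores which checkpoint a pending file belongs to and the truncate-on-restore path.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the expected part-file lifecycle; all names are hypothetical.
final class PartFileLifecycleSketch {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> files = new ArrayList<>();

    /** Opens a new in-progress part file and tracks it. */
    PartFile open(String name) {
        PartFile file = new PartFile(name);
        files.add(file);
        return file;
    }

    /** A roll closes an in-progress file; it then waits as PENDING. */
    void roll(PartFile file) {
        if (file.state == State.IN_PROGRESS) {
            file.state = State.PENDING;
        }
    }

    /** When a checkpoint completes, every pending file is finalized. */
    void onCheckpointComplete() {
        for (PartFile file : files) {
            if (file.state == State.PENDING) {
                file.state = State.FINISHED;
            }
        }
    }

    public static void main(String[] args) {
        PartFileLifecycleSketch sink = new PartFileLifecycleSketch();
        PartFile rolled = sink.open("part-0-0");
        PartFile open = sink.open("part-0-1");
        sink.roll(rolled);
        sink.onCheckpointComplete();
        System.out.println(rolled.state); // FINISHED
        System.out.println(open.state);   // IN_PROGRESS
    }
}
```

In this model a file that is never rolled stays in-progress no matter how many checkpoints complete, which matches the dangling in-progress files reported in the original question when the policy returns false from shouldRollOnCheckpoint.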


Am I better off using BucketingSink? When to use BucketingSink and when
to use RollingSink is not clear at all, even though on the surface it sure
looks like RollingSink is a better version of BucketingSink (or not).

Regards.



On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <vi...@gmail.com> wrote:

> I think the only rolling policy that can be used is
> CheckpointRollingPolicy to ensure exactly once.
>
> Tim

Re: fllink 1.7.1 and RollingFileSink

Posted by Timothy Victor <vi...@gmail.com>.
I think the only rolling policy that can be used is CheckpointRollingPolicy
to ensure exactly once.

Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> Can StreamingFileSink be used instead of https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html, even though it looks it could.
>
>
> This code for example
>
>
>         StreamingFileSink
>                 .forRowFormat(new Path(PATH),
>                         new SimpleStringEncoder<KafkaRecord>())
>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>                                        @Override
>                                        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>                                            return false;
>                                        }
>
>                                        @Override
>                                        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>                                                                         KafkaRecord element) throws IOException {
>                                            return partFileState.getSize() > 1024 * 1024 * 1024L;
>                                        }
>
>                                        @Override
>                                        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
>                                            return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L ||
>                                                    currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>                                        }
>                                    }
>                 )
>                 .build();
>
>
> A few things I see and am not sure I follow about the new StreamingFileSink vis-a-vis BucketingSink:
>
> 1. I never see the in-progress file move to the pending state (that is, get renamed as pending), as was the case with BucketingSink. I would assume that it would become pending and then be finalized on checkpoint for exactly-once semantics?
>
> 2. I see dangling in-progress files at the end of the day. I would assume that, with withBucketCheckInterval set to 1 minute by default, shouldRollOnProcessingTime should kick in?
>
> 3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that additional suffix?
>
> I have the following setup on the env:
>
> env.enableCheckpointing(10 * 60000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
> StateBackend stateBackEnd = new MemoryStateBackend();
> env.setStateBackend(stateBackEnd);
>
>
> Regards.