Posted to user-zh@flink.apache.org by di wu <67...@qq.com.INVALID> on 2023/02/15 14:42:22 UTC

Disable the chain of the Sink operator

Hello

The current Sink operator is split into two operators, Writer and Committer. By default, they are chained together and run on the same thread.
As a result, when the Committer is very slow, it blocks the Writer and causes back pressure.

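The following toy model in plain Java illustrates the problem described above. It is not Flink code: the class name, the "W"/"C" action encoding and the time costs are hypothetical. It assumes only what the paragraph states: a chained Writer and Committer share one thread, so a commit and the writes after it run strictly one after another.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, not Flink code: a chained "Writer -> Committer" task has a single
 * thread, so a slow commit delays every record handled after it.
 * Time is a virtual counter; names and costs are hypothetical.
 */
final class ChainedSinkModel {
    static final long WRITE_COST = 1;

    /**
     * Processes actions one after another on one thread. "W" is a record handed
     * to the writer, "C" is a commit triggered by a completed checkpoint.
     * Returns the virtual time at which each write finished.
     */
    static List<Long> run(List<String> actions, long commitCost) {
        long clock = 0;
        List<Long> writeFinishTimes = new ArrayList<>();
        for (String action : actions) {
            if (action.equals("W")) {
                clock += WRITE_COST;          // write() runs on the shared task thread
                writeFinishTimes.add(clock);
            } else if (action.equals("C")) {
                clock += commitCost;          // commit() occupies that same thread
            } else {
                throw new IllegalArgumentException("unknown action: " + action);
            }
        }
        return writeFinishTimes;
    }

    public static void main(String[] args) {
        List<Long> times = run(List.of("W", "W", "C", "W", "W"), 100);
        System.out.println(times); // [1, 2, 103, 104]
    }
}
```

With a commit cost of 100 units, the third write finishes at t=103 instead of t=3, because it must wait for the commit to return. Flink's real task threading (mailbox, network buffers) is more involved. This sketch only captures sequential execution on a shared thread.
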
At present, Flink SQL users can work around this by disabling chaining globally, and DataStream users can disable chaining for individual operators with the disableChaining method, but in both cases the user has to set this explicitly.

Can a custom Sink connector change the chaining strategy itself, so that the Writer and Committer are separated?

Thanks && Regards,
di.wu

Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com.INVALID>.
Thank you for your reply.
However, in my local test environment (Flink 1.15 and Flink 1.16), disabling the chain between the Writer and the Committer does reduce back pressure.

This is what I observed:
1. After checkpoint 4 (ck-4) completes, the commit runs very slowly.
2. Meanwhile, the [Sink: Writer (1/1)#0] thread keeps calling SinkWriter.write() to receive upstream data.
3. When ck-5 is triggered, the prepareCommit and snapshotState methods are executed.
4. Because the previous commit has not finished, the [Sink: Committer (1/1)#0] thread has to wait before it can call commit again.
5. SinkWriter.write() can still keep receiving upstream data.
6. ck-6 is not triggered until the first commit completes.
Throughout this process, SinkWriter.write() is never blocked.

By contrast, if the chain is not disabled, receiving upstream data is already blocked at step 2.

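The sequence above can be condensed into a toy timeline in plain Java. This is not Flink code; the class, field and parameter names and the time units are hypothetical. The timeline rests on three assumptions, repeated in the comments:
(a) at most one checkpoint is in flight at a time;
(b) a checkpoint completes only after the Committer has taken its snapshot;
(c) the Committer cannot snapshot while it is still inside commit().

```java
/**
 * Toy timeline, not Flink code, for the slow-commit sequence above. Times are
 * abstract units; all names are hypothetical. Assumptions:
 * (a) at most one checkpoint is in flight at a time,
 * (b) a checkpoint completes only after the Committer has taken its snapshot,
 * (c) the Committer cannot snapshot while it is still inside commit().
 */
final class SlowCommitTimeline {
    static final class Outcome {
        final long writerBlockedFor; // units during which write() cannot run
        final long ck5CompletedAt;
        final long ck6TriggeredAt;

        Outcome(long writerBlockedFor, long ck5CompletedAt, long ck6TriggeredAt) {
            this.writerBlockedFor = writerBlockedFor;
            this.ck5CompletedAt = ck5CompletedAt;
            this.ck6TriggeredAt = ck6TriggeredAt;
        }
    }

    static Outcome simulate(boolean chained, long ck4CompletedAt, long commitCost,
                            long ck5TriggeredAt, long checkpointInterval) {
        // Step 1: committing ck-4 occupies the Committer's thread until commitDone.
        long commitDone = ck4CompletedAt + commitCost;
        // Chained: that thread is also the Writer's, so write() stalls for the whole
        // commit. Unchained (steps 2 and 5): the Writer thread keeps calling write().
        long writerBlockedFor = chained ? commitCost : 0;
        // Steps 3 and 4, assumptions (b) and (c): ck-5 completes only once the
        // Committer is free to snapshot.
        long ck5CompletedAt = Math.max(ck5TriggeredAt, commitDone);
        // Step 6, assumption (a): ck-6 cannot start before ck-5 has completed.
        long ck6TriggeredAt = Math.max(ck5TriggeredAt + checkpointInterval, ck5CompletedAt);
        return new Outcome(writerBlockedFor, ck5CompletedAt, ck6TriggeredAt);
    }

    public static void main(String[] args) {
        for (boolean chained : new boolean[] {true, false}) {
            Outcome o = simulate(chained, 0, 100, 10, 10);
            System.out.printf("chained=%b writerBlocked=%d ck5Done=%d ck6Triggered=%d%n",
                    chained, o.writerBlockedFor, o.ck5CompletedAt, o.ck6TriggeredAt);
        }
    }
}
```

Under these assumptions, only the Writer's blocked time differs between the two modes (100 units versus 0). In both modes, ck-5 completes and ck-6 is triggered at t=100, because both wait for the slow commit. In this model, splitting the chain frees the Writer but does not make the commits themselves any faster.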

Thanks && Regards,
di.wu

> On Feb 17, 2023, at 9:40 AM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi wudi
> 
> I'm afraid it cannot reduce back pressure. If the Writer and Committer are not
> chained together and the Committer task runs slowly, it can still block the
> upstream Writer tasks through back pressure.
> 
> On the other hand, you can try increasing the parallelism of the sink node to
> speed up the Committer.
> 
> Best,
> Shammon


Re: Disable the chain of the Sink operator

Posted by Shammon FY <zj...@gmail.com>.
Hi wudi

I'm afraid it cannot reduce back pressure. If the Writer and Committer are not
chained together and the Committer task runs slowly, it can still block the
upstream Writer tasks through back pressure.

On the other hand, you can try increasing the parallelism of the sink node to
speed up the Committer.

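The back-pressure argument can be illustrated with a toy bounded-buffer model. A producer, standing in for the Writer side, hands messages to a slower consumer, standing in for the Committer side. This is plain Java with hypothetical names and parameters. It does not model Flink's network stack, only the general point that a slow consumer eventually stalls its producer once the buffer between them is full.

```java
import java.util.ArrayDeque;

/**
 * Toy discrete-time model, not Flink code: a producer passes messages to a
 * slower consumer through a buffer of fixed capacity. All parameters are
 * hypothetical and measured in abstract "ticks".
 */
final class BoundedBufferModel {
    /**
     * Returns how many of the first {@code ticks} ticks the producer spent
     * blocked because the buffer was full.
     *
     * @param ticks        number of ticks to simulate
     * @param capacity     maximum number of buffered messages
     * @param produceEvery the producer has a new message ready every N ticks
     * @param consumeCost  ticks the consumer needs to handle one message
     */
    static int blockedTicks(int ticks, int capacity, int produceEvery, int consumeCost) {
        ArrayDeque<Integer> buffer = new ArrayDeque<>();
        int consumerFreeAt = 0; // tick at which the consumer can take the next message
        int nextReadyAt = 0;    // tick at which the producer's next message is ready
        int blocked = 0;
        for (int t = 0; t < ticks; t++) {
            // Consumer: when idle, take one message; handling it keeps it busy.
            if (t >= consumerFreeAt && !buffer.isEmpty()) {
                buffer.poll();
                consumerFreeAt = t + consumeCost;
            }
            // Producer: enqueue the ready message if there is room, else wait.
            if (t >= nextReadyAt) {
                if (buffer.size() < capacity) {
                    buffer.add(t);
                    nextReadyAt = t + produceEvery;
                } else {
                    blocked++; // back pressure: no progress possible this tick
                }
            }
        }
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println(blockedTicks(20, 4, 1, 5)); // 12: slow consumer, small buffer
        System.out.println(blockedTicks(20, 4, 1, 1)); // 0: consumer keeps up
    }
}
```

With a buffer of 4 and a consumer five times slower than the producer, the producer is blocked for 12 of the first 20 ticks. With an equally fast consumer, it is never blocked. In this model, a larger buffer only postpones the stall. How quickly this happens in a real sink depends on how many messages the Writer actually sends to the Committer, which this sketch does not try to estimate.
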
Best,
Shammon

On Thu, Feb 16, 2023 at 11:38 PM wudi <67...@qq.com.invalid> wrote:

> Thanks for your replies.
> I think that if the Writer and Committer are not chained together, data
> consistency can still be guaranteed, right?
> Even though the Committer no longer blocks the Writer, at the next
> checkpoint, if the commit has not completed, SinkWriter.prepareCommit()
> will not be triggered.
>
> In addition, in this scenario (the Writer blocked by a slow commit), could
> disabling the Sink's chain actually perform better, since it removes a lot
> of back pressure?
>
> Thanks && Regards,
> di.wu

Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com>.
Thanks for your replies.
I think that if the Writer and Committer are not chained together, data consistency can still be guaranteed, right?
Even though the Committer no longer blocks the Writer, at the next checkpoint, if the commit has not completed, SinkWriter.prepareCommit() will not be triggered.

In addition, in this scenario (the Writer blocked by a slow commit), could disabling the Sink's chain actually perform better, since it removes a lot of back pressure?

Thanks && Regards,
di.wu


> On Feb 16, 2023, at 10:05 PM, Chesnay Schepler <ch...@apache.org> wrote:
> 
> As far as I know, the chain between the Committer and the Writer is also required for correctness.


Re: Disable the chain of the Sink operator

Posted by Chesnay Schepler <ch...@apache.org>.
As far as I know, the chain between the Committer and the Writer is also
required for correctness.

On 16/02/2023 14:53, weijie guo wrote:
> Hi wu,
>
> I don't think directly changing the chaining strategy is a good choice.
> Operator chaining usually gives better performance and resource
> utilization. If we change the chaining policy between them directly,
> users can no longer chain them together, which is not a good starting
> point.
>
> Best regards,
>
> Weijie

Re: Disable the chain of the Sink operator

Posted by weijie guo <gu...@gmail.com>.
Hi wu,

I don't think directly changing the chaining strategy is a good choice.
Operator chaining usually gives better performance and resource utilization.
If we change the chaining policy between them directly, users can no longer
chain them together, which is not a good starting point.

Best regards,

Weijie


On Thu, Feb 16, 2023 at 7:29 PM wudi <67...@qq.com> wrote:

> Thank you for your reply.
>
> Currently, with a custom Sink connector, the Flink task combines the Writer
> and Committer into one thread, whose name looks like *[Sink: Writer -> Sink:
> Committer (1/1)#0]*.
> As a result, when *Committer.commit()* is very slow, it blocks
> *SinkWriter.write()* from receiving upstream data.
>
> The client can call *env.disableOperatorChaining()* to split this into two
> threads: *[Sink: Writer (1/1)#0]* and *[Sink: Committer (1/1)#0]*. Then
> Committer.commit() no longer blocks SinkWriter.write().
>
> If chaining could be disabled within the custom Sink connector itself, users
> would not have to disable it themselves. Or is there a better way to keep
> *Committer.commit()* from blocking *SinkWriter.write()*?
>
> Looking forward to your reply.
> Thanks && Regards,
> di.wu
>
> On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
>
> Hi
>
> Do you mean how to disable `chain` in your custom sink connector? Can you
> give an example of what you want?

Re: Disable the chain of the Sink operator

Posted by weijie guo <gu...@gmail.com>.
Hi wu,

I don't think directly changing the chaining strategy is a good choice.
Operator chaining usually gives better performance and resource utilization.
If we change the chaining policy between them inside the connector, users can
no longer chain them together, which is not a good starting point.
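
To make this trade-off concrete, here is a simplified, hypothetical model of the chaining decision between the Writer and the Committer. The `Strategy` values are named after Flink's `ChainingStrategy` (`ALWAYS`, `HEAD`, `NEVER`), but `Op`, `canChain` and the rule inside it are assumptions for illustration only; Flink's real check also considers partitioning, slot sharing groups and other conditions.

```java
public class ChainDecisionSketch {

    // Named after Flink's ChainingStrategy values; everything else here is a simplified model.
    enum Strategy { ALWAYS, HEAD, NEVER }

    // Hypothetical operator description: a name, a chaining strategy and a parallelism.
    static final class Op {
        final String name;
        final Strategy strategy;
        final int parallelism;

        Op(String name, Strategy strategy, int parallelism) {
            this.name = name;
            this.strategy = strategy;
            this.parallelism = parallelism;
        }
    }

    /**
     * Simplified rule: may `down` run in the same thread as `up`?
     * Assumes a forward connection and a shared slot sharing group.
     */
    static boolean canChain(Op up, Op down, boolean chainingEnabled) {
        return chainingEnabled                       // global switch, e.g. env.disableOperatorChaining()
                && up.strategy != Strategy.NEVER     // a HEAD or ALWAYS upstream accepts a successor
                && down.strategy == Strategy.ALWAYS  // the downstream must agree to join the chain
                && up.parallelism == down.parallelism;
    }

    public static void main(String[] args) {
        Op writer = new Op("Sink: Writer", Strategy.ALWAYS, 1);
        Op committer = new Op("Sink: Committer", Strategy.ALWAYS, 1);
        // Hypothetical connector that hard-codes NEVER for its committer.
        Op pinnedCommitter = new Op("Sink: Committer", Strategy.NEVER, 1);

        System.out.println("default:                " + canChain(writer, committer, true));        // true
        System.out.println("user disables globally: " + canChain(writer, committer, false));       // false
        System.out.println("connector pins NEVER:   " + canChain(writer, pinnedCommitter, true));  // false
    }
}
```

In this model the user controls only the global switch (`chainingEnabled`, standing in for `env.disableOperatorChaining()`) and per-operator settings such as `disableChaining()`. Once the connector pins the Committer to `NEVER`, no user setting can chain it back to the Writer, while leaving the default keeps both options open.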

Best regards,

Weijie


On Thu, Feb 16, 2023 at 19:29, wudi <67...@qq.com> wrote:

> Thank you for your reply.
>
> Currently, in a custom Sink connector, Flink chains the Writer and the
> Committer into a single thread, named something like *[Sink: Writer ->
> Sink: Committer (1/1)#0]*.
> As a result, when *Committer.commit()* is very slow, it blocks
> *SinkWriter.write()* from receiving upstream data.
>
> The user can call *env.disableOperatorChaining()* to split this into two
> threads, *[Sink: Writer (1/1)#0]* and *[Sink: Committer (1/1)#0]*. Then
> *Committer.commit()* no longer blocks *SinkWriter.write()*.
>
> If chaining could be disabled inside the custom Sink connector itself,
> users would not have to disable it themselves. Or is there a better way to
> keep *Committer.commit()* from blocking *SinkWriter.write()*?
>
> Looking forward to your reply.
> Thanks && Regards,
> di.wu

Re: Disable the chain of the Sink operator

Posted by wudi <67...@qq.com>.
Thank you for your reply.

Currently, in a custom Sink connector, Flink chains the Writer and the Committer into a single thread, named something like [Sink: Writer -> Sink: Committer (1/1)#0].
As a result, when Committer.commit() is very slow, it blocks SinkWriter.write() from receiving upstream data.

The user can call env.disableOperatorChaining() to split this into two threads: [Sink: Writer (1/1)#0] and [Sink: Committer (1/1)#0]. Then Committer.commit() no longer blocks SinkWriter.write().
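
The difference between the two thread layouts can be illustrated with a plain-Java sketch that uses no Flink classes. It is a hypothetical model: the commit is simulated with `Thread.sleep`, a "write" only checks whether the commit is still running, and the thread names merely mirror the ones above. The chained case runs the commit and the writes one after another on one thread; the unchained case gives each its own thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ChainingSketch {

    // Hypothetical duration of a slow Committer.commit() call.
    static final long COMMIT_MILLIS = 300;

    /** Returns how many of `records` simulated writes ran while the commit was still in progress. */
    static int writesDuringCommit(boolean chained, int records) {
        AtomicBoolean committing = new AtomicBoolean(false);
        AtomicInteger overlapped = new AtomicInteger();
        CountDownLatch commitStarted = new CountDownLatch(1);

        // Simulated commit: mark it as running, then block for COMMIT_MILLIS.
        Runnable commit = () -> {
            committing.set(true);
            commitStarted.countDown();
            try {
                Thread.sleep(COMMIT_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                committing.set(false);
            }
        };

        // Simulated writes: records arrive once the commit has started;
        // each write only counts whether a commit is still running.
        Runnable write = () -> {
            try {
                commitStarted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (int i = 0; i < records; i++) {
                if (committing.get()) {
                    overlapped.incrementAndGet();
                }
            }
        };

        if (chained) {
            // One thread, like [Sink: Writer -> Sink: Committer]: the writes wait for the commit.
            commit.run();
            write.run();
        } else {
            // Two threads, like [Sink: Writer] and [Sink: Committer].
            Thread committer = new Thread(commit, "Sink: Committer");
            Thread writer = new Thread(write, "Sink: Writer");
            committer.start();
            writer.start();
            try {
                writer.join();
                committer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return overlapped.get();
    }

    public static void main(String[] args) {
        System.out.println("chained:   " + writesDuringCommit(true, 5));
        System.out.println("unchained: " + writesDuringCommit(false, 5));
    }
}
```

The chained variant reports 0 writes overlapping the commit, while the unchained variant typically reports all 5, because the writer thread keeps running while the committer thread sleeps. The sketch ignores Flink's network buffers and checkpointing between the two tasks, so it only shows the thread-level effect.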

If chaining could be disabled inside the custom Sink connector itself, users would not have to disable it themselves. Or is there a better way to keep Committer.commit() from blocking SinkWriter.write()?

Looking forward to your reply.
Thanks && Regards,
di.wu

> On Feb 16, 2023, at 6:54 PM, Shammon FY <zj...@gmail.com> wrote:
> 
> Hi
> 
> Do you mean how to disable `chain` in your custom sink connector? Can you
> give an example of what you want?
> 


Re: Disable the chain of the Sink operator

Posted by Shammon FY <zj...@gmail.com>.
Hi

Do you mean how to disable `chain` in your custom sink connector? Can you
give an example of what you want?

On Wed, Feb 15, 2023 at 10:42 PM di wu <67...@qq.com> wrote:

> Hello
>
> The current Sink operator is split into two operators, Writer and
> Committer. By default, they are chained together and executed in the
> same thread.
> So when the Committer is very slow, it blocks the Writer, causing back
> pressure.
>
> At present, in Flink SQL this can be worked around by disabling chaining
> globally, and in DataStream jobs chaining can be disabled for individual
> operators through the disableChaining method, but both have to be set by
> the user.
>
> Can the chaining strategy be changed in a custom Sink connector to
> separate the Writer and the Committer?
>
> Thanks && Regards,
> di.wu
>
