Posted to dev@pulsar.apache.org by Baozi <wu...@icloud.com.INVALID> on 2022/05/09 09:02:12 UTC

[DISCUSS] Cancel the configuration of autoAck in Function framework

Hi, guys:

I found that the autoAck configuration in the function framework currently affects the delivery semantics, which makes the behavior difficult for users to understand. Consider the following two scenarios.

1. The user assumes that the configured Guarantees take precedence

If the user sets Guarantees == ATMOST_ONCE and autoAck == false, the actual processing semantics of the function become ATLEAST_ONCE. As the following code shows, the message is not acked immediately in this scenario.

JavaInstanceRunnable#run():Line273  <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L271-L276>
if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions
        .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
    if (instanceConfig.getFunctionDetails().getAutoAck()) { // ack automatically only when autoAck == true
        currentRecord.ack();
    }
}

2. The user assumes that the framework does not ack automatically when autoAck == false

According to the following code, the framework still acks automatically.

PulsarSinkAtLeastOnceProcessor#sendOutputMessage():Line275 <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L274-L276>
PulsarSinkEffectivelyOnceProcessor#sendOutputMessage():Line325 <https://github.com/apache/pulsar/blob/c49a977de4b0b525ec80e5070bc90eddcc7cddad/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java#L325>

public void sendOutputMessage(TypedMessageBuilder<T> msg, SinkRecord<T> record) {
    msg.sendAsync()
            .thenAccept(messageId -> record.ack()) 
            .exceptionally(getPublishErrorHandler(record, true));
}

To sum up, users may be confused when configuring Guarantees and autoAck, and cannot tell what behavior to expect.
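
The combinations can be laid out with a small model of the two snippets quoted above. This is only a sketch: the class AckMatrixSketch, its enum, and its method names are made up for illustration and are not Pulsar APIs, and it covers only the two code paths quoted in this mail. What the sink does under ATMOST_ONCE is not shown above, so the model assumes there is no ack on that path.

```java
final class AckMatrixSketch {
    /** Mirrors the three guarantee values discussed in this thread. */
    enum Guarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /** Models the quoted JavaInstanceRunnable#run() check: ack right after consuming. */
    static boolean runtimeAcksImmediately(Guarantees g, boolean autoAck) {
        return g == Guarantees.ATMOST_ONCE && autoAck;
    }

    /**
     * Models the quoted sendOutputMessage(): the at-least-once and effectively-once
     * processors ack after a successful send and never consult autoAck.
     * ASSUMPTION: the ATMOST_ONCE sink path does not ack (not shown in this thread).
     */
    static boolean sinkAcksAfterSend(Guarantees g) {
        return g != Guarantees.ATMOST_ONCE;
    }

    /** Summarizes who acks for one (guarantees, autoAck) combination in this model. */
    static String describe(Guarantees g, boolean autoAck) {
        if (runtimeAcksImmediately(g, autoAck)) {
            return "runtime acks immediately";
        }
        if (sinkAcksAfterSend(g)) {
            return "sink acks after send";
        }
        return "no ack in the modeled paths";
    }

    public static void main(String[] args) {
        // Print every combination so the rows can be compared side by side.
        for (Guarantees g : Guarantees.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.printf("%-17s autoAck=%-5b -> %s%n", g, autoAck, describe(g, autoAck));
            }
        }
    }
}
```

In this model, autoAck changes the outcome only in the ATMOST_ONCE rows, and there it also changes the effective delivery semantics.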

I would like to discuss whether we can remove the autoAck configuration and add a CUSTOM type to Guarantees.

switch (processingGuarantees) {
	case ATMOST_ONCE:      // the framework acks immediately after consuming the message
	case ATLEAST_ONCE:     // the framework acks after processing on the source side completes
	case EFFECTIVELY_ONCE: // the framework acks after processing on the source side completes
	case CUSTOM:           // the framework performs no acks and provides no semantic guarantees
}
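
A minimal sketch of how this proposal could look in code. Everything here is hypothetical: ProposedGuaranteesSketch, ProposedGuarantees, AckTiming, and ackTiming() are names made up for this mail, not existing Pulsar types, and the mapping only restates the four cases listed above.

```java
final class ProposedGuaranteesSketch {
    /** When the framework would ack under the proposal (hypothetical). */
    enum AckTiming { ON_CONSUME, AFTER_PROCESSING, NEVER_BY_FRAMEWORK }

    /** The three existing guarantee values plus the proposed CUSTOM (hypothetical enum). */
    enum ProposedGuarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE, CUSTOM }

    /** Maps each guarantee to the ack timing described in the proposal. */
    static AckTiming ackTiming(ProposedGuarantees g) {
        switch (g) {
            case ATMOST_ONCE:
                return AckTiming.ON_CONSUME;          // ack as soon as the message is consumed
            case ATLEAST_ONCE:
            case EFFECTIVELY_ONCE:
                return AckTiming.AFTER_PROCESSING;    // ack only once processing has completed
            case CUSTOM:
                return AckTiming.NEVER_BY_FRAMEWORK;  // the user's code is responsible for acking
            default:
                throw new IllegalArgumentException("unknown guarantee: " + g);
        }
    }

    public static void main(String[] args) {
        for (ProposedGuarantees g : ProposedGuarantees.values()) {
            System.out.println(g + " -> " + ackTiming(g));
        }
    }
}
```

With a single setting, every value maps to exactly one ack behavior, so there is no second flag that could contradict it.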

If you have any ideas, you are welcome to discuss them. If everyone agrees with this idea, I will submit a PIP to move the implementation forward.

Thanks,
Baodi Shi


Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

Posted by Neng Lu <nl...@apache.org>.
Regarding your question "why AUTO_ACK is designed this way"

I think that when it was first implemented, AUTO_ACK was just a convenient way to help users ack messages.

We can discuss the gap between expected behavior and actual behavior and try to resolve or simplify it.


Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

Posted by Neng Lu <nl...@apache.org>.
> For users, sink is also part of the function framework.

^^ Is this written anywhere in the Pulsar documentation? If you look at the code closely, the source and sink are actually configurable in the Java runtime. Users can provide their own source/sink implementations.


Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

Posted by Baozi <wu...@icloud.com.INVALID>.
> AUTO_ACK setting means if the function runtime will ack messages or not. ("function runtime" here specifically refers to the JavaInstanceRunnable. If the ack happens inside a sink's implemented write method, it's not auto-ack). 
The official website documentation describes it as: "Whether or not the framework acknowledges messages automatically."
For users, the sink is also part of the function framework.


Thanks,
Baodi Shi



Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

Posted by Baozi <wu...@icloud.com.INVALID>.
Thanks for the reply,

> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.
> If AUTO_ACK is  FALSE, then the acking will be done by Sink implementation.

I'm a little confused; I would like to know why AUTO_ACK is designed this way.

I'll give another example:

> If AUTO_ACK is TRUE, then the JavaInstanceRunnable will be acking messages.


And if Guarantees != ATMOST_ONCE, then the JavaInstanceRunnable will not ack the message.

> JavaInstanceRunnable#run():Line273


Thanks,
Baodi Shi



Re: [DISCUSS] Cancel the configuration of autoAck in Function framework

Posted by Neng Lu <nl...@apache.org>.
Thanks for this detailed discussion about processing guarantees and acking.
Together, these two settings affect the behavior of a running function.

One thing I want to clarify:
The AUTO_ACK setting determines whether the function runtime acks messages. ("Function runtime" here refers specifically to the JavaInstanceRunnable. If the ack happens inside a sink's write method implementation, it is not auto-ack.)

If AUTO_ACK is TRUE, the JavaInstanceRunnable acks messages.
If AUTO_ACK is FALSE, acking is done by the Sink implementation.

Now with this context, let's review your two scenarios:

> 1.If the user set Guarantees == ATMOST_ONCE and autoAck == false.
To be precise, the processing semantics are not ATLEAST_ONCE. They are actually left to the Sink implementation to decide: they can be ATMOST_ONCE, ATLEAST_ONCE, and probably EFFECTIVELY_ONCE.

> 2. If the user thinks that the framework doesn’t auto ack when autoAck == false
This behavior is actually correct based on our previous context.

A really problematic scenario is when the user sets ATLEAST_ONCE/EFFECTIVELY_ONCE and AUTO_ACK=true. I don't think the JavaInstanceRunnable can ack on the user's behalf in these cases, so there should be a check that prevents users from submitting functions with such configs.
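
Such a check could look roughly like the sketch below. It is not existing Pulsar validation code: FunctionConfigCheckSketch, validate, isAccepted, and the enum are hypothetical names used only to illustrate the rule, and where a check like this would actually live is left open.

```java
final class FunctionConfigCheckSketch {
    /** Mirrors the three guarantee values discussed in this thread. */
    enum Guarantees { ATMOST_ONCE, ATLEAST_ONCE, EFFECTIVELY_ONCE }

    /**
     * Rejects the combination called out above: ATLEAST_ONCE or EFFECTIVELY_ONCE
     * together with autoAck == true. All other combinations pass.
     */
    static void validate(Guarantees guarantees, boolean autoAck) {
        boolean ackMustFollowProcessing =
                guarantees == Guarantees.ATLEAST_ONCE || guarantees == Guarantees.EFFECTIVELY_ONCE;
        if (ackMustFollowProcessing && autoAck) {
            throw new IllegalArgumentException(
                    "autoAck=true cannot be combined with " + guarantees);
        }
    }

    /** Convenience wrapper that reports the validation result as a boolean. */
    static boolean isAccepted(Guarantees guarantees, boolean autoAck) {
        try {
            validate(guarantees, autoAck);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        for (Guarantees g : Guarantees.values()) {
            for (boolean autoAck : new boolean[] {true, false}) {
                System.out.println(g + ", autoAck=" + autoAck + " accepted: " + isAccepted(g, autoAck));
            }
        }
    }
}
```

Failing at submission time would surface the conflict to the user before the function ever runs.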


