You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/08 14:25:54 UTC

[GitHub] [pulsar] shibd opened a new pull request, #15491: [Connector] Sink support custom acknowledge type

shibd opened a new pull request, #15491:
URL: https://github.com/apache/pulsar/pull/15491

   ### Motivation
   
   The current source  ack type and `ProcessingGuarantees` are forcibly bound together, This is not flexible for implementing a new sink. If sink is Guarantees equals `ATMOST_ONCE`,  it can't be use `acknowledgeCumulative`
   
   https://github.com/apache/pulsar/blob/a0dcab679b1842a2efee4df2f31bf41ef5ab1dd0/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java#L136-L142
   
   
   ### Modifications
   - Provide an `ack(boolean cumulative)` method to support the sink to select the ack type
   
   ### Documentation
   - [x] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r883665150


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java:
##########
@@ -93,6 +95,15 @@ public Optional<Long> getEventTime() {
         }
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {

Review Comment:
   Nice! I fixed. PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r871009890


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java:
##########
@@ -132,6 +132,17 @@ protected Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
                 .message(message)
                 .schema(schema)
                 .topicName(message.getTopicName())
+                .customAckFunction(cumulative -> {
+                    try {
+                        if (cumulative) {
+                            consumer.acknowledgeCumulativeAsync(message);
+                        } else {
+                            consumer.acknowledgeAsync(message);
+                        }
+                    } finally {
+                        message.release();

Review Comment:
   Thanks reply, i fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r870971289


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java:
##########
@@ -82,6 +83,20 @@ public void ack() {
         sourceRecord.ack();
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {
+        if (sourceRecord instanceof PulsarRecord) {
+            PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
+            pulsarRecord.ack(cumulative);
+        } else {
+            throw new RuntimeException("SourceRecord class type must equals PulsarRecord");

Review Comment:
   `must equals` -> `must be` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#issuecomment-1125616646

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r879563093


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java:
##########
@@ -93,6 +95,15 @@ public Optional<Long> getEventTime() {
         }
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {

Review Comment:
   what about adding `ackCumulative` or `cumulativeAck` instead of a blindly method with a boolean param? 
   
   it is also more similar to Consumer API 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#issuecomment-1134712872

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r882218972


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java:
##########
@@ -93,6 +95,15 @@ public Optional<Long> getEventTime() {
         }
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {

Review Comment:
   @nicoloboschi  Can you take a look at it again, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#issuecomment-1125707816

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on pull request #15491: [Connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#issuecomment-1120452890

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r870971855


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java:
##########
@@ -132,6 +132,17 @@ protected Record<T> buildRecord(Consumer<T> consumer, Message<T> message) {
                 .message(message)
                 .schema(schema)
                 .topicName(message.getTopicName())
+                .customAckFunction(cumulative -> {
+                    try {
+                        if (cumulative) {
+                            consumer.acknowledgeCumulativeAsync(message);
+                        } else {
+                            consumer.acknowledgeAsync(message);
+                        }
+                    } finally {
+                        message.release();

Review Comment:
   The ack is async, is it right to release here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15491:
URL: https://github.com/apache/pulsar/pull/15491


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r880250358


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java:
##########
@@ -93,6 +95,15 @@ public Optional<Long> getEventTime() {
         }
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {

Review Comment:
   I think this is similar to the API now provided, if provide `ackCumulative` or  `cumulativeAck` instead it, will easily make the original `ack` method understood as `Individual ack`.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #15491: [improve][connector] Sink support custom acknowledge type

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #15491:
URL: https://github.com/apache/pulsar/pull/15491#discussion_r883399746


##########
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java:
##########
@@ -93,6 +95,15 @@ public Optional<Long> getEventTime() {
         }
     }
 
+    /**
+     * Some sink sometimes wants to control the ack type.
+     *
+     * @param cumulative
+     */
+    public void ack(boolean cumulative) {

Review Comment:
   I read again the pull. I believe that we should have
   -  `cumulativeAck()` -> call cumulative acknowledgment
   -  `individualAck()` -> call acknowledgment
   - `ack()` -> based on the processing guaranteee
   
   we need to explicity document that the first two will force the ack type, and the latter will use the "better" depending on the configuration
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org