You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/02 08:30:12 UTC

[pulsar] branch master updated: [improve][connector] Sink support custom acknowledge type (#15491)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a75dfdf2178 [improve][connector] Sink support custom acknowledge type (#15491)
a75dfdf2178 is described below

commit a75dfdf21787fde586e119cb0412bb7fd9a53ee3
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Thu Jun 2 16:29:56 2022 +0800

    [improve][connector] Sink support custom acknowledge type (#15491)
---
 .../pulsar/functions/instance/SinkRecord.java      | 25 ++++++++++++
 .../pulsar/functions/source/PulsarRecord.java      | 17 ++++++++
 .../pulsar/functions/source/PulsarSource.java      | 25 +++++++-----
 .../pulsar/functions/instance/SinkRecordTest.java  | 47 ++++++++++++++++++++++
 .../pulsar/functions/source/PulsarSourceTest.java  | 21 ++++++++++
 5 files changed, 124 insertions(+), 11 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 61e9d5378ae..b922b988581 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
 
 @Slf4j
 @Data
@@ -82,6 +83,30 @@ public class SinkRecord<T> implements Record<T> {
         sourceRecord.ack();
     }
 
+    /**
+     * Lets a sink choose the ack type: cumulatively acks the source record, which must be a PulsarRecord.
+     */
+    public void cumulativeAck() {
+        if (sourceRecord instanceof PulsarRecord) {
+            PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
+            pulsarRecord.cumulativeAck();
+        } else {
+            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
+        }
+    }
+
+    /**
+     * Lets a sink choose the ack type: individually acks the source record, which must be a PulsarRecord.
+     */
+    public void individualAck() {
+        if (sourceRecord instanceof PulsarRecord) {
+            PulsarRecord pulsarRecord = (PulsarRecord) sourceRecord;
+            pulsarRecord.individualAck();
+        } else {
+            throw new RuntimeException("SourceRecord class type must be PulsarRecord");
+        }
+    }
+
     @Override
     public void fail() {
         sourceRecord.fail();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index e53e7fe9ace..cd8405e407d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.source;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 import lombok.Builder;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
@@ -44,6 +45,7 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
 
     private final Runnable failFunction;
     private final Runnable ackFunction;
+    private final Consumer<Boolean> customAckFunction;
 
     @Override
     public Optional<String> getKey() {
@@ -93,6 +95,20 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
         }
     }
 
+    /**
+     * Lets a sink choose the ack type: invokes the custom ack function in cumulative mode (true).
+     */
+    public void cumulativeAck() {
+        this.customAckFunction.accept(true);
+    }
+
+    /**
+     * Lets a sink choose the ack type: invokes the custom ack function in individual mode (false).
+     */
+    public void individualAck() {
+        this.customAckFunction.accept(false);
+    }
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return message.getEncryptionCtx();
@@ -121,4 +137,5 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
     public Optional<Message<T>> getMessage() {
         return Optional.of(message);
     }
+
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 652c682bbed..1fb76459e60 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -132,18 +132,21 @@ public abstract class PulsarSource<T> implements Source<T> {
                 .message(message)
                 .schema(schema)
                 .topicName(message.getTopicName())
+                .customAckFunction(cumulative -> {
+                    if (cumulative) {
+                        consumer.acknowledgeCumulativeAsync(message)
+                                .whenComplete((unused, throwable) -> message.release());
+                    } else {
+                        consumer.acknowledgeAsync(message).whenComplete((unused, throwable) -> message.release());
+                    }
+                })
                 .ackFunction(() -> {
-                    try {
-                        if (pulsarSourceConfig
-                                .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                            consumer.acknowledgeCumulativeAsync(message);
-                        } else {
-                            consumer.acknowledgeAsync(message);
-                        }
-                    } finally {
-                        // don't need to check if message pooling is set
-                        // client will automatically check
-                        message.release();
+                    if (pulsarSourceConfig
+                            .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                        consumer.acknowledgeCumulativeAsync(message)
+                                .whenComplete((unused, throwable) -> message.release());
+                    } else {
+                        consumer.acknowledgeAsync(message).whenComplete((unused, throwable) -> message.release());
                     }
                 }).failFunction(() -> {
                     try {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/SinkRecordTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/SinkRecordTest.java
new file mode 100644
index 00000000000..56624581824
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/SinkRecordTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+public class SinkRecordTest {
+
+    @Test
+    public void testCustomAck() {
+
+        PulsarRecord pulsarRecord = Mockito.mock(PulsarRecord.class);
+        SinkRecord sinkRecord = new SinkRecord<>(pulsarRecord, new Object());
+
+        sinkRecord.cumulativeAck();
+        Mockito.verify(pulsarRecord, Mockito.times(1)).cumulativeAck();
+
+        sinkRecord = new SinkRecord(Mockito.mock(Record.class), new Object());
+        try {
+            sinkRecord.individualAck();
+            Assert.fail("Should throw runtime exception");
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof RuntimeException);
+            Assert.assertEquals(e.getMessage(), "SourceRecord class type must be PulsarRecord");
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 80c4001d36f..d7c0a8d818a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -38,6 +38,7 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -360,4 +361,24 @@ public class PulsarSourceTest {
 
         fail("Unknown config type");
     }
+
+
+    @Test(dataProvider = "sourceImpls")
+    public void testPulsarRecordCustomAck(PulsarSourceConfig pulsarSourceConfig) throws Exception {
+
+        PulsarSource pulsarSource = getPulsarSource(pulsarSourceConfig);
+        Message message = Mockito.mock(Message.class);
+        Consumer consumer = Mockito.mock(Consumer.class);
+        Mockito.when(consumer.acknowledgeAsync(message)).thenReturn(CompletableFuture.completedFuture(null));
+        Mockito.when(consumer.acknowledgeCumulativeAsync(message)).thenReturn(CompletableFuture.completedFuture(null));
+
+        PulsarRecord record = (PulsarRecord) pulsarSource.buildRecord(consumer, message);
+
+        record.cumulativeAck();
+        Mockito.verify(consumer, Mockito.times(1)).acknowledgeCumulativeAsync(message);
+
+        record.individualAck();
+        Mockito.verify(consumer, Mockito.times(1)).acknowledgeAsync(message);
+    }
+
 }