You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/30 09:36:39 UTC

[pulsar] branch master updated: fix Pulsar supporting DLQ for sources/sinks #7032 (#7094)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36df3bb  fix Pulsar supporting DLQ for sources/sinks #7032 (#7094)
36df3bb is described below

commit 36df3bb3e28e4bc7f98e5788ea1680ddcce45997
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat May 30 17:36:27 2020 +0800

    fix Pulsar supporting DLQ for sources/sinks #7032 (#7094)
    
    Fixes #7032
    
    ### Motivation
    Source data flow:
    Custom Source -> Function -> Pulsar producer -> Broker
    
    Sink data flow:
    Broker -> Pulsar consumer -> Function -> Custom Sink
    
    A Pulsar consumer is used only in the sink data flow, so the DLQ (dead letter queue) parameters need to be added for sink mode.
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 69 ++++++++++++++++++++++
 .../java/org/apache/pulsar/io/SinkForTest.java     | 46 +++++++++++++++
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 16 ++++-
 .../org/apache/pulsar/common/io/SinkConfig.java    |  5 ++
 .../functions/instance/JavaInstanceRunnable.java   |  3 +
 .../pulsar/functions/source/PulsarSource.java      |  7 ++-
 .../functions/source/PulsarSourceConfig.java       |  1 +
 .../org/apache/pulsar/functions/LocalRunner.java   |  8 ++-
 .../proto/src/main/proto/Function.proto            |  1 +
 .../pulsar/functions/utils/SinkConfigUtils.java    | 21 +++++++
 10 files changed, 170 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f28c9eb..b3be45d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.LocalRunner;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -500,6 +501,74 @@ public class PulsarFunctionE2ETest {
         testE2EPulsarFunction(jarFilePathUrl);
     }
 
+    @Test(timeOut = 30000)
+    private void testPulsarSinkDLQ() throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String dlqTopic = sourceTopic+"-DLQ";
+        final String sinkName = "PulsarSink-test";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+        // 1 create producer、DLQ consumer
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(dlqTopic).subscriptionName(subscriptionName).subscribe();
+
+        // 2 setup sink
+        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+        sinkConfig.setNegativeAckRedeliveryDelayMs(1001L);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setMaxMessageRetries(2);
+        sinkConfig.setDeadLetterTopic(dlqTopic);
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+        sinkConfig.setClassName(SinkForTest.class.getName());
+        LocalRunner localRunner = LocalRunner.builder()
+                .sinkConfig(sinkConfig)
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+
+        localRunner.start(false);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+                return topicStats.subscriptions.containsKey(subscriptionName)
+                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
+
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        // 3 send message
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            producer.newMessage().property(propertyKey, propertyValue).value("fail" + i).sendAsync();
+        }
+
+        //4 All messages should enter DLQ
+        for (int i = 0; i < totalMsgs; i++) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(message.getValue(), "fail" + i);
+        }
+
+        //clean up
+        producer.close();
+        consumer.close();
+    }
+
     private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java
new file mode 100644
index 0000000..853019c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io;
+
+import java.util.Map;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+public class SinkForTest<T> implements Sink<String> {
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+
+    }
+
+    @Override
+    public void write(Record<String> record) throws Exception {
+        if (record.getValue().contains("fail")) {
+            record.fail();
+        } else {
+            record.ack();
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 5e23635..14fe9be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -276,6 +276,10 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
         protected String customSchemaInputString;
 
+        @Parameter(names = "--max-redeliver-count", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue")
+        protected Integer maxMessageRetries;
+        @Parameter(names = "--dead-letter-topic", description = "Name of the dead topic where the failing messages will be sent.")
+        protected String deadLetterTopic;
 
         @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink", hidden = true)
         protected FunctionConfig.ProcessingGuarantees DEPRECATED_processingGuarantees;
@@ -314,6 +318,8 @@ public class CmdSinks extends CmdBase {
         protected Boolean autoAck;
         @Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
         protected Long timeoutMs;
+        @Parameter(names = "--negative-ack-redelivery-delay-ms", description = "The negative ack message redelivery delay in milliseconds")
+        protected Long negativeAckRedeliveryDelayMs;
         @Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
         protected String customRuntimeOptions;
 
@@ -380,6 +386,11 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setTopicToSchemaType(customSchemaInputMap);
             }
 
+            sinkConfig.setMaxMessageRetries(maxMessageRetries);
+            if (null != deadLetterTopic) {
+                sinkConfig.setDeadLetterTopic(deadLetterTopic);
+            }
+
             if (isNotBlank(subsName)) {
                 sinkConfig.setSourceSubscriptionName(subsName);
             }
@@ -443,9 +454,8 @@ public class CmdSinks extends CmdBase {
             if (timeoutMs != null) {
                 sinkConfig.setTimeoutMs(timeoutMs);
             }
-            
-            if (null != sinkConfigString) {
-                sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+            if (negativeAckRedeliveryDelayMs != null && negativeAckRedeliveryDelayMs > 0) {
+                sinkConfig.setNegativeAckRedeliveryDelayMs(negativeAckRedeliveryDelayMs);
             }
 
             if (customRuntimeOptions != null) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index a9c904f..a82d7ac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -56,6 +56,10 @@ public class SinkConfig {
 
     private Map<String, ConsumerConfig> inputSpecs;
 
+    private Integer maxMessageRetries;
+
+    private String deadLetterTopic;
+
     private Map<String, Object> configs;
     // This is a map of secretName(aka how the secret is going to be
     // accessed in the function via context) to an object that
@@ -69,6 +73,7 @@ public class SinkConfig {
     private Resources resources;
     private Boolean autoAck;
     private Long timeoutMs;
+    private Long negativeAckRedeliveryDelayMs;
     private String archive;
     // Whether the subscriptions the functions created/used should be deleted when the functions is deleted
     private Boolean cleanupSubscription;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 7bb8238..aa3a7b5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -730,6 +730,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             if (sourceSpec.getTimeoutMs() > 0 ) {
                 pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
             }
+            if (sourceSpec.getNegativeAckRedeliveryDelayMs() > 0) {
+                pulsarSourceConfig.setNegativeAckRedeliveryDelayMs(sourceSpec.getNegativeAckRedeliveryDelayMs());
+            }
 
             if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                 pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f4a91fc..35d6210 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -87,7 +87,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
                 cb = cb.receiverQueueSize(conf.getReceiverQueueSize());
             }
             cb = cb.properties(properties);
-
+            if (pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() != null
+                    && pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() > 0) {
+                cb.negativeAckRedeliveryDelay(pulsarSourceConfig.getNegativeAckRedeliveryDelayMs(), TimeUnit.MILLISECONDS);
+            }
             if (pulsarSourceConfig.getTimeoutMs() != null) {
                 cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
             }
@@ -98,7 +101,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
                 if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
                     deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
                 }
-                cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+                cb = cb.enableRetry(true).deadLetterPolicy(deadLetterPolicyBuilder.build());
             }
 
             Consumer<T> consumer = cb.subscribeAsync().join();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 81b35eb..c47d810 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -47,6 +47,7 @@ public class PulsarSourceConfig {
 
     private String typeClassName;
     private Long timeoutMs;
+    private Long negativeAckRedeliveryDelayMs;
 
     public static PulsarSourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 3170e6e..8229de2 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -360,7 +360,9 @@ public class LocalRunner {
                 instanceConfig.setMaxBufferedTuples(1024);
                 instanceConfig.setPort(FunctionCommon.findAvailablePort());
                 instanceConfig.setClusterName("local");
-                instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+                if (functionConfig != null) {
+                    instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+                }
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
@@ -420,7 +422,9 @@ public class LocalRunner {
             instanceConfig.setMaxBufferedTuples(1024);
             instanceConfig.setPort(FunctionCommon.findAvailablePort());
             instanceConfig.setClusterName("local");
-            instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+            if (functionConfig != null) {
+                instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+            }
             RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                     instanceConfig,
                     userCodeFile,
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index d2ad028..97215bd 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -119,6 +119,7 @@ message SourceSpec {
     string subscriptionName = 9;
     bool cleanupSubscription = 11;
     SubscriptionPosition subscriptionPosition = 12;
+    uint64 negativeAckRedeliveryDelayMs = 13;
 }
 
 message SinkSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index ec027a4..d430515 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -168,6 +168,9 @@ public class SinkConfigUtils {
         if (sinkConfig.getTimeoutMs() != null) {
             sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
         }
+        if (sinkConfig.getNegativeAckRedeliveryDelayMs() != null && sinkConfig.getNegativeAckRedeliveryDelayMs() > 0) {
+            sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
+        }
 
         if (sinkConfig.getCleanupSubscription() != null) {
             sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
@@ -183,6 +186,15 @@ public class SinkConfigUtils {
 
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
+        if (sinkConfig.getMaxMessageRetries() != null && sinkConfig.getMaxMessageRetries() > 0) {
+            Function.RetryDetails.Builder retryDetails = Function.RetryDetails.newBuilder();
+            retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
+            if (StringUtils.isNotBlank(sinkConfig.getDeadLetterTopic())) {
+                retryDetails.setDeadLetterTopic(sinkConfig.getDeadLetterTopic());
+            }
+            functionDetailsBuilder.setRetryDetails(retryDetails);
+        }
+
         // set up sink spec
         Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
         if (sinkDetails.getSinkClassName() != null) {
@@ -264,6 +276,9 @@ public class SinkConfigUtils {
         if (functionDetails.getSource().getTimeoutMs() != 0) {
             sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
         }
+        if (functionDetails.getSource().getNegativeAckRedeliveryDelayMs() > 0) {
+            sinkConfig.setNegativeAckRedeliveryDelayMs(functionDetails.getSource().getNegativeAckRedeliveryDelayMs());
+        }
         if (!isEmpty(functionDetails.getSink().getClassName())) {
             sinkConfig.setClassName(functionDetails.getSink().getClassName());
         }
@@ -301,6 +316,12 @@ public class SinkConfigUtils {
         if (!isEmpty(functionDetails.getCustomRuntimeOptions())) {
             sinkConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
         }
+        if (functionDetails.hasRetryDetails()) {
+            sinkConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
+            if (StringUtils.isNotBlank(functionDetails.getRetryDetails().getDeadLetterTopic())) {
+                sinkConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
+            }
+        }
 
         return sinkConfig;
     }