You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/30 09:36:39 UTC
[pulsar] branch master updated: fix Pulsar supporting DLQ for
sources/sinks #7032 (#7094)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36df3bb fix Pulsar supporting DLQ for sources/sinks #7032 (#7094)
36df3bb is described below
commit 36df3bb3e28e4bc7f98e5788ea1680ddcce45997
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Sat May 30 17:36:27 2020 +0800
fix Pulsar supporting DLQ for sources/sinks #7032 (#7094)
Fixes #7032
### Motivation
Source data flow:
Custom Source-> Function-> Pulsar producer-> Broker
Sink data flow:
Broker-> Pulsar consumer-> Function-> Custom Sink
The Pulsar consumer is used only in sink mode, so the DLQ parameters need to be added for sink mode.
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 69 ++++++++++++++++++++++
.../java/org/apache/pulsar/io/SinkForTest.java | 46 +++++++++++++++
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 16 ++++-
.../org/apache/pulsar/common/io/SinkConfig.java | 5 ++
.../functions/instance/JavaInstanceRunnable.java | 3 +
.../pulsar/functions/source/PulsarSource.java | 7 ++-
.../functions/source/PulsarSourceConfig.java | 1 +
.../org/apache/pulsar/functions/LocalRunner.java | 8 ++-
.../proto/src/main/proto/Function.proto | 1 +
.../pulsar/functions/utils/SinkConfigUtils.java | 21 +++++++
10 files changed, 170 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index f28c9eb..b3be45d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -500,6 +501,74 @@ public class PulsarFunctionE2ETest {
testE2EPulsarFunction(jarFilePathUrl);
}
+ /**
+ * Runs a sink through {@link LocalRunner} with {@code maxMessageRetries = 2} and a
+ * dead letter topic configured, publishes messages that {@link SinkForTest} always
+ * fails, and expects every one of them to be readable from the dead letter topic.
+ * <p>
+ * The method is public on purpose: TestNG only executes public {@code @Test} methods.
+ *
+ * @throws Exception if namespace setup, publishing or consuming fails
+ */
+ @Test(timeOut = 30000)
+ public void testPulsarSinkDLQ() throws Exception {
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String dlqTopic = sourceTopic+"-DLQ";
+ final String sinkName = "PulsarSink-test";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+ // 1 create the producer for the input topic and the consumer for the DLQ topic
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(dlqTopic).subscriptionName(subscriptionName).subscribe();
+
+ // 2 setup sink: at-least-once delivery, 2 retries, then route to the DLQ topic
+ SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+ sinkConfig.setNegativeAckRedeliveryDelayMs(1001L);
+ sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sinkConfig.setMaxMessageRetries(2);
+ sinkConfig.setDeadLetterTopic(dlqTopic);
+ sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+ sinkConfig.setClassName(SinkForTest.class.getName());
+ LocalRunner localRunner = LocalRunner.builder()
+ .sinkConfig(sinkConfig)
+ .clientAuthPlugin(AuthenticationTls.class.getName())
+ .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+ .useTls(true)
+ .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+ .tlsAllowInsecureConnection(true)
+ .tlsHostNameVerificationEnabled(false)
+ .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+
+ localRunner.start(false);
+
+ // Poll until the sink's subscription shows exactly one consumer whose available
+ // permits equal the receiver queue size configured above (1000).
+ retryStrategically((test) -> {
+ try {
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+ return topicStats.subscriptions.containsKey(subscriptionName)
+ && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+ && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
+
+ } catch (PulsarAdminException e) {
+ // stats not available yet; report "not ready" so the check is retried
+ return false;
+ }
+ }, 50, 150);
+
+ // 3 send messages whose value contains "fail", which SinkForTest always fails
+ int totalMsgs = 10;
+ for (int i = 0; i < totalMsgs; i++) {
+ producer.newMessage().property(propertyKey, propertyValue).value("fail" + i).sendAsync();
+ }
+
+ // 4 all messages should enter the DLQ, in publish order
+ for (int i = 0; i < totalMsgs; i++) {
+ Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertEquals(message.getValue(), "fail" + i);
+ }
+
+ // clean up
+ producer.close();
+ consumer.close();
+ }
+
private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java
new file mode 100644
index 0000000..853019c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/SinkForTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io;
+
+import java.util.Map;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+/**
+ * Test-only sink: fails every record whose value contains "fail" and acknowledges
+ * all other records. It keeps no state and holds no resources.
+ */
+public class SinkForTest<T> implements Sink<String> {
+
+ // Substring that makes write() fail a record instead of acknowledging it.
+ private static final String FAIL_MARKER = "fail";
+
+ /** Nothing to initialise; the configuration and context are ignored. */
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ // intentionally empty
+ }
+
+ /**
+ * Acknowledges the record unless its value contains the fail marker,
+ * in which case the record is failed.
+ */
+ @Override
+ public void write(Record<String> record) throws Exception {
+ final boolean markedToFail = record.getValue().contains(FAIL_MARKER);
+ if (!markedToFail) {
+ record.ack();
+ return;
+ }
+ record.fail();
+ }
+
+ /** Nothing to release. */
+ @Override
+ public void close() throws Exception {
+ // intentionally empty
+ }
+}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 5e23635..14fe9be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -276,6 +276,10 @@ public class CmdSinks extends CmdBase {
@Parameter(names = "--custom-schema-inputs", description = "The map of input topics to Schema types or class names (as a JSON string)")
protected String customSchemaInputString;
+ @Parameter(names = "--max-redeliver-count", description = "Maximum number of times that a message will be redelivered before being sent to the dead letter queue")
+ protected Integer maxMessageRetries;
+ @Parameter(names = "--dead-letter-topic", description = "Name of the dead topic where the failing messages will be sent.")
+ protected String deadLetterTopic;
@Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the sink", hidden = true)
protected FunctionConfig.ProcessingGuarantees DEPRECATED_processingGuarantees;
@@ -314,6 +318,8 @@ public class CmdSinks extends CmdBase {
protected Boolean autoAck;
@Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
protected Long timeoutMs;
+ @Parameter(names = "--negative-ack-redelivery-delay-ms", description = "The negative ack message redelivery delay in milliseconds")
+ protected Long negativeAckRedeliveryDelayMs;
@Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
protected String customRuntimeOptions;
@@ -380,6 +386,11 @@ public class CmdSinks extends CmdBase {
sinkConfig.setTopicToSchemaType(customSchemaInputMap);
}
+ sinkConfig.setMaxMessageRetries(maxMessageRetries);
+ if (null != deadLetterTopic) {
+ sinkConfig.setDeadLetterTopic(deadLetterTopic);
+ }
+
if (isNotBlank(subsName)) {
sinkConfig.setSourceSubscriptionName(subsName);
}
@@ -443,9 +454,8 @@ public class CmdSinks extends CmdBase {
if (timeoutMs != null) {
sinkConfig.setTimeoutMs(timeoutMs);
}
-
- if (null != sinkConfigString) {
- sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+ if (negativeAckRedeliveryDelayMs != null && negativeAckRedeliveryDelayMs > 0) {
+ sinkConfig.setNegativeAckRedeliveryDelayMs(negativeAckRedeliveryDelayMs);
}
if (customRuntimeOptions != null) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index a9c904f..a82d7ac 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -56,6 +56,10 @@ public class SinkConfig {
private Map<String, ConsumerConfig> inputSpecs;
+ private Integer maxMessageRetries;
+
+ private String deadLetterTopic;
+
private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
@@ -69,6 +73,7 @@ public class SinkConfig {
private Resources resources;
private Boolean autoAck;
private Long timeoutMs;
+ private Long negativeAckRedeliveryDelayMs;
private String archive;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Boolean cleanupSubscription;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 7bb8238..aa3a7b5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -730,6 +730,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (sourceSpec.getTimeoutMs() > 0 ) {
pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
}
+ if (sourceSpec.getNegativeAckRedeliveryDelayMs() > 0) {
+ pulsarSourceConfig.setNegativeAckRedeliveryDelayMs(sourceSpec.getNegativeAckRedeliveryDelayMs());
+ }
if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f4a91fc..35d6210 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -87,7 +87,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
cb = cb.receiverQueueSize(conf.getReceiverQueueSize());
}
cb = cb.properties(properties);
-
+ if (pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() != null
+ && pulsarSourceConfig.getNegativeAckRedeliveryDelayMs() > 0) {
+ cb.negativeAckRedeliveryDelay(pulsarSourceConfig.getNegativeAckRedeliveryDelayMs(), TimeUnit.MILLISECONDS);
+ }
if (pulsarSourceConfig.getTimeoutMs() != null) {
cb = cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
@@ -98,7 +101,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
}
- cb = cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+ cb = cb.enableRetry(true).deadLetterPolicy(deadLetterPolicyBuilder.build());
}
Consumer<T> consumer = cb.subscribeAsync().join();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 81b35eb..c47d810 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -47,6 +47,7 @@ public class PulsarSourceConfig {
private String typeClassName;
private Long timeoutMs;
+ private Long negativeAckRedeliveryDelayMs;
public static PulsarSourceConfig load(Map<String, Object> map) throws IOException {
ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 3170e6e..8229de2 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -360,7 +360,9 @@ public class LocalRunner {
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
- instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+ if (functionConfig != null) {
+ instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+ }
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
@@ -420,7 +422,9 @@ public class LocalRunner {
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
- instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+ if (functionConfig != null) {
+ instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
+ }
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index d2ad028..97215bd 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -119,6 +119,7 @@ message SourceSpec {
string subscriptionName = 9;
bool cleanupSubscription = 11;
SubscriptionPosition subscriptionPosition = 12;
+ uint64 negativeAckRedeliveryDelayMs = 13;
}
message SinkSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index ec027a4..d430515 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -168,6 +168,9 @@ public class SinkConfigUtils {
if (sinkConfig.getTimeoutMs() != null) {
sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
}
+ if (sinkConfig.getNegativeAckRedeliveryDelayMs() != null && sinkConfig.getNegativeAckRedeliveryDelayMs() > 0) {
+ sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
+ }
if (sinkConfig.getCleanupSubscription() != null) {
sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
@@ -183,6 +186,15 @@ public class SinkConfigUtils {
functionDetailsBuilder.setSource(sourceSpecBuilder);
+ if (sinkConfig.getMaxMessageRetries() != null && sinkConfig.getMaxMessageRetries() > 0) {
+ Function.RetryDetails.Builder retryDetails = Function.RetryDetails.newBuilder();
+ retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
+ if (StringUtils.isNotBlank(sinkConfig.getDeadLetterTopic())) {
+ retryDetails.setDeadLetterTopic(sinkConfig.getDeadLetterTopic());
+ }
+ functionDetailsBuilder.setRetryDetails(retryDetails);
+ }
+
// set up sink spec
Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
if (sinkDetails.getSinkClassName() != null) {
@@ -264,6 +276,9 @@ public class SinkConfigUtils {
if (functionDetails.getSource().getTimeoutMs() != 0) {
sinkConfig.setTimeoutMs(functionDetails.getSource().getTimeoutMs());
}
+ if (functionDetails.getSource().getNegativeAckRedeliveryDelayMs() > 0) {
+ sinkConfig.setNegativeAckRedeliveryDelayMs(functionDetails.getSource().getNegativeAckRedeliveryDelayMs());
+ }
if (!isEmpty(functionDetails.getSink().getClassName())) {
sinkConfig.setClassName(functionDetails.getSink().getClassName());
}
@@ -301,6 +316,12 @@ public class SinkConfigUtils {
if (!isEmpty(functionDetails.getCustomRuntimeOptions())) {
sinkConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
}
+ if (functionDetails.hasRetryDetails()) {
+ sinkConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
+ if (StringUtils.isNotBlank(functionDetails.getRetryDetails().getDeadLetterTopic())) {
+ sinkConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
+ }
+ }
return sinkConfig;
}