You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/19 17:01:26 UTC
[incubator-pulsar] branch master updated: Add support for dead letter topics for java functions (#2606)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 82aa2b8 Add support for dead letter topics for java functions (#2606)
82aa2b8 is described below
commit 82aa2b83359c31f71eae40bb8f068ce703f08b59
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Sep 19 10:01:21 2018 -0700
Add support for dead letter topics for java functions (#2606)
* Added ability to specify dead letter topic to functions
* Fix bug
* Added an example function that fails on a particular message consistently
* Revert change
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 25 ++++++++++++--
.../functions/instance/JavaInstanceRunnable.java | 5 +++
.../pulsar/functions/source/PulsarSource.java | 18 +++++-----
.../functions/source/PulsarSourceConfig.java | 2 ++
.../api/examples/ConsistentlyFailingFunction.java | 38 ++++++++++++++++++++++
.../proto/src/main/proto/Function.proto | 6 ++++
.../pulsar/functions/utils/FunctionConfig.java | 2 ++
.../functions/utils/validation/ValidatorImpls.java | 12 +++++++
8 files changed, 97 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e8c8740..5284b59 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -24,7 +24,7 @@ import static java.util.Objects.isNull;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -45,7 +45,6 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
@@ -79,12 +78,12 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.Resources;
+import org.apache.pulsar.functions.proto.Function.RetryDetails;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ConsumerConfig;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
@@ -321,6 +320,10 @@ public class CmdFunctions extends CmdBase {
protected Long DEPRECATED_timeoutMs;
@Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
protected Long timeoutMs;
+ @Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up")
+ protected Integer maxMessageRetries = -1;
+ @Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent")
+ protected String deadLetterTopic;
protected FunctionConfig functionConfig;
protected String userCodeFile;
@@ -464,6 +467,13 @@ public class CmdFunctions extends CmdBase {
functionConfig.setAutoAck(autoAck);
+ if (null != maxMessageRetries) {
+ functionConfig.setMaxMessageRetries(maxMessageRetries);
+ }
+ if (null != deadLetterTopic) {
+ functionConfig.setDeadLetterTopic(deadLetterTopic);
+ }
+
if (null != jarFile) {
functionConfig.setJar(jarFile);
}
@@ -717,6 +727,15 @@ public class CmdFunctions extends CmdBase {
Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
}
+ if (functionConfig.getMaxMessageRetries() >= 0) {
+ RetryDetails.Builder retryBuilder = RetryDetails.newBuilder();
+ retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
+ if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
+ retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
+ }
+ functionDetailsBuilder.setRetryDetails(retryBuilder);
+ }
+
Map<String, Object> configs = new HashMap<>();
configs.putAll(functionConfig.getUserConfig());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b3f86ea..1e07516 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -547,6 +547,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
if (sourceSpec.getTimeoutMs() > 0 ) {
pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
}
+
+ if (this.instanceConfig.getFunctionDetails().getRetryDetails() != null) {
+ pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
+ pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
+ }
object = new PulsarSource(this.client, pulsarSourceConfig,
FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
} else {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 6eed8e0..afac782 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -31,14 +31,7 @@ import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
@@ -97,6 +90,15 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
cb.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS);
}
+ if (pulsarSourceConfig.getMaxMessageRetries() >= 0) {
+ DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterPolicyBuilder = DeadLetterPolicy.builder();
+ deadLetterPolicyBuilder.maxRedeliverCount(pulsarSourceConfig.getMaxMessageRetries());
+ if (pulsarSourceConfig.getDeadLetterTopic() != null && !pulsarSourceConfig.getDeadLetterTopic().isEmpty()) {
+ deadLetterPolicyBuilder.deadLetterTopic(pulsarSourceConfig.getDeadLetterTopic());
+ }
+ cb.deadLetterPolicy(deadLetterPolicyBuilder.build());
+ }
+
return cb.subscribeAsync();
}).collect(Collectors.toList()).stream().map(CompletableFuture::join).collect(Collectors.toList());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index f1cb09b..4e2afa7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -37,6 +37,8 @@ public class PulsarSourceConfig {
private FunctionConfig.ProcessingGuarantees processingGuarantees;
SubscriptionType subscriptionType;
private String subscriptionName;
+ private int maxMessageRetries;
+ private String deadLetterTopic;
private Map<String, ConsumerConfig> topicSchema = new TreeMap<>();
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
new file mode 100644
index 0000000..792a574
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ConsistentlyFailingFunction.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+
+/**
+ * Example function that simulates a Pulsar function failing consistently on one particular message: it throws a RuntimeException for the input "FAIL" and returns "SUCCESS" for any other input.
+ */
+public class ConsistentlyFailingFunction implements Function<String, String> {
+ @Override
+ public String process(String input, Context context) {
+ if (input.equals("FAIL")) { // NOTE(review): a null input would raise NullPointerException here rather than the deliberate failure below - confirm inputs are never null
+ throw new RuntimeException("Failed");
+ } else {
+ return "SUCCESS";
+ }
+ }
+}
+
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index a76cf8d..482d901 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -40,6 +40,11 @@ message Resources {
int64 disk = 3;
}
+message RetryDetails { // retry settings carried in FunctionDetails.retryDetails
+ int32 maxMessageRetries = 1; // how many times processing of a message is tried before giving up
+ string deadLetterTopic = 2; // topic that receives messages which could not be processed successfully
+}
+
message FunctionDetails {
enum Runtime {
JAVA = 0;
@@ -59,6 +64,7 @@ message FunctionDetails {
SinkSpec sink = 12;
Resources resources = 13;
string packageUrl = 14; //present only if function submitted with package-url
+ RetryDetails retryDetails = 15;
}
message ConsumerSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 1335f8c..dc36812 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -107,6 +107,8 @@ public class FunctionConfig {
private Map<String, Object> userConfig;
private Runtime runtime;
private boolean autoAck;
+ private int maxMessageRetries;
+ private String deadLetterTopic;
private String subName;
@isPositiveNumber
private int parallelism;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f60f3c0..e8acc28 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -514,6 +514,10 @@ public class ValidatorImpls {
if (functionConfig.getWindowConfig() != null) {
throw new IllegalArgumentException("There is currently no support windowing in python");
}
+
+ if (functionConfig.getMaxMessageRetries() >= 0) {
+ throw new IllegalArgumentException("Message retries not yet supported in python");
+ }
}
private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
@@ -549,6 +553,14 @@ public class ValidatorImpls {
throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
+ FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
}
+
+ if (functionConfig.getMaxMessageRetries() >= 0
+ && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
+ }
+ if (functionConfig.getMaxMessageRetries() < 0 && !StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
+ throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
+ }
}
private static Collection<String> collectAllInputTopics(FunctionConfig functionConfig) {