You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/24 20:31:25 UTC
[pulsar] branch master updated: Allow ability to specify retain key ordering in functions (#7647)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ff4c3f6 Allow ability to specify retain key ordering in functions (#7647)
ff4c3f6 is described below
commit ff4c3f65367526d3bdbcd51a55c7be675ed10de3
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jul 24 13:31:04 2020 -0700
Allow ability to specify retain key ordering in functions (#7647)
* Allow ability to specify retain key ordering in functions
* Address comments
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../pulsar/common/functions/FunctionConfig.java | 4 ++
.../functions/instance/JavaInstanceRunnable.java | 3 ++
.../proto/src/main/proto/Function.proto | 3 ++
.../functions/utils/FunctionConfigUtils.java | 53 ++++++++++++++++------
.../functions/utils/FunctionConfigUtilsTest.java | 10 ++++
5 files changed, 60 insertions(+), 13 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 97de6ad..8c680e8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -86,7 +86,11 @@ public class FunctionConfig {
private String outputSerdeClassName;
private String logTopic;
private ProcessingGuarantees processingGuarantees;
+ // Whether function instances must process data in the same order as it appears in the input topics.
+ // This essentially means that every partition of an input topic is consumed by only one instance.
private Boolean retainOrdering;
+ // Whether the same function instance must process all data that shares the same input-topic message key.
+ private Boolean retainKeyOrdering;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
// This is a map of secretName(aka how the secret is going to be
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 753cc17..06dbf19 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -722,6 +722,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
case FAILOVER:
pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
break;
+ case KEY_SHARED:
+ pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
+ break;
default:
pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
break;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index db834a5..f39fdfd 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -32,6 +32,7 @@ enum ProcessingGuarantees {
enum SubscriptionType {
SHARED = 0;
FAILOVER = 1;
+ KEY_SHARED = 2;
}
enum SubscriptionPosition {
@@ -84,6 +85,8 @@ message FunctionDetails {
/* If specified, this will refer to an archive that is
* already present in the server */
string builtin = 20;
+ bool retainOrdering = 21;
+ bool retainKeyOrdering = 22;
}
message ConsumerSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 169ffc1..342ba27 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -138,11 +138,16 @@ public class FunctionConfigUtils {
});
}
- // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
- Function.SubscriptionType subType = ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
- || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
- ? Function.SubscriptionType.FAILOVER
- : Function.SubscriptionType.SHARED;
+ // Set subscription type
+ Function.SubscriptionType subType;
+ if ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
+ || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) {
+ subType = Function.SubscriptionType.FAILOVER;
+ } else if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+ subType = Function.SubscriptionType.KEY_SHARED;
+ } else {
+ subType = Function.SubscriptionType.SHARED;
+ }
sourceSpecBuilder.setSubscriptionType(subType);
if (isNotBlank(functionConfig.getSubName())) {
@@ -214,6 +219,12 @@ public class FunctionConfigUtils {
functionDetailsBuilder.setProcessingGuarantees(
FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
}
+ if (functionConfig.getRetainKeyOrdering() != null) {
+ functionDetailsBuilder.setRetainKeyOrdering(functionConfig.getRetainKeyOrdering());
+ }
+ if (functionConfig.getRetainOrdering() != null) {
+ functionDetailsBuilder.setRetainOrdering(functionConfig.getRetainOrdering());
+ }
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
@@ -315,13 +326,9 @@ public class FunctionConfigUtils {
if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
functionConfig.setSubName(functionDetails.getSource().getSubscriptionName());
}
- if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
- functionConfig.setRetainOrdering(true);
- functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
- } else {
- functionConfig.setRetainOrdering(false);
- functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- }
+ functionConfig.setRetainOrdering(functionDetails.getRetainOrdering());
+ functionConfig.setRetainKeyOrdering(functionDetails.getRetainKeyOrdering());
+
functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
functionConfig.setAutoAck(functionDetails.getAutoAck());
if (functionDetails.getSource().getTimeoutMs() != 0) {
@@ -341,7 +348,6 @@ public class FunctionConfigUtils {
}
functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));
- functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
if (functionDetails.hasRetryDetails()) {
functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
@@ -532,6 +538,10 @@ public class FunctionConfigUtils {
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
throw new IllegalArgumentException("Message retries not yet supported in python");
}
+
+ if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+ throw new IllegalArgumentException("Retain Key Orderering not yet supported in python");
+ }
}
private static void doGolangChecks(FunctionConfig functionConfig) {
@@ -546,6 +556,10 @@ public class FunctionConfigUtils {
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
throw new IllegalArgumentException("Message retries not yet supported in Go function");
}
+
+ if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+ throw new IllegalArgumentException("Retain Key Orderering not yet supported in Go function");
+ }
}
private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
@@ -639,6 +653,16 @@ public class FunctionConfigUtils {
if ((functionConfig.getMaxMessageRetries() == null || functionConfig.getMaxMessageRetries() < 0) && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}
+ if (functionConfig.getRetainKeyOrdering() != null
+ && functionConfig.getRetainKeyOrdering()
+ && functionConfig.getProcessingGuarantees() != null
+ && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new IllegalArgumentException("When effectively once processing guarantee is specified, retain Key ordering cannot be set");
+ }
+ if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()
+ && functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering()) {
+ throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
+ }
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
@@ -804,6 +828,9 @@ public class FunctionConfigUtils {
if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Ordering cannot be altered");
}
+ if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering().equals(existingConfig.getRetainKeyOrdering())) {
+ throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
+ }
if (!StringUtils.isEmpty(newConfig.getOutput())) {
mergedConfig.setOutput(newConfig.getOutput());
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 280db6c..9d8c0bc 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -60,6 +60,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
+ functionConfig.setRetainKeyOrdering(false);
functionConfig.setForwardSourceMessageProperty(true);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
@@ -94,6 +95,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
+ functionConfig.setRetainKeyOrdering(false);
functionConfig.setForwardSourceMessageProperty(true);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
@@ -232,6 +234,13 @@ public class FunctionConfigUtilsTest {
FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
}
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Key Ordering cannot be altered")
+ public void testMergeDifferentRetainKeyOrdering() {
+ FunctionConfig functionConfig = createFunctionConfig();
+ FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainKeyOrdering", true);
+ FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+ }
+
@Test
public void testMergeDifferentUserConfig() {
FunctionConfig functionConfig = createFunctionConfig();
@@ -423,6 +432,7 @@ public class FunctionConfigUtilsTest {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
+ functionConfig.setRetainKeyOrdering(false);
functionConfig.setForwardSourceMessageProperty(false);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);