You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/07/24 20:31:25 UTC

[pulsar] branch master updated: Allow ability to specify retain key ordering in functions (#7647)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ff4c3f6  Allow ability to specify retain key ordering in functions (#7647)
ff4c3f6 is described below

commit ff4c3f65367526d3bdbcd51a55c7be675ed10de3
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Jul 24 13:31:04 2020 -0700

    Allow ability to specify retain key ordering in functions (#7647)
    
    * Allow ability to specify retain key ordering in functions
    
    * Address comments
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/common/functions/FunctionConfig.java    |  4 ++
 .../functions/instance/JavaInstanceRunnable.java   |  3 ++
 .../proto/src/main/proto/Function.proto            |  3 ++
 .../functions/utils/FunctionConfigUtils.java       | 53 ++++++++++++++++------
 .../functions/utils/FunctionConfigUtilsTest.java   | 10 ++++
 5 files changed, 60 insertions(+), 13 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 97de6ad..8c680e8 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -86,7 +86,11 @@ public class FunctionConfig {
     private String outputSerdeClassName;
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
+    // Do we want function instances to process data in the same order as in the input topics
+    // This essentially means that every partition of input topic is consumed by only one instance
     private Boolean retainOrdering;
+    // Do we want the same function instance to process all data keyed by the input topic's message key
+    private Boolean retainKeyOrdering;
     private Boolean forwardSourceMessageProperty;
     private Map<String, Object> userConfig;
     // This is a map of secretName(aka how the secret is going to be
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 753cc17..06dbf19 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -722,6 +722,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 case FAILOVER:
                     pulsarSourceConfig.setSubscriptionType(SubscriptionType.Failover);
                     break;
+                case KEY_SHARED:
+                    pulsarSourceConfig.setSubscriptionType(SubscriptionType.Key_Shared);
+                    break;
                 default:
                     pulsarSourceConfig.setSubscriptionType(SubscriptionType.Shared);
                     break;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index db834a5..f39fdfd 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -32,6 +32,7 @@ enum ProcessingGuarantees {
 enum SubscriptionType {
     SHARED = 0;
     FAILOVER = 1;
+    KEY_SHARED = 2;
 }
 
 enum SubscriptionPosition {
@@ -84,6 +85,8 @@ message FunctionDetails {
     /* If specified, this will refer to an archive that is
      * already present in the server */
     string builtin = 20;
+    bool retainOrdering = 21;
+    bool retainKeyOrdering = 22;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 169ffc1..342ba27 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -138,11 +138,16 @@ public class FunctionConfigUtils {
             });
         }
 
-        // Set subscription type based on ordering and EFFECTIVELY_ONCE semantics
-        Function.SubscriptionType subType = ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
-                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees()))
-                ? Function.SubscriptionType.FAILOVER
-                : Function.SubscriptionType.SHARED;
+        // Set subscription type
+        Function.SubscriptionType subType;
+        if ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
+                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(functionConfig.getProcessingGuarantees())) {
+            subType = Function.SubscriptionType.FAILOVER;
+        } else if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+            subType = Function.SubscriptionType.KEY_SHARED;
+        } else {
+            subType = Function.SubscriptionType.SHARED;
+        }
         sourceSpecBuilder.setSubscriptionType(subType);
 
         if (isNotBlank(functionConfig.getSubName())) {
@@ -214,6 +219,12 @@ public class FunctionConfigUtils {
             functionDetailsBuilder.setProcessingGuarantees(
                     FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
         }
+        if (functionConfig.getRetainKeyOrdering() != null) {
+            functionDetailsBuilder.setRetainKeyOrdering(functionConfig.getRetainKeyOrdering());
+        }
+        if (functionConfig.getRetainOrdering() != null) {
+            functionDetailsBuilder.setRetainOrdering(functionConfig.getRetainOrdering());
+        }
 
         if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
             Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
@@ -315,13 +326,9 @@ public class FunctionConfigUtils {
         if (!isEmpty(functionDetails.getSource().getSubscriptionName())) {
             functionConfig.setSubName(functionDetails.getSource().getSubscriptionName());
         }
-        if (functionDetails.getSource().getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
-            functionConfig.setRetainOrdering(true);
-            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
-        } else {
-            functionConfig.setRetainOrdering(false);
-            functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        }
+        functionConfig.setRetainOrdering(functionDetails.getRetainOrdering());
+        functionConfig.setRetainKeyOrdering(functionDetails.getRetainKeyOrdering());
+
         functionConfig.setCleanupSubscription(functionDetails.getSource().getCleanupSubscription());
         functionConfig.setAutoAck(functionDetails.getAutoAck());
         if (functionDetails.getSource().getTimeoutMs() != 0) {
@@ -341,7 +348,6 @@ public class FunctionConfigUtils {
         }
         functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
         functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));
-        functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         if (functionDetails.hasRetryDetails()) {
             functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
             if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
@@ -532,6 +538,10 @@ public class FunctionConfigUtils {
         if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
             throw new IllegalArgumentException("Message retries not yet supported in python");
         }
+
+        if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+            throw new IllegalArgumentException("Retain Key Orderering not yet supported in python");
+        }
     }
 
     private static void doGolangChecks(FunctionConfig functionConfig) {
@@ -546,6 +556,10 @@ public class FunctionConfigUtils {
         if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
             throw new IllegalArgumentException("Message retries not yet supported in Go function");
         }
+
+        if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
+            throw new IllegalArgumentException("Retain Key Orderering not yet supported in Go function");
+        }
     }
 
     private static void verifyNoTopicClash(Collection<String> inputTopics, String outputTopic) throws IllegalArgumentException {
@@ -639,6 +653,16 @@ public class FunctionConfigUtils {
         if ((functionConfig.getMaxMessageRetries() == null || functionConfig.getMaxMessageRetries() < 0) && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
             throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
         }
+        if (functionConfig.getRetainKeyOrdering() != null
+                && functionConfig.getRetainKeyOrdering()
+                && functionConfig.getProcessingGuarantees() != null
+                && functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+            throw new IllegalArgumentException("When effectively once processing guarantee is specified, retain Key ordering cannot be set");
+        }
+        if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()
+                && functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering()) {
+            throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
+        }
 
         if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(functionConfig.getPy())
                 && functionConfig.getPy().startsWith(BUILTIN)) {
@@ -804,6 +828,9 @@ public class FunctionConfigUtils {
         if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
             throw new IllegalArgumentException("Retain Ordering cannot be altered");
         }
+        if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering().equals(existingConfig.getRetainKeyOrdering())) {
+            throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
+        }
         if (!StringUtils.isEmpty(newConfig.getOutput())) {
             mergedConfig.setOutput(newConfig.getOutput());
         }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 280db6c..9d8c0bc 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -60,6 +60,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         functionConfig.setRetainOrdering(false);
+        functionConfig.setRetainKeyOrdering(false);
         functionConfig.setForwardSourceMessageProperty(true);
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);
@@ -94,6 +95,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         functionConfig.setRetainOrdering(false);
+        functionConfig.setRetainKeyOrdering(false);
         functionConfig.setForwardSourceMessageProperty(true);
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);
@@ -232,6 +234,13 @@ public class FunctionConfigUtilsTest {
         FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
     }
 
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Retain Key Ordering cannot be altered")
+    public void testMergeDifferentRetainKeyOrdering() {
+        FunctionConfig functionConfig = createFunctionConfig();
+        FunctionConfig newFunctionConfig = createUpdatedFunctionConfig("retainKeyOrdering", true);
+        FunctionConfigUtils.validateUpdate(functionConfig, newFunctionConfig);
+    }
+
     @Test
     public void testMergeDifferentUserConfig() {
         FunctionConfig functionConfig = createFunctionConfig();
@@ -423,6 +432,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         functionConfig.setRetainOrdering(false);
+        functionConfig.setRetainKeyOrdering(false);
         functionConfig.setForwardSourceMessageProperty(false);
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);