You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/11/16 18:20:28 UTC

[pulsar] branch master updated: Support key_based batch builder for functions and sources (#8523)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da9422  Support key_based batch builder for functions and sources (#8523)
8da9422 is described below

commit 8da9422e822888cb198f62b4c7cd6ab3d400a87e
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Tue Nov 17 02:20:06 2020 +0800

    Support key_based batch builder for functions and sources (#8523)
    
    ### Motivation
    
    Currently, Pulsar Functions support the Key_Shared subscription mode. To ensure that messages are still distributed to the correct consumers in the correct order when batching is enabled, Pulsar Functions also need to support the `KEY_BASED` batch builder.
    
    ### Modifications
    
    - Add `--batch-builder` for Pulsar Functions
    - Add `--batch-builder` for Pulsar Sources
    - Add test case
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java   | 6 ++++++
 .../src/main/java/org/apache/pulsar/admin/cli/CmdSources.java     | 7 +++++++
 .../java/org/apache/pulsar/common/functions/FunctionConfig.java   | 2 ++
 .../java/org/apache/pulsar/common/functions/ProducerConfig.java   | 1 +
 .../src/main/java/org/apache/pulsar/common/io/SourceConfig.java   | 2 ++
 .../main/java/org/apache/pulsar/functions/sink/PulsarSink.java    | 8 ++++++++
 pulsar-functions/proto/src/main/proto/Function.proto              | 1 +
 .../org/apache/pulsar/functions/utils/FunctionConfigUtils.java    | 6 ++++++
 .../java/org/apache/pulsar/functions/utils/SourceConfigUtils.java | 6 ++++++
 .../apache/pulsar/functions/utils/FunctionConfigUtilsTest.java    | 3 +++
 .../org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java  | 1 +
 .../tests/integration/functions/utils/CommandGenerator.java       | 7 +++++++
 12 files changed, 50 insertions(+)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index dacc839..f9670e0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -266,6 +266,8 @@ public class CmdFunctions extends CmdBase {
         protected Boolean DEPRECATED_retainOrdering;
         @Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order")
         protected Boolean retainOrdering;
+        @Parameter(names = "--batch-builder", description = "BatcherBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is: DEFAULT")
+        protected String batchBuilder;
         @Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties to output topic when processing")
         protected Boolean forwardSourceMessageProperty = true;
         @Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
@@ -419,6 +421,10 @@ public class CmdFunctions extends CmdBase {
                 functionConfig.setRetainOrdering(retainOrdering);
             }
 
+            if (isNotBlank(batchBuilder)) {
+                functionConfig.setBatchBuilder(batchBuilder);
+            }
+
             if (null != forwardSourceMessageProperty) {
                 functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index fcc580a..521924b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -274,6 +274,9 @@ public class CmdSources extends CmdBase {
         @Parameter(names = "--producer-config", description = "The custom producer configuration (as a JSON string)")
         protected String producerConfig;
 
+        @Parameter(names = "--batch-builder", description = "BatchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED. The default value is: DEFAULT")
+        protected String batchBuilder;
+
         @Parameter(names = "--deserializationClassName", description = "The SerDe classname for the source", hidden = true)
         protected String DEPRECATED_deserializationClassName;
         @Parameter(names = "--deserialization-classname", description = "The SerDe classname for the source")
@@ -360,6 +363,10 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setSchemaType(schemaType);
             }
 
+            if (null != batchBuilder) {
+                sourceConfig.setBatchBuilder(batchBuilder);
+            }
+
             if (null != processingGuarantees) {
                 sourceConfig.setProcessingGuarantees(processingGuarantees);
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index 9cd8ef6..c9f4645 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -95,6 +95,8 @@ public class FunctionConfig {
     private Boolean retainOrdering;
     // Do we want the same function instance to process all data keyed by the input topic's message key
     private Boolean retainKeyOrdering;
+    // batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
+    private String batchBuilder;
     private Boolean forwardSourceMessageProperty;
     private Map<String, Object> userConfig;
     // This is a map of secretName(aka how the secret is going to be
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index 5b68663..0d2f4b4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -37,4 +37,5 @@ public class ProducerConfig {
     private Integer maxPendingMessagesAcrossPartitions;
     private Boolean useThreadLocalProducers;
     private CryptoConfig cryptoConfig;
+    private String batchBuilder;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
index 31a8634..79a5f81 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SourceConfig.java
@@ -69,4 +69,6 @@ public class SourceConfig {
 
     // If this is a BatchSource, its batch related configs are stored here
     private BatchSourceConfig batchSourceConfig;
+    // batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
+    private String batchBuilder;
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index f121950..9ba4059 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -24,6 +24,7 @@ import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.HashingScheme;
@@ -133,6 +134,13 @@ public class PulsarSink<T> implements Sink<T> {
                         builder.addEncryptionKey(encryptionKeyName);
                     }
                 }
+                if (producerConfig.getBatchBuilder() != null) {
+                    if (producerConfig.getBatchBuilder().equals("KEY_BASED")) {
+                        builder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                    } else {
+                        builder.batcherBuilder(BatcherBuilder.DEFAULT);
+                    }
+                }
             }
             return builder.properties(properties).create();
         }
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 4243fb6..4b840a3 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -108,6 +108,7 @@ message ProducerSpec {
     int32 maxPendingMessagesAcrossPartitions = 2;
     bool useThreadLocalProducers = 3;
     CryptoSpec cryptoSpec = 4;
+    string batchBuilder = 5;
 }
 
 message CryptoSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index bdf250a..b42292f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -226,6 +226,9 @@ public class FunctionConfigUtils {
             if (producerConf.getCryptoConfig() != null) {
                 pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
             }
+            if (producerConf.getBatchBuilder() != null) {
+                pbldr.setBatchBuilder(producerConf.getBatchBuilder());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -388,6 +391,9 @@ public class FunctionConfigUtils {
             if (spec.hasCryptoSpec()) {
                 producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
             }
+            if (spec.getBatchBuilder() != null) {
+                producerConfig.setBatchBuilder(spec.getBatchBuilder());
+            }
             producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
             functionConfig.setProducerConfig(producerConfig);
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index aec11fe..04f524a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -163,6 +163,9 @@ public class SourceConfigUtils {
             if (conf.getCryptoConfig() != null) {
                 pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
             }
+            if (conf.getBatchBuilder() != null) {
+                pbldr.setBatchBuilder(conf.getBatchBuilder());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
@@ -247,6 +250,9 @@ public class SourceConfigUtils {
             if (spec.hasCryptoSpec()) {
                 producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
             }
+            if (spec.getBatchBuilder() != null) {
+                producerConfig.setBatchBuilder(spec.getBatchBuilder());
+            }
             producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
             sourceConfig.setProducerConfig(producerConfig);
         }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index fcafd49..03c45bd 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -75,6 +75,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -115,6 +116,7 @@ public class FunctionConfigUtilsTest {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("KEY_BASED");
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -449,6 +451,7 @@ public class FunctionConfigUtilsTest {
         functionConfig.setRetainOrdering(false);
         functionConfig.setRetainKeyOrdering(false);
         functionConfig.setSubscriptionPosition(SubscriptionInitialPosition.Earliest);
+        functionConfig.setBatchBuilder("DEFAULT");
         functionConfig.setForwardSourceMessageProperty(false);
         functionConfig.setUserConfig(new HashMap<>());
         functionConfig.setAutoAck(true);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 520b416..bcece84 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -354,6 +354,7 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
         producerConfig.setUseThreadLocalProducers(true);
+        producerConfig.setBatchBuilder("DEFAULT");
         sourceConfig.setProducerConfig(producerConfig);
 
         sourceConfig.setConfigs(configs);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index f0b8494..102ded5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -49,6 +49,7 @@ public class CommandGenerator {
     private Runtime runtime;
     private Integer parallelism;
     private String adminUrl;
+    private String batchBuilder;
     private Integer windowLengthCount;
     private Long windowLengthDurationMs;
     private Integer slidingIntervalCount;
@@ -154,6 +155,9 @@ public class CommandGenerator {
         if (logTopic != null) {
             commandBuilder.append(" --logTopic " + logTopic);
         }
+        if (batchBuilder != null) {
+            commandBuilder.append("--batch-builder" + batchBuilder);
+        }
         if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
             commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
         }
@@ -239,6 +243,9 @@ public class CommandGenerator {
         if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
             commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
         }
+        if (batchBuilder != null) {
+            commandBuilder.append("--batch-builder" + batchBuilder);
+        }
         if (sinkTopic != null) {
             commandBuilder.append(" --output " + sinkTopic);
         }