You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:53 UTC

[pulsar] 01/09: [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 720b1d52cf2a530514de67b6a7ab33d3d377de54
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:33:50 2021 +0300

    [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
    
    * [Functions] Support KEY_BASED batch builder for Java based functions and sources
    
    * Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion
    
    * Support setting batch builder for sources with "--batch-builder KEY_BASED" argument
    
    (cherry picked from commit b923af16f629bf298ee2e8fec44864a2c8a2615b)
---
 .../pulsar/functions/instance/ContextImpl.java     | 21 ++++++++++-----
 .../functions/instance/JavaInstanceRunnable.java   |  1 +
 .../pulsar/functions/utils/SourceConfigUtils.java  |  7 +++++
 .../functions/utils/SourceConfigUtilsTest.java     | 31 ++++++++++++++++++++++
 4 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index fc806c1..f3438c4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
 import lombok.ToString;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -143,14 +144,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         boolean useThreadLocalProducers = false;
-        if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
-                this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+        Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
+        if (producerSpec != null) {
+            if (producerSpec.getMaxPendingMessages() != 0) {
+                this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
             }
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
-                this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+                this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
             }
-            useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+            if (producerSpec.getBatchBuilder() != null) {
+                if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                } else {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+                }
+            }
+            useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
         }
         if (useThreadLocalProducers) {
             tlPublishProducers = new ThreadLocal<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3d79c2b..692902e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -776,6 +776,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
                             .maxPendingMessages(conf.getMaxPendingMessages())
                             .maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+                            .batchBuilder(conf.getBatchBuilder())
                             .useThreadLocalProducers(conf.getUseThreadLocalProducers())
                             .cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                     pulsarSinkConfig.setProducerConfig(builder.build());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9049eb6..6450d6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -167,6 +167,13 @@ public class SourceConfigUtils {
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
+        if (sourceConfig.getBatchBuilder() != null) {
+            Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
+                    ? sinkSpecBuilder.getProducerSpec().toBuilder()
+                    : Function.ProducerSpec.newBuilder();
+            sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+        }
+
         sinkSpecBuilder.setForwardSourceMessageProperty(true);
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 20a64f8..22b5afa 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
     }
 
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setProducerConfig(null);
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+    }
+
+    @Test
+    public void testSupportsBatchBuilderWhenProducerConfigExists() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder("KEY_BASED");
+        sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+        assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456);
+    }
+
+    @Test
+    public void testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() {
+        SourceConfig sourceConfig = createSourceConfig();
+        sourceConfig.setBatchBuilder(null);
+        sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
+        Function.FunctionDetails functionDetails =
+                SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+    }
+
     private SourceConfig createSourceConfigWithBatch() {
         SourceConfig sourceConfig = createSourceConfig();
         BatchSourceConfig batchSourceConfig = createBatchSourceConfig();