You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/09 08:00:53 UTC
[pulsar] 01/09: [Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 720b1d52cf2a530514de67b6a7ab33d3d377de54
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Wed Aug 25 20:33:50 2021 +0300
[Functions] Support KEY_BASED batch builder for Java based functions and sources (#11706)
* [Functions] Support KEY_BASED batch builder for Java based functions and sources
* Include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion
* Support setting batch builder for sources with "--batch-builder KEY_BASED" argument
(cherry picked from commit b923af16f629bf298ee2e8fec44864a2c8a2615b)
---
.../pulsar/functions/instance/ContextImpl.java | 21 ++++++++++-----
.../functions/instance/JavaInstanceRunnable.java | 1 +
.../pulsar/functions/utils/SourceConfigUtils.java | 7 +++++
.../functions/utils/SourceConfigUtilsTest.java | 31 ++++++++++++++++++++++
4 files changed, 54 insertions(+), 6 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index fc806c1..f3438c4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -39,6 +39,7 @@ import java.util.stream.Stream;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -143,14 +144,22 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
boolean useThreadLocalProducers = false;
- if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
- if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
- this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
+ Function.ProducerSpec producerSpec = config.getFunctionDetails().getSink().getProducerSpec();
+ if (producerSpec != null) {
+ if (producerSpec.getMaxPendingMessages() != 0) {
+ this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
}
- if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
- this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+ this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
}
- useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+ if (producerSpec.getBatchBuilder() != null) {
+ if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+ this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+ } else {
+ this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+ }
+ }
+ useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
}
if (useThreadLocalProducers) {
tlPublishProducers = new ThreadLocal<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3d79c2b..692902e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -776,6 +776,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
+ .batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9049eb6..6450d6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -167,6 +167,13 @@ public class SourceConfigUtils {
sinkSpecBuilder.setProducerSpec(pbldr.build());
}
+ if (sourceConfig.getBatchBuilder() != null) {
+ Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
+ ? sinkSpecBuilder.getProducerSpec().toBuilder()
+ : Function.ProducerSpec.newBuilder();
+ sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
+ }
+
sinkSpecBuilder.setForwardSourceMessageProperty(true);
functionDetailsBuilder.setSink(sinkSpecBuilder);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 20a64f8..22b5afa 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -331,6 +331,37 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
}
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigIsNull() { // top-level batchBuilder must reach the ProducerSpec even without a ProducerConfig
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setProducerConfig(null); // remove the producer config so only the top-level setting remains
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+ assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED"); // value is carried onto the sink's ProducerSpec
+ }
+
+ @Test
+ public void testSupportsBatchBuilderWhenProducerConfigExists() { // top-level batchBuilder is merged into a ProducerSpec built from an existing ProducerConfig
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder("KEY_BASED");
+ sourceConfig.getProducerConfig().setMaxPendingMessages(123456); // assumes createSourceConfig() supplies a non-null ProducerConfig - TODO confirm
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+ assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+ assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456); // the other producer setting must survive the merge
+ }
+
+ @Test
+ public void testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() { // batchBuilder set only on ProducerConfig must still reach the ProducerSpec
+ SourceConfig sourceConfig = createSourceConfig();
+ sourceConfig.setBatchBuilder(null); // no top-level value, so nothing overrides the ProducerConfig setting
+ sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED"); // assumes createSourceConfig() supplies a non-null ProducerConfig - TODO confirm
+ Function.FunctionDetails functionDetails =
+ SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
+ assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
+ }
+
private SourceConfig createSourceConfigWithBatch() {
SourceConfig sourceConfig = createSourceConfig();
BatchSourceConfig batchSourceConfig = createBatchSourceConfig();