You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/08/10 14:21:16 UTC

[pulsar] branch master updated: Allow the option to make producers thread local (#7764)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a3eb556  Allow the option to make producers thread local (#7764)
a3eb556 is described below

commit a3eb55653ac9cc36ffcd444275cbf2a9e5f6b298
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Aug 10 07:20:53 2020 -0700

    Allow the option to make producers thread local (#7764)
    
    If a function has many threads that call context.newOutputMessage().send(), there can be a lot of contention due to the big synchronization blocks in ProducerImpl. Allowing functions to use thread-local producers avoids this synchronization, leading to increased performance.
    This PR adds a configuration option for using thread-local producers in functions and sources.
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../pulsar/common/functions/ProducerConfig.java    |  1 +
 .../pulsar/functions/instance/ContextImpl.java     | 40 +++++++++----
 .../instance/src/main/python/Function_pb2.py       | 67 ++++++++++++----------
 .../proto/src/main/proto/Function.proto            |  1 +
 .../functions/utils/FunctionConfigUtils.java       |  4 ++
 .../pulsar/functions/utils/SourceConfigUtils.java  |  4 ++
 .../functions/utils/FunctionConfigUtilsTest.java   |  2 +
 .../functions/utils/SourceConfigUtilsTest.java     |  1 +
 8 files changed, 80 insertions(+), 40 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
index b28370e..8d3dd66 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ProducerConfig.java
@@ -35,4 +35,5 @@ import lombok.NoArgsConstructor;
 public class ProducerConfig {
     private Integer maxPendingMessages;
     private Integer maxPendingMessagesAcrossPartitions;
+    private Boolean useThreadLocalProducers;
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 763d9f7..e3f169e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -68,6 +68,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
     private PulsarClient client;
     private Map<String, Producer<?>> publishProducers;
+    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
     private ProducerBuilderImpl<?> producerBuilder;
 
     private final TopicSchema topicSchema;
@@ -102,12 +103,12 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         this.config = config;
         this.logger = logger;
         this.client = client;
-        this.publishProducers = new HashMap<>();
         this.topicSchema = new TopicSchema(client);
         this.statsManager = statsManager;
 
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
                 .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+        boolean useThreadLocalProducers = false;
         if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
             if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
                 this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
@@ -115,6 +116,12 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
                 this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
+        }
+        if (useThreadLocalProducers) {
+            tlPublishProducers = new ThreadLocal<>();
+        } else {
+            publishProducers = new HashMap<>();
         }
 
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
@@ -413,7 +420,17 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     }
 
     private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
-        Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
+        Producer<O> producer;
+        if (tlPublishProducers != null) {
+            Map<String, Producer<?>> producerMap = tlPublishProducers.get();
+            if (producerMap == null) {
+                producerMap = new HashMap<>();
+                tlPublishProducers.set(producerMap);
+            }
+            producer = (Producer<O>) producerMap.get(topicName);
+        } else {
+            producer = (Producer<O>) publishProducers.get(topicName);
+        }
 
         if (producer == null) {
 
@@ -438,16 +455,19 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                             this.config.getInstanceId()))
                     .create();
 
-            Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
-
-            if (existingProducer != null) {
-                // The value in the map was not updated after the concurrent put
-                newProducer.close();
-                producer = existingProducer;
+            if (tlPublishProducers != null) {
+                tlPublishProducers.get().put(topicName, newProducer);
             } else {
-                producer = newProducer;
+                Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
+
+                if (existingProducer != null) {
+                    // The value in the map was not updated after the concurrent put
+                    newProducer.close();
+                    producer = existingProducer;
+                } else {
+                    producer = newProducer;
+                }
             }
-
         }
         return producer;
     }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 203809d..3ee58e9 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -38,7 +38,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   syntax='proto3',
   serialized_options=b'\n!org.apache.pulsar.functions.protoB\010Function',
   create_key=_descriptor._internal_create_key,
-  serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
+  serialized_pb=b'\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe7\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\ [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -66,8 +66,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3174,
-  serialized_end=3253,
+  serialized_start=3207,
+  serialized_end=3286,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -97,8 +97,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3255,
-  serialized_end=3315,
+  serialized_start=3288,
+  serialized_end=3348,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -123,8 +123,8 @@ _SUBSCRIPTIONPOSITION = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3317,
-  serialized_end=3365,
+  serialized_start=3350,
+  serialized_end=3398,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONPOSITION)
 
@@ -149,8 +149,8 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   serialized_options=None,
-  serialized_start=3367,
-  serialized_end=3408,
+  serialized_start=3400,
+  serialized_end=3441,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -695,6 +695,13 @@ _PRODUCERSPEC = _descriptor.Descriptor(
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
+    _descriptor.FieldDescriptor(
+      name='useThreadLocalProducers', full_name='proto.ProducerSpec.useThreadLocalProducers', index=2,
+      number=3, type=8, cpp_type=7, label=1,
+      has_default_value=False, default_value=False,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
   ],
   extensions=[
   ],
@@ -708,7 +715,7 @@ _PRODUCERSPEC = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=1337,
-  serialized_end=1423,
+  serialized_end=1456,
 )
 
 
@@ -746,8 +753,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1903,
-  serialized_end=1964,
+  serialized_start=1936,
+  serialized_end=1997,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -784,8 +791,8 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1966,
-  serialized_end=2036,
+  serialized_start=1999,
+  serialized_end=2069,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -899,8 +906,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1426,
-  serialized_end=2036,
+  serialized_start=1459,
+  serialized_end=2069,
 )
 
 
@@ -1077,8 +1084,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2039,
-  serialized_end=2515,
+  serialized_start=2072,
+  serialized_end=2548,
 )
 
 
@@ -1116,8 +1123,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2517,
-  serialized_end=2589,
+  serialized_start=2550,
+  serialized_end=2622,
 )
 
 
@@ -1155,8 +1162,8 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2885,
-  serialized_end=2960,
+  serialized_start=2918,
+  serialized_end=2993,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -1221,8 +1228,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2592,
-  serialized_end=2960,
+  serialized_start=2625,
+  serialized_end=2993,
 )
 
 
@@ -1260,8 +1267,8 @@ _FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=2962,
-  serialized_end=3022,
+  serialized_start=2995,
+  serialized_end=3055,
 )
 
 
@@ -1299,8 +1306,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3024,
-  serialized_end=3105,
+  serialized_start=3057,
+  serialized_end=3138,
 )
 
 
@@ -1338,8 +1345,8 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=3107,
-  serialized_end=3172,
+  serialized_start=3140,
+  serialized_end=3205,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 68cc936..3edcf5d 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -104,6 +104,7 @@ message ConsumerSpec {
 message ProducerSpec {
     int32 maxPendingMessages = 1;
     int32 maxPendingMessagesAcrossPartitions = 2;
+    bool useThreadLocalProducers = 3;
 }
 
 message SourceSpec {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1b0e4fd..2a9a2bf 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -203,6 +203,9 @@ public class FunctionConfigUtils {
             if (functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) {
                 pbldr.setMaxPendingMessagesAcrossPartitions(functionConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
             }
+            if (functionConfig.getProducerConfig().getUseThreadLocalProducers() != null) {
+                pbldr.setUseThreadLocalProducers(functionConfig.getProducerConfig().getUseThreadLocalProducers());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -358,6 +361,7 @@ public class FunctionConfigUtils {
             if (functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
                 producerConfig.setMaxPendingMessagesAcrossPartitions(functionDetails.getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            producerConfig.setUseThreadLocalProducers(functionDetails.getSink().getProducerSpec().getUseThreadLocalProducers());
             functionConfig.setProducerConfig(producerConfig);
         }
         if (!isEmpty(functionDetails.getLogTopic())) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index d912206..0d31c9d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -155,6 +155,9 @@ public class SourceConfigUtils {
             if (sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions() != null) {
                 pbldr.setMaxPendingMessagesAcrossPartitions(sourceConfig.getProducerConfig().getMaxPendingMessagesAcrossPartitions());
             }
+            if (sourceConfig.getProducerConfig().getUseThreadLocalProducers() != null) {
+                pbldr.setUseThreadLocalProducers(sourceConfig.getProducerConfig().getUseThreadLocalProducers());
+            }
             sinkSpecBuilder.setProducerSpec(pbldr.build());
         }
 
@@ -235,6 +238,7 @@ public class SourceConfigUtils {
             if (sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
                 producerConfig.setMaxPendingMessagesAcrossPartitions(sinkSpec.getProducerSpec().getMaxPendingMessagesAcrossPartitions());
             }
+            producerConfig.setUseThreadLocalProducers(sinkSpec.getProducerSpec().getUseThreadLocalProducers());
             sourceConfig.setProducerConfig(producerConfig);
         }
         if (functionDetails.hasResources()) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index e604980..4aa2dc0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -66,6 +66,7 @@ public class FunctionConfigUtilsTest {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
@@ -105,6 +106,7 @@ public class FunctionConfigUtilsTest {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         functionConfig.setProducerConfig(producerConfig);
         Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
         FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 65f29ef..520b416 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -353,6 +353,7 @@ public class SourceConfigUtilsTest extends PowerMockTestCase {
         ProducerConfig producerConfig = new ProducerConfig();
         producerConfig.setMaxPendingMessages(100);
         producerConfig.setMaxPendingMessagesAcrossPartitions(1000);
+        producerConfig.setUseThreadLocalProducers(true);
         sourceConfig.setProducerConfig(producerConfig);
 
         sourceConfig.setConfigs(configs);