You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/20 20:20:28 UTC

[pulsar] branch master updated: Fix: set receive queue size for sinks (#4091)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a4141  Fix: set receive queue size for sinks (#4091)
84a4141 is described below

commit 84a414183bfd5b378e9bd0116767bc57136da755
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 20 13:20:22 2019 -0700

    Fix: set receive queue size for sinks (#4091)
    
    * fix setting receive queue size for sinks
    
    * fix setting receive queue size for sinks
---
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 102 ++++++++++++---------
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |   1 -
 .../pulsar/functions/utils/SinkConfigUtils.java    |  21 +++--
 .../functions/utils/SinkConfigUtilsTest.java       |   2 +-
 4 files changed, 75 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 313a5b5..40b1783 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
@@ -383,7 +384,7 @@ public class PulsarFunctionE2ETest {
         sinkConfig.setName(functionName);
         sinkConfig.setParallelism(1);
         sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        sinkConfig.setInputs(Collections.singleton(sourceTopic));
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
         sinkConfig.setSourceSubscriptionName(subName);
         sinkConfig.setCleanupSubscription(true);
         return sinkConfig;
@@ -514,7 +515,7 @@ public class PulsarFunctionE2ETest {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
-        final String functionName = "PulsarSink-test";
+        final String sinkName = "PulsarSink-test";
         final String propertyKey = "key";
         final String propertyValue = "value";
         final String subscriptionName = "test-sub";
@@ -525,20 +526,34 @@ public class PulsarFunctionE2ETest {
         // create a producer that creates a topic at broker
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
 
-        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
+        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+
         admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
 
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(523).build()));
+
         admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
             try {
-                return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+                return topicStats.subscriptions.containsKey(subscriptionName)
+                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 523;
+
             } catch (PulsarAdminException e) {
                 return false;
             }
         }, 50, 150);
-        // validate pulsar sink consumer has started on the topic
-        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        TopicStats topicStats = admin.topics().getStats(sourceTopic);
+        assertEquals(topicStats.subscriptions.size(), 1);
+        assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
+        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);
 
         // validate prometheus metrics empty
         String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
@@ -548,65 +563,65 @@ public class PulsarFunctionE2ETest {
         Metric m = metrics.get("pulsar_sink_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
 
         int totalMsgs = 10;
@@ -631,70 +646,70 @@ public class PulsarFunctionE2ETest {
         m = metrics.get("pulsar_sink_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_sink_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
-        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("name"), sinkName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
         assertTrue(m.value > 0.0);
 
 
         // delete functions
-        admin.sink().deleteSink(tenant, namespacePortion, functionName);
+        admin.sink().deleteSink(tenant, namespacePortion, sinkName);
 
         retryStrategically((test) -> {
             try {
@@ -740,6 +755,7 @@ public class PulsarFunctionE2ETest {
         admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
 
         SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+
         admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
 
         retryStrategically((test) -> {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 18ffb53..6d9544e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -324,7 +324,6 @@ public class CmdSinks extends CmdBase {
 
             if (null != sinkConfigFile) {
                 this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
-                log.info("The sinkConfig read from file is {}", sinkConfig);
             } else {
                 this.sinkConfig = new SinkConfig();
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 9159f4f..65cf6f5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -123,12 +123,18 @@ public class SinkConfigUtils {
         }
         if (sinkConfig.getInputSpecs() != null) {
             sinkConfig.getInputSpecs().forEach((topic, spec) -> {
-                sourceSpecBuilder.putInputSpecs(topic,
-                        Function.ConsumerSpec.newBuilder()
-                                .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
-                                .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
-                                .setIsRegexPattern(spec.isRegexPattern())
-                                .build());
+                Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
+                        .setIsRegexPattern(spec.isRegexPattern());
+                if (!StringUtils.isBlank(spec.getSchemaType())) {
+                    bldr.setSchemaType(spec.getSchemaType());
+                } else if (!StringUtils.isBlank(spec.getSerdeClassName())) {
+                    bldr.setSerdeClassName(spec.getSerdeClassName());
+                }
+                if (spec.getReceiverQueueSize() != null) {
+                    bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
+                            .setValue(spec.getReceiverQueueSize()).build());
+                }
+                sourceSpecBuilder.putInputSpecs(topic, bldr.build());
             });
         }
 
@@ -216,6 +222,9 @@ public class SinkConfigUtils {
             if (!isEmpty(input.getValue().getSchemaType())) {
                 consumerConfig.setSchemaType(input.getValue().getSchemaType());
             }
+            if (input.getValue().hasReceiverQueueSize()) {
+                consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
+            }
             consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
             consumerConfigMap.put(input.getKey(), consumerConfig);
         }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index d72698d..446deb3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -50,7 +50,7 @@ public class SinkConfigUtilsTest {
         sinkConfig.setArchive("builtin://jdbc");
         sinkConfig.setSourceSubscriptionName("test-subscription");
         Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
-        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build());
         sinkConfig.setInputSpecs(inputSpecs);
         sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);