You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/20 20:20:28 UTC
[pulsar] branch master updated: Fix: set receive queue size for sinks (#4091)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 84a4141 Fix: set receive queue size for sinks (#4091)
84a4141 is described below
commit 84a414183bfd5b378e9bd0116767bc57136da755
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 20 13:20:22 2019 -0700
Fix: set receive queue size for sinks (#4091)
* fix setting receive queue size for sinks
* fix setting receive queue size for sinks
---
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 102 ++++++++++++---------
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 1 -
.../pulsar/functions/utils/SinkConfigUtils.java | 21 +++--
.../functions/utils/SinkConfigUtilsTest.java | 2 +-
4 files changed, 75 insertions(+), 51 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 313a5b5..40b1783 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
@@ -383,7 +384,7 @@ public class PulsarFunctionE2ETest {
sinkConfig.setName(functionName);
sinkConfig.setParallelism(1);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- sinkConfig.setInputs(Collections.singleton(sourceTopic));
+ sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
sinkConfig.setSourceSubscriptionName(subName);
sinkConfig.setCleanupSubscription(true);
return sinkConfig;
@@ -514,7 +515,7 @@ public class PulsarFunctionE2ETest {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
- final String functionName = "PulsarSink-test";
+ final String sinkName = "PulsarSink-test";
final String propertyKey = "key";
final String propertyValue = "value";
final String subscriptionName = "test-sub";
@@ -525,20 +526,34 @@ public class PulsarFunctionE2ETest {
// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
- SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, functionName, sourceTopic, subscriptionName);
+ SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+
+ sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+
admin.sink().createSinkWithUrl(sinkConfig, jarFilePathUrl);
+ sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(523).build()));
+
admin.sink().updateSinkWithUrl(sinkConfig, jarFilePathUrl);
retryStrategically((test) -> {
try {
- return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+ return topicStats.subscriptions.containsKey(subscriptionName)
+ && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+ && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 523;
+
} catch (PulsarAdminException e) {
return false;
}
}, 50, 150);
- // validate pulsar sink consumer has started on the topic
- assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ TopicStats topicStats = admin.topics().getStats(sourceTopic);
+ assertEquals(topicStats.subscriptions.size(), 1);
+ assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
+ assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+ assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);
// validate prometheus metrics empty
String prometheusMetrics = getPrometheusMetrics(brokerWebServicePort);
@@ -548,65 +563,65 @@ public class PulsarFunctionE2ETest {
Metric m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
int totalMsgs = 10;
@@ -631,70 +646,70 @@ public class PulsarFunctionE2ETest {
m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_received_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_written_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, (double) totalMsgs);
m = metrics.get("pulsar_sink_sink_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_system_exceptions_total_1min");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertEquals(m.value, 0.0);
m = metrics.get("pulsar_sink_last_invocation");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
- assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("name"), sinkName);
assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
- assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
assertTrue(m.value > 0.0);
// delete functions
- admin.sink().deleteSink(tenant, namespacePortion, functionName);
+ admin.sink().deleteSink(tenant, namespacePortion, sinkName);
retryStrategically((test) -> {
try {
@@ -740,6 +755,7 @@ public class PulsarFunctionE2ETest {
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+
admin.source().createSourceWithUrl(sourceConfig, jarFilePathUrl);
retryStrategically((test) -> {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 18ffb53..6d9544e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -324,7 +324,6 @@ public class CmdSinks extends CmdBase {
if (null != sinkConfigFile) {
this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
- log.info("The sinkConfig read from file is {}", sinkConfig);
} else {
this.sinkConfig = new SinkConfig();
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 9159f4f..65cf6f5 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -123,12 +123,18 @@ public class SinkConfigUtils {
}
if (sinkConfig.getInputSpecs() != null) {
sinkConfig.getInputSpecs().forEach((topic, spec) -> {
- sourceSpecBuilder.putInputSpecs(topic,
- Function.ConsumerSpec.newBuilder()
- .setSerdeClassName(spec.getSerdeClassName() != null ? spec.getSerdeClassName() : "")
- .setSchemaType(spec.getSchemaType() != null ? spec.getSchemaType() : "")
- .setIsRegexPattern(spec.isRegexPattern())
- .build());
+ Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
+ .setIsRegexPattern(spec.isRegexPattern());
+ if (!StringUtils.isBlank(spec.getSchemaType())) {
+ bldr.setSchemaType(spec.getSchemaType());
+ } else if (!StringUtils.isBlank(spec.getSerdeClassName())) {
+ bldr.setSerdeClassName(spec.getSerdeClassName());
+ }
+ if (spec.getReceiverQueueSize() != null) {
+ bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
+ .setValue(spec.getReceiverQueueSize()).build());
+ }
+ sourceSpecBuilder.putInputSpecs(topic, bldr.build());
});
}
@@ -216,6 +222,9 @@ public class SinkConfigUtils {
if (!isEmpty(input.getValue().getSchemaType())) {
consumerConfig.setSchemaType(input.getValue().getSchemaType());
}
+ if (input.getValue().hasReceiverQueueSize()) {
+ consumerConfig.setReceiverQueueSize(input.getValue().getReceiverQueueSize().getValue());
+ }
consumerConfig.setRegexPattern(input.getValue().getIsRegexPattern());
consumerConfigMap.put(input.getKey(), consumerConfig);
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index d72698d..446deb3 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -50,7 +50,7 @@ public class SinkConfigUtilsTest {
sinkConfig.setArchive("builtin://jdbc");
sinkConfig.setSourceSubscriptionName("test-subscription");
Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
- inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+ inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).receiverQueueSize(532).serdeClassName("test-serde").build());
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);