You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/10 23:45:13 UTC
[pulsar] branch master updated: Fix Topic Pattern functionality for
Python functions (#2760)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4ed2e33 Fix Topic Pattern functionality for Python functions (#2760)
4ed2e33 is described below
commit 4ed2e336a27a5e987878955d033f1f396fa3b184
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Oct 10 16:45:09 2018 -0700
Fix Topic Pattern functionality for Python functions (#2760)
* pulsar client no longer has subscribe_pattern interface
* Added integration tests for topic pattern
* Fixed integration test
---
.../instance/src/main/python/python_instance.py | 5 +-
.../integration/functions/PulsarFunctionsTest.java | 78 +++++++++++++++++++---
.../functions/utils/CommandGenerator.java | 22 ++----
3 files changed, 76 insertions(+), 29 deletions(-)
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 54a7329..5a1bff5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -36,6 +36,7 @@ from collections import namedtuple
from threading import Timer
import traceback
import sys
+import re
import pulsar
import contextimpl
@@ -202,8 +203,8 @@ class PythonInstance(object):
self.input_serdes[topic] = serde_kclass()
Log.info("Setting up consumer for topic %s with subname %s" % (topic, subscription_name))
if consumer_conf.isRegexPattern:
- self.consumers[topic] = self.pulsar_client.subscribe_pattern(
- str(topic), subscription_name,
+ self.consumers[topic] = self.pulsar_client.subscribe(
+ re.compile(str(topic)), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8a1dae..0ea5404 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.fail;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
+
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
@@ -581,22 +584,48 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
@Test(enabled = false)
public void testPythonExclamationFunction() throws Exception {
- testExclamationFunction(Runtime.PYTHON);
+ testExclamationFunction(Runtime.PYTHON, false);
+ }
+
+ @Test(enabled = false)
+ public void testPythonExclamationTopicPatternFunction() throws Exception {
+ testExclamationFunction(Runtime.PYTHON, true);
}
@Test
public void testJavaExclamationFunction() throws Exception {
- testExclamationFunction(Runtime.JAVA);
+ testExclamationFunction(Runtime.JAVA, false);
+ }
+
+ @Test
+ public void testJavaExclamationTopicPatternFunction() throws Exception {
+ testExclamationFunction(Runtime.JAVA, true);
}
- private void testExclamationFunction(Runtime runtime) throws Exception {
+ private void testExclamationFunction(Runtime runtime, boolean isTopicPattern) throws Exception {
if (functionRuntimeType == FunctionRuntimeType.THREAD && runtime == Runtime.PYTHON) {
// python can only run on process mode
return;
}
- String inputTopicName = "test-exclamation-" + runtime + "-input-" + randomName(8);
+ String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+ if (isTopicPattern) {
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Consumer<String> consumer1 = client.newConsumer(Schema.STRING)
+ .topic(inputTopicName + "1")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+ @Cleanup Consumer<String> consumer2 = client.newConsumer(Schema.STRING)
+ .topic(inputTopicName + "2")
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+ inputTopicName = inputTopicName + ".*";
+ }
String functionName = "test-exclamation-fn-" + randomName(8);
final int numMessages = 10;
@@ -640,7 +669,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String functionClass,
Schema<T> inputTopicSchema) throws Exception {
CommandGenerator generator;
- generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+ if (inputTopicName.endsWith(".*")) {
+ generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
+ } else {
+ generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass);
+ }
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
String command;
@@ -731,17 +764,40 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();
- @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
- .topic(inputTopic)
- .create();
+ if (inputTopic.endsWith(".*")) {
+ @Cleanup Producer<String> producer1 = client.newProducer(Schema.STRING)
+ .topic(inputTopic.substring(0, inputTopic.length() - 2) + "1")
+ .create();
+ @Cleanup Producer<String> producer2 = client.newProducer(Schema.STRING)
+ .topic(inputTopic.substring(0, inputTopic.length() - 2) + "2")
+ .create();
+
+ for (int i = 0; i < numMessages / 2; i++) {
+ producer1.send("message-" + i);
+ }
+
+ for (int i = numMessages / 2; i < numMessages; i++) {
+ producer2.send("message-" + i);
+ }
+ } else {
+ @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+ .topic(inputTopic)
+ .create();
+ for (int i = 0; i < numMessages; i++) {
+ producer.send("message-" + i);
+ }
+ }
+
+ Set<String> expectedMessages = new HashSet<>();
for (int i = 0; i < numMessages; i++) {
- producer.send("message-" + i);
+ expectedMessages.add("message-" + i + "!");
}
for (int i = 0; i < numMessages; i++) {
- Message<String> msg = consumer.receive();
- assertEquals("message-" + i + "!", msg.getValue());
+ Message<String> msg = consumer.receive(10, TimeUnit.SECONDS);
+ assertTrue(expectedMessages.contains(msg.getValue()));
+ expectedMessages.remove(msg.getValue());
}
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 6f4d012..64b70a6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -39,6 +39,7 @@ public class CommandGenerator {
private String namespace = "default";
private String functionClassName;
private String sourceTopic;
+ private String sourceTopicPattern;
private Map<String, String> customSereSourceTopics;
private String sinkTopic;
private String logTopic;
@@ -64,28 +65,14 @@ public class CommandGenerator {
return generator;
}
- public static CommandGenerator createDefaultGenerator(Map<String, String> customSereSourceTopics,
- String functionClassName) {
+ public static CommandGenerator createTopicPatternGenerator(String sourceTopicPattern, String functionClassName) {
CommandGenerator generator = new CommandGenerator();
- generator.setCustomSereSourceTopics(customSereSourceTopics);
+ generator.setSourceTopicPattern(sourceTopicPattern);
generator.setFunctionClassName(functionClassName);
generator.setRuntime(Runtime.JAVA);
return generator;
}
- public static CommandGenerator createDefaultGenerator(String tenant, String namespace, String functionName) {
- CommandGenerator generator = new CommandGenerator();
- generator.setTenant(tenant);
- generator.setNamespace(namespace);
- generator.setFunctionName(functionName);
- generator.setRuntime(Runtime.JAVA);
- return generator;
- }
-
- public void createAdminUrl(String workerHost, int port) {
- adminUrl = "http://" + workerHost + ":" + port;
- }
-
public String generateCreateFunctionCommand() {
return generateCreateFunctionCommand(null);
}
@@ -110,6 +97,9 @@ public class CommandGenerator {
if (sourceTopic != null) {
commandBuilder.append(" --inputs " + sourceTopic);
}
+ if (sourceTopicPattern != null) {
+ commandBuilder.append(" --topics-pattern " + sourceTopicPattern);
+ }
if (logTopic != null) {
commandBuilder.append(" --logTopic " + logTopic);
}