You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/01/07 22:01:50 UTC
[pulsar] branch master updated: Clean up and correct properties to
producer and consumers created by Functions/Sinks/Sources (#3315)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b239ec Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315)
7b239ec is described below
commit 7b239ec838f36ef2d61a817f858d0b4711b6c219
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Jan 7 14:01:46 2019 -0800
Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315)
* clean up and correct properties to producer and consumers created by functions
* fix test
* cleaning up comment
* refactoring
---
.../pulsar/functions/instance/ContextImpl.java | 13 +++++++++++-
.../pulsar/functions/instance/InstanceUtils.java | 22 ++++++++++++++++++++
.../functions/instance/JavaInstanceRunnable.java | 20 ++++++++++++------
.../apache/pulsar/functions/sink/PulsarSink.java | 19 ++++++++---------
.../pulsar/functions/source/PulsarSource.java | 13 +++++-------
.../instance/src/main/python/contextimpl.py | 7 ++++++-
.../instance/src/main/python/python_instance.py | 24 ++++++++++++++++++----
pulsar-functions/instance/src/main/python/util.py | 6 ++++++
.../pulsar/functions/sink/PulsarSinkTest.java | 18 ++++++++--------
.../pulsar/functions/source/PulsarSourceTest.java | 11 +++++-----
10 files changed, 110 insertions(+), 43 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 460ba75..97956b8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
@@ -95,6 +96,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
}
+ private final Utils.ComponentType componentType;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
@@ -148,6 +150,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
.quantile(0.99, 0.01)
.quantile(0.999, 0.01)
.register(collectorRegistry);
+ this.componentType = componentType;
}
public void setCurrentMessageContext(Record<?> record) {
@@ -307,7 +310,15 @@ class ContextImpl implements Context, SinkContext, SourceContext {
if (producer == null) {
try {
Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
- .schema(schema).topic(topicName).create();
+ .schema(schema)
+ .topic(topicName)
+ .properties(InstanceUtils.getProperties(componentType,
+ FunctionDetailsUtils.getFullyQualifiedName(
+ this.config.getFunctionDetails().getTenant(),
+ this.config.getFunctionDetails().getNamespace(),
+ this.config.getFunctionDetails().getName()),
+ this.config.getInstanceId()))
+ .create();
Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 9e32736..88a9df3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -36,6 +36,9 @@ import org.apache.pulsar.functions.utils.Reflections;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.utils.Utils;
+import java.util.HashMap;
+import java.util.Map;
+
@UtilityClass
public class InstanceUtils {
public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg,
@@ -103,4 +106,23 @@ public class InstanceUtils {
}
return SINK;
}
+
+ public static Map<String, String> getProperties(Utils.ComponentType componentType,
+ String fullyQualifiedName, int instanceId) {
+ Map<String, String> properties = new HashMap<>();
+ switch (componentType) {
+ case FUNCTION:
+ properties.put("application", "pulsar-function");
+ break;
+ case SOURCE:
+ properties.put("application", "pulsar-source");
+ break;
+ case SINK:
+ properties.put("application", "pulsar-sink");
+ break;
+ }
+ properties.put("id", fullyQualifiedName);
+ properties.put("instance_id", String.valueOf(instanceId));
+ return properties;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 7c36b58..fde9628 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -132,6 +132,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private InstanceCache instanceCache;
+ private final Utils.ComponentType componentType;
+
+ private final Map<String, String> properties;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -156,6 +160,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
};
+ this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
+
+ this.properties = InstanceUtils.getProperties(this.componentType,
+ FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+ this.instanceConfig.getInstanceId());
+
// Declare function local collector registry so that it will not clash with other function instances'
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
@@ -205,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
- collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
+ collectorRegistry, metricsLabels, this.componentType);
}
/**
@@ -221,7 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
this.instanceCache.getScheduledExecutorService(),
- InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
+ this.componentType);
ContextImpl contextImpl = setupContext();
javaInstance = setupJavaInstance(contextImpl);
@@ -648,8 +658,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
- object = new PulsarSource(this.client, pulsarSourceConfig,
- FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+ object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
@@ -695,8 +704,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
- object = new PulsarSink(this.client, pulsarSinkConfig,
- FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+ object = new PulsarSink(this.client, pulsarSinkConfig, this.properties);
}
} else {
object = Reflections.createInstance(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 1c61b45..3195a82 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
@@ -54,12 +56,12 @@ public class PulsarSink<T> implements Sink<T> {
private final PulsarClient client;
private final PulsarSinkConfig pulsarSinkConfig;
+ private final Map<String, String> properties;
@VisibleForTesting
PulsarSinkProcessor<T> pulsarSinkProcessor;
private final TopicSchema topicSchema;
- private final String fqfn;
private interface PulsarSinkProcessor<T> {
@@ -78,7 +80,7 @@ public class PulsarSink<T> implements Sink<T> {
this.schema = schema;
}
- public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
+ public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
throws PulsarClientException {
ProducerBuilder<T> builder = client.newProducer(schema)
.blockIfQueueFull(true)
@@ -96,9 +98,7 @@ public class PulsarSink<T> implements Sink<T> {
builder.producerName(producerName);
}
- return builder
- .property("application", "pulsarfunction")
- .property("fqfn", fqfn).create();
+ return builder.properties(properties).create();
}
protected Producer<T> getProducer(String destinationTopic) {
@@ -112,8 +112,7 @@ public class PulsarSink<T> implements Sink<T> {
client,
topicName,
producerName,
- schema,
- fqfn);
+ schema);
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e);
@@ -143,7 +142,7 @@ public class PulsarSink<T> implements Sink<T> {
// initialize default topic
try {
publishProducers.put(pulsarSinkConfig.getTopic(),
- createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn));
+ createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
} catch (PulsarClientException e) {
log.error("Failed to create Producer while doing user publish", e);
throw new RuntimeException(e); }
@@ -209,11 +208,11 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) {
+ public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client);
- this.fqfn = fqfn;
+ this.properties = properties;
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ff41dc8..869c706 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -36,7 +36,9 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
@@ -45,17 +47,16 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final PulsarClient pulsarClient;
private final PulsarSourceConfig pulsarSourceConfig;
+ private final Map<String, String> properties;
private List<String> inputTopics;
private List<Consumer<T>> inputConsumers;
private final TopicSchema topicSchema;
- private final String fqfn;
- public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig,
- String fqfn) {
+ public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient);
- this.fqfn = fqfn;
+ this.properties = properties;
}
@Override
@@ -64,10 +65,6 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
- Map<String, String> properties = new HashMap<>();
- properties.put("application", "pulsarfunction");
- properties.put("fqfn", fqfn);
-
inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 63332d0..6b56163 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -147,7 +147,12 @@ class ContextImpl(pulsar.Context):
batching_enabled=True,
batching_max_publish_delay_ms=1,
max_pending_messages=100000,
- compression_type=pulsar_compression_type
+ compression_type=pulsar_compression_type,
+ properties=util.get_properties(util.getFullyQualifiedFunctionName(
+ self.instance_config.function_details.tenant,
+ self.instance_config.function_details.namespace,
+ self.instance_config.function_details.name),
+ self.instance_config.instance_id)
)
if serde_class_name not in self.publish_serializers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index d4c3da5..cfd9eae 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -122,6 +122,13 @@ class PythonInstance(object):
subscription_name = str(self.instance_config.function_details.tenant) + "/" + \
str(self.instance_config.function_details.namespace) + "/" + \
str(self.instance_config.function_details.name)
+
+ properties = util.get_properties(util.getFullyQualifiedFunctionName(
+ self.instance_config.function_details.tenant,
+ self.instance_config.function_details.namespace,
+ self.instance_config.function_details.name),
+ self.instance_config.instance_id)
+
for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items():
if not serde:
serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER)
@@ -133,7 +140,8 @@ class PythonInstance(object):
str(topic), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
- unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+ unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+ properties=properties
)
for topic, consumer_conf in self.instance_config.function_details.source.inputSpecs.items():
@@ -148,14 +156,16 @@ class PythonInstance(object):
re.compile(str(topic)), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
- unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+ unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+ properties=properties
)
else:
self.consumers[topic] = self.pulsar_client.subscribe(
str(topic), subscription_name,
consumer_type=mode,
message_listener=partial(self.message_listener, self.input_serdes[topic]),
- unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+ unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+ properties=properties
)
function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
@@ -271,7 +281,13 @@ class PythonInstance(object):
# set send timeout to be infinity to prevent potential deadlock with consumer
# that might happen when consumer is blocked due to unacked messages
send_timeout_millis=0,
- max_pending_messages=100000)
+ max_pending_messages=100000,
+ properties=util.get_properties(util.getFullyQualifiedFunctionName(
+ self.instance_config.function_details.tenant,
+ self.instance_config.function_details.namespace,
+ self.instance_config.function_details.name),
+ self.instance_config.instance_id)
+ )
def message_listener(self, serde, consumer, message):
# increment number of received records from source
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 76f75bd..0978f39 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -68,6 +68,12 @@ def import_class_from_path(from_path, full_class_name):
def getFullyQualifiedFunctionName(tenant, namespace, name):
return "%s/%s/%s" % (tenant, namespace, name)
+def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
+ return "%s/%s/%s:%s" % (tenant, namespace, name, instance_id)
+
+def get_properties(fullyQualifiedName, instanceId):
+ return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)}
+
class FixedTimer():
def __init__(self, t, hFunction):
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index fb3d2c2..53cfeb6 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SinkContext;
import org.mockito.ArgumentMatcher;
import org.testng.Assert;
@@ -100,6 +101,7 @@ public class PulsarSinkTest {
doReturn(producerBuilder).when(producerBuilder).topic(anyString());
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
+ doReturn(producerBuilder).when(producerBuilder).properties(any());
doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any());
CompletableFuture completableFuture = new CompletableFuture<>();
@@ -158,7 +160,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
Schema schema = pulsarSink.initializeSchema();
@@ -176,7 +178,7 @@ public class PulsarSinkTest {
// set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName());
pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSink.initializeSchema();
fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -198,7 +200,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSink.initializeSchema();
@@ -217,7 +219,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSink.initializeSchema();
@@ -233,7 +235,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSink.initializeSchema();
@@ -257,7 +259,7 @@ public class PulsarSinkTest {
/** test At-least-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+ PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
@@ -314,7 +316,7 @@ public class PulsarSinkTest {
/** test At-most-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
- pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+ pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
@@ -371,7 +373,7 @@ public class PulsarSinkTest {
/** test Effectively-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
- pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+ pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 8e59e00..88c9637 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;
@@ -125,7 +126,7 @@ public class PulsarSourceTest {
PulsarSourceConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+ PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -151,7 +152,7 @@ public class PulsarSourceTest {
topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+ PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -176,7 +177,7 @@ public class PulsarSourceTest {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+ PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
@@ -192,7 +193,7 @@ public class PulsarSourceTest {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+ PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
@@ -205,7 +206,7 @@ public class PulsarSourceTest {
consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
pulsarConfig.setTopicSchema(consumerConfigs);
- PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+ PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
pulsarSource.setupConsumerConfigs();
}