You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/14 22:37:41 UTC
[incubator-pulsar] branch branch-2.0 updated: Functions API
compatibility patch for 2.0 (#1777)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 314f3fb Functions API compatibility patch for 2.0 (#1777)
314f3fb is described below
commit 314f3fbb5103c6fa052aec84e8006ad2d708e500
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon May 14 15:37:39 2018 -0700
Functions API compatibility patch for 2.0 (#1777)
* Functions API compatibility patch for 2.0
* remove comment
* adding missing header
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 130 ++++++++++++---------
.../org/apache/pulsar/functions/api/Context.java | 6 +-
.../pulsar/functions/instance/ContextImpl.java | 20 +++-
.../functions/instance/JavaInstanceRunnable.java | 26 ++---
.../instance/processors/MessageProcessorBase.java | 9 +-
.../instance/src/main/python/Function_pb2.py | 125 +++++++-------------
.../instance/src/main/python/contextimpl.py | 10 +-
.../src/main/python/python_instance_main.py | 4 +-
.../functions/api/examples/PublishFunction.java | 2 +-
.../functions/api/examples/UserConfigFunction.java | 4 +-
.../proto/src/main/proto/Function.proto | 4 +-
.../pulsar/functions/runtime/JavaInstanceMain.java | 10 +-
.../pulsar/functions/runtime/ProcessRuntime.java | 4 +-
.../pulsar/functions/utils/FunctionConfig.java | 2 +-
14 files changed, 174 insertions(+), 182 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index df15fc9..dd1da6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,28 +18,19 @@
*/
package org.apache.pulsar.admin.cli;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.IntStream;
-
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -52,9 +43,6 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -63,27 +51,31 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.StringConverter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-import com.google.gson.reflect.TypeToken;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -285,7 +277,7 @@ public class CmdFunctions extends CmdBase {
}
if (null != userConfigString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
- Map<String, String> userConfigMap = new Gson().fromJson(userConfigString, type);
+ Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
functionConfig.setUserConfig(userConfigMap);
}
if (null != jarFile) {
@@ -330,13 +322,7 @@ public class CmdFunctions extends CmdBase {
inferMissingArguments(functionConfig);
}
- private void doJavaSubmitChecks(FunctionConfig functionConfig) {
- if (isNull(functionConfig.getClassName())) {
- throw new IllegalArgumentException("You supplied a jar file but no main class");
- }
-
- File file = new File(jarFile);
- // check if the function class exists in Jar and it implements Function class
+ private void assertClassExistsInJar(File file) {
if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
functionConfig.getClassName(), jarFile));
@@ -345,16 +331,14 @@ public class CmdFunctions extends CmdBase {
throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
functionConfig.getClassName(), jarFile));
}
+ }
- ClassLoader userJarLoader;
- try {
- userJarLoader = Reflections.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
- }
+ private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
+ assertClassExistsInJar(file);
Object userClass = Reflections.createInstance(functionConfig.getClassName(), file);
Class<?>[] typeArgs;
+
if (userClass instanceof Function) {
Function pulsarFunction = (Function) userClass;
if (pulsarFunction == null) {
@@ -370,6 +354,22 @@ public class CmdFunctions extends CmdBase {
}
typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}
+ return typeArgs;
+ }
+
+ private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+ if (isNull(functionConfig.getClassName())) {
+ throw new IllegalArgumentException("You supplied a jar file but no main class");
+ }
+
+ File file = new File(jarFile);
+ ClassLoader userJarLoader;
+ try {
+ userJarLoader = Reflections.loadJar(file);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Failed to load user jar " + file, e);
+ }
+ Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
// implements SerDe class
@@ -538,34 +538,47 @@ public class CmdFunctions extends CmdBase {
protected FunctionDetails convert(FunctionConfig functionConfig)
throws IOException {
+
+ Class<?>[] typeArgs = null;
+ if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+
+ File file = new File(jarFile);
+ try {
+ Reflections.loadJar(file);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException("Failed to load user jar " + file, e);
+ }
+ typeArgs = getFunctionTypes(file, functionConfig);
+ }
+
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
// Setup source
Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- sourceSpecBuilder.setClassName(PulsarSource.class.getName());
- }
functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
if (functionConfig.getSubscriptionType() != null) {
sourceSpecBuilder
.setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType()));
}
+ if (typeArgs != null) {
+ sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
+ }
functionDetailsBuilder.setSource(sourceSpecBuilder);
// Setup sink
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
- if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
- sinkSpecBuilder.setClassName(PulsarSink.class.getName());
- }
if (functionConfig.getOutput() != null) {
sinkSpecBuilder.setTopic(functionConfig.getOutput());
}
if (functionConfig.getOutputSerdeClassName() != null) {
sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
}
+ if (typeArgs != null) {
+ sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
+ }
functionDetailsBuilder.setSink(sinkSpecBuilder);
if (functionConfig.getTenant() != null) {
@@ -587,7 +600,7 @@ public class CmdFunctions extends CmdBase {
functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime()));
}
if (!functionConfig.getUserConfig().isEmpty()) {
- functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig());
+ functionDetailsBuilder.setUserConfig(new Gson().toJson(functionConfig.getUserConfig()));
}
if (functionConfig.getProcessingGuarantees() != null) {
functionDetailsBuilder.setProcessingGuarantees(
@@ -595,6 +608,7 @@ public class CmdFunctions extends CmdBase {
}
functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
+
return functionDetailsBuilder.build();
}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 01d8291..7af6144 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -117,14 +117,14 @@ public interface Context {
* Get a map of all user-defined key/value configs for the function
* @return The full map of user-defined config values
*/
- Map<String, String> getUserConfigMap();
+ Map<String, Object> getUserConfigMap();
/**
* Get any user-defined key/value
* @param key The key
* @return The Optional value specified by the user for that key.
*/
- Optional<String> getUserConfigValue(String key);
+ Optional<Object> getUserConfigValue(String key);
/**
* Get any user-defined key/value or a default value if none is present
@@ -132,7 +132,7 @@ public interface Context {
* @param defaultValue
* @return Either the user config value associated with a given key or a supplied default value
*/
- String getUserConfigValueOrDefault(String key, String defaultValue);
+ Object getUserConfigValueOrDefault(String key, String defaultValue);
/**
* Record a user defined metric
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 563e8e3..7b76df8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.functions.instance;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -93,6 +96,7 @@ class ContextImpl implements Context {
@Getter
@Setter
private StateContextImpl stateContext;
+ private Map<String, Object> userConfigs;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader, Consumer inputConsumer) {
@@ -109,6 +113,8 @@ class ContextImpl implements Context {
producerConfiguration.setBatchingEnabled(true);
producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
producerConfiguration.setMaxPendingMessages(1000000);
+ userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
+ new TypeToken<Map<String, Object>>(){}.getType());
}
public void setCurrentMessageContext(MessageId messageId, String topicName) {
@@ -181,18 +187,18 @@ class ContextImpl implements Context {
}
@Override
- public Optional<String> getUserConfigValue(String key) {
- return Optional.ofNullable(config.getFunctionDetails().getUserConfigOrDefault(key, null));
+ public Optional<Object> getUserConfigValue(String key) {
+ return Optional.ofNullable(userConfigs.getOrDefault(key, null));
}
@Override
- public String getUserConfigValueOrDefault(String key, String defaultValue) {
+ public Object getUserConfigValueOrDefault(String key, String defaultValue) {
return getUserConfigValue(key).orElse(defaultValue);
}
@Override
- public Map<String, String> getUserConfigMap() {
- return config.getFunctionDetails().getUserConfigMap();
+ public Map<String, Object> getUserConfigMap() {
+ return userConfigs;
}
@Override
@@ -221,6 +227,10 @@ class ContextImpl implements Context {
}
}
+ if (StringUtils.isEmpty(serDeClassName)) {
+ serDeClassName = DefaultSerDe.class.getName();
+ }
+
if (!publishSerializers.containsKey(serDeClassName)) {
SerDe serDe;
if (serDeClassName.equals(DefaultSerDe.class.getName())) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414d..8a4c7af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,19 +19,7 @@
package org.apache.pulsar.functions.instance;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
import io.netty.buffer.ByteBuf;
-
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -55,15 +43,23 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.processors.MessageProcessor;
+import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
/**
* A function container implemented using java thread.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index 33b699a..ad963f9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -65,7 +65,8 @@ abstract class MessageProcessorBase implements MessageProcessor {
org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource();
Object object;
- if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
+ // If source classname is not set, we default pulsar source
+ if (sourceSpec.getClassName().isEmpty()) {
PulsarConfig pulsarConfig = new PulsarConfig();
pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
@@ -80,7 +81,7 @@ abstract class MessageProcessorBase implements MessageProcessor {
Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
object = Reflections.createInstance(
- sourceSpec.getClassName(),
+ PulsarSource.class.getName(),
PulsarSource.class.getClassLoader(), params, paramTypes);
} else {
@@ -147,7 +148,9 @@ abstract class MessageProcessorBase implements MessageProcessor {
public void close() {
try {
- this.source.close();
+ if (this.source != null) {
+ this.source.close();
+ }
} catch (Exception e) {
log.warn("Failed to close source {}", this.source, e);
}
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d1a2496..cda7229 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
name='Function.proto',
package='proto',
syntax='proto3',
- serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07 \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.Fu [...]
+ serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf0\x02\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61uto [...]
)
_PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1180,
- serialized_end=1259,
+ serialized_start=1135,
+ serialized_end=1214,
)
_sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=1261,
- serialized_end=1305,
+ serialized_start=1216,
+ serialized_end=1260,
)
_sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
@@ -116,49 +116,12 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
],
containing_type=None,
options=None,
- serialized_start=454,
- serialized_end=485,
+ serialized_start=363,
+ serialized_end=394,
)
_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
-_FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
- name='UserConfigEntry',
- full_name='proto.FunctionDetails.UserConfigEntry',
- filename=None,
- file=DESCRIPTOR,
- containing_type=None,
- fields=[
- _descriptor.FieldDescriptor(
- name='key', full_name='proto.FunctionDetails.UserConfigEntry.key', index=0,
- number=1, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
- _descriptor.FieldDescriptor(
- name='value', full_name='proto.FunctionDetails.UserConfigEntry.value', index=1,
- number=2, type=9, cpp_type=9, label=1,
- has_default_value=False, default_value=_b("").decode('utf-8'),
- message_type=None, enum_type=None, containing_type=None,
- is_extension=False, extension_scope=None,
- options=None, file=DESCRIPTOR),
- ],
- extensions=[
- ],
- nested_types=[],
- enum_types=[
- ],
- options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
- is_extendable=False,
- syntax='proto3',
- extension_ranges=[],
- oneofs=[
- ],
- serialized_start=403,
- serialized_end=452,
-)
-
_FUNCTIONDETAILS = _descriptor.Descriptor(
name='FunctionDetails',
full_name='proto.FunctionDetails',
@@ -210,8 +173,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
- number=7, type=11, cpp_type=10, label=3,
- has_default_value=False, default_value=[],
+ number=7, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
@@ -253,7 +216,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
],
extensions=[
],
- nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ],
+ nested_types=[],
enum_types=[
_FUNCTIONDETAILS_RUNTIME,
],
@@ -264,7 +227,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
oneofs=[
],
serialized_start=26,
- serialized_end=485,
+ serialized_end=394,
)
@@ -301,8 +264,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=668,
- serialized_end=729,
+ serialized_start=600,
+ serialized_end=661,
)
_SOURCESPEC = _descriptor.Descriptor(
@@ -327,14 +290,21 @@ _SOURCESPEC = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=2,
+ name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
+ number=5, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3,
number=3, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=3,
+ name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
number=4, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
@@ -352,8 +322,8 @@ _SOURCESPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=488,
- serialized_end=729,
+ serialized_start=397,
+ serialized_end=661,
)
@@ -379,15 +349,22 @@ _SINKSPEC = _descriptor.Descriptor(
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='topic', full_name='proto.SinkSpec.topic', index=2,
- number=4, type=9, cpp_type=9, label=1,
+ name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
+ number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
- name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3,
- number=5, type=9, cpp_type=9, label=1,
+ name='topic', full_name='proto.SinkSpec.topic', index=3,
+ number=3, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
+ number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
@@ -404,8 +381,8 @@ _SINKSPEC = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=731,
- serialized_end=816,
+ serialized_start=663,
+ serialized_end=771,
)
@@ -435,8 +412,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=818,
- serialized_end=864,
+ serialized_start=773,
+ serialized_end=819,
)
@@ -487,8 +464,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=867,
- serialized_end=1028,
+ serialized_start=822,
+ serialized_end=983,
)
@@ -525,8 +502,8 @@ _INSTANCE = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1030,
- serialized_end=1111,
+ serialized_start=985,
+ serialized_end=1066,
)
@@ -563,13 +540,11 @@ _ASSIGNMENT = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=1113,
- serialized_end=1178,
+ serialized_start=1068,
+ serialized_end=1133,
)
-_FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
_FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
-_FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY
_FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
_FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
_FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
@@ -593,19 +568,11 @@ DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
-
- UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict(
- DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY,
- __module__ = 'Function_pb2'
- # @@protoc_insertion_point(class_scope:proto.FunctionDetails.UserConfigEntry)
- ))
- ,
DESCRIPTOR = _FUNCTIONDETAILS,
__module__ = 'Function_pb2'
# @@protoc_insertion_point(class_scope:proto.FunctionDetails)
))
_sym_db.RegisterMessage(FunctionDetails)
-_sym_db.RegisterMessage(FunctionDetails.UserConfigEntry)
SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict(
@@ -660,8 +627,6 @@ _sym_db.RegisterMessage(Assignment)
DESCRIPTOR.has_options = True
DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True
-_FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
# @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index a841152..3ef1ebe 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -25,6 +25,7 @@
import time
import os
+import json
import pulsar
import util
@@ -59,6 +60,9 @@ class ContextImpl(pulsar.Context):
self.current_message_id = None
self.current_input_topic_name = None
self.current_start_time = None
+ self.user_config = json.loads(instance_config.function_details.userConfig) \
+ if instance_config.function_details.userConfig \
+ else [];
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
@@ -94,13 +98,13 @@ class ContextImpl(pulsar.Context):
return self.log
def get_user_config_value(self, key):
- if key in self.instance_config.function_details.userConfig:
- return str(self.instance_config.function_details.userConfig[key])
+ if key in self.user_config:
+ return self.user_config[key]
else:
return None
def get_user_config_map(self):
- return self.instance_config.function_details.userConfig
+ return self.user_config
def record_metric(self, metric_name, metric_value):
if not metric_name in self.accumulated_metrics:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index f2763a9..7454132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -116,9 +116,7 @@ def main():
else:
function_details.autoAck = False
if args.user_config != None and len(args.user_config) != 0:
- user_config = json.loads(args.user_config)
- for (key, value) in user_config.items():
- function_details.userConfig[str(key)] = str(value)
+ function_details.userConfig = args.user_config
pulsar_client = pulsar.Client(args.pulsar_serviceurl)
pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 01e3852..e97b692 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
public class PublishFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
- String publishTopic = context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
+ String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
String output = String.format("%s!", input);
context.publish(publishTopic, output, DefaultSerDe.class.getName());
return null;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index 6ff3dd8..fb3ceb0 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -28,11 +28,11 @@ public class UserConfigFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
String key = "config-key";
- Optional<String> maybeValue = context.getUserConfigValue(key);
+ Optional<Object> maybeValue = context.getUserConfigValue(key);
Logger LOG = context.getLogger();
if (maybeValue.isPresent()) {
- String value = maybeValue.get();
+ String value = (String) maybeValue.get();
LOG.info("The config value is {}", value);
} else {
LOG.error("No value present for the key {}", key);
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index f4b1fb8..3c1414e 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -45,7 +45,7 @@ message FunctionDetails {
string className = 4;
string logTopic = 5;
ProcessingGuarantees processingGuarantees = 6;
- map<string,string> userConfig = 7;
+ string userConfig = 7;
Runtime runtime = 8;
bool autoAck = 9;
int32 parallelism = 10;
@@ -57,6 +57,7 @@ message SourceSpec {
string className = 1;
// map in json format
string configs = 2;
+ string typeClassName = 5;
// configs used only when source feeds into functions
SubscriptionType subscriptionType = 3;
@@ -67,6 +68,7 @@ message SinkSpec {
string className = 1;
// map in json format
string configs = 2;
+ string typeClassName = 5;
// configs used only when functions output to sink
string topic = 3;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index e0330bb..f19dc26 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -100,7 +100,7 @@ public class JavaInstanceMain {
@Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
protected String autoAck = "true";
- @Parameter(names = "--source_classname", description = "The source classname", required = true)
+ @Parameter(names = "--source_classname", description = "The source classname")
protected String sourceClassname;
@Parameter(names = "--source_configs", description = "The source configs")
@@ -149,13 +149,13 @@ public class JavaInstanceMain {
functionDetailsBuilder.setAutoAck(false);
}
if (userConfig != null && !userConfig.isEmpty()) {
- Type type = new TypeToken<Map<String, String>>(){}.getType();
- Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type);
- functionDetailsBuilder.putAllUserConfig(userConfigMap);
+ functionDetailsBuilder.setUserConfig(userConfig);
}
SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
- sourceDetailsBuilder.setClassName(sourceClassname);
+ if (sourceClassname != null) {
+ sourceDetailsBuilder.setClassName(sourceClassname);
+ }
if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
sourceDetailsBuilder.setConfigs(sourceConfigs);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3aae26f..10d04d1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -134,10 +134,10 @@ class ProcessRuntime implements Runtime {
args.add(pulsarServiceUrl);
args.add("--max_buffered_tuples");
args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
- Map<String, String> userConfig = instanceConfig.getFunctionDetails().getUserConfigMap();
+ String userConfig = instanceConfig.getFunctionDetails().getUserConfig();
if (userConfig != null && !userConfig.isEmpty()) {
args.add("--user_config");
- args.add(new Gson().toJson(userConfig));
+ args.add(userConfig);
}
instancePort = findAvailablePort();
args.add("--port");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index cb7d0a2..04bb771 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -74,7 +74,7 @@ public class FunctionConfig {
private String logTopic;
private ProcessingGuarantees processingGuarantees;
- private Map<String, String> userConfig = new HashMap<>();
+ private Map<String, Object> userConfig = new HashMap<>();
private SubscriptionType subscriptionType;
private Runtime runtime;
private boolean autoAck;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.