You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/14 22:37:41 UTC

[incubator-pulsar] branch branch-2.0 updated: Functions API compatibility patch for 2.0 (#1777)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 314f3fb  Functions API compatibility patch for 2.0 (#1777)
314f3fb is described below

commit 314f3fbb5103c6fa052aec84e8006ad2d708e500
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon May 14 15:37:39 2018 -0700

    Functions API compatibility patch for 2.0 (#1777)
    
    * Functions API compatibility patch for 2.0
    
    * remove comment
    
    * adding missing header
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 130 ++++++++++++---------
 .../org/apache/pulsar/functions/api/Context.java   |   6 +-
 .../pulsar/functions/instance/ContextImpl.java     |  20 +++-
 .../functions/instance/JavaInstanceRunnable.java   |  26 ++---
 .../instance/processors/MessageProcessorBase.java  |   9 +-
 .../instance/src/main/python/Function_pb2.py       | 125 +++++++-------------
 .../instance/src/main/python/contextimpl.py        |  10 +-
 .../src/main/python/python_instance_main.py        |   4 +-
 .../functions/api/examples/PublishFunction.java    |   2 +-
 .../functions/api/examples/UserConfigFunction.java |   4 +-
 .../proto/src/main/proto/Function.proto            |   4 +-
 .../pulsar/functions/runtime/JavaInstanceMain.java |  10 +-
 .../pulsar/functions/runtime/ProcessRuntime.java   |   4 +-
 .../pulsar/functions/utils/FunctionConfig.java     |   2 +-
 14 files changed, 174 insertions(+), 182 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index df15fc9..dd1da6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -18,28 +18,19 @@
  */
 package org.apache.pulsar.admin.cli;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.isNull;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.lang.reflect.Type;
-import java.net.MalformedURLException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.stream.IntStream;
-
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.result.KeyValue;
@@ -52,9 +43,6 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -63,27 +51,31 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
 import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.shaded.proto.Function.SubscriptionType;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.Parameters;
-import com.beust.jcommander.converters.StringConverter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-import com.google.gson.reflect.TypeToken;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
@@ -285,7 +277,7 @@ public class CmdFunctions extends CmdBase {
             }
             if (null != userConfigString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
-                Map<String, String> userConfigMap = new Gson().fromJson(userConfigString, type);
+                Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
                 functionConfig.setUserConfig(userConfigMap);
             }
             if (null != jarFile) {
@@ -330,13 +322,7 @@ public class CmdFunctions extends CmdBase {
             inferMissingArguments(functionConfig);
         }
 
-        private void doJavaSubmitChecks(FunctionConfig functionConfig) {
-            if (isNull(functionConfig.getClassName())) {
-                throw new IllegalArgumentException("You supplied a jar file but no main class");
-            }
-
-            File file = new File(jarFile);
-            // check if the function class exists in Jar and it implements Function class
+        private void assertClassExistsInJar(File file) {
             if (!Reflections.classExistsInJar(file, functionConfig.getClassName())) {
                 throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
                         functionConfig.getClassName(), jarFile));
@@ -345,16 +331,14 @@ public class CmdFunctions extends CmdBase {
                 throw new IllegalArgumentException(String.format("The Pulsar function class %s in jar %s implements neither org.apache.pulsar.functions.api.Function nor java.util.function.Function",
                         functionConfig.getClassName(), jarFile));
             }
+        }
 
-            ClassLoader userJarLoader;
-            try {
-                userJarLoader = Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, e);
-            }
+        private Class<?>[] getFunctionTypes(File file, FunctionConfig functionConfig) {
+            assertClassExistsInJar(file);
 
             Object userClass = Reflections.createInstance(functionConfig.getClassName(), file);
             Class<?>[] typeArgs;
+
             if (userClass instanceof Function) {
                 Function pulsarFunction = (Function) userClass;
                 if (pulsarFunction == null) {
@@ -370,6 +354,22 @@ public class CmdFunctions extends CmdBase {
                 }
                 typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
             }
+            return typeArgs;
+        }
+
+        private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+            if (isNull(functionConfig.getClassName())) {
+                throw new IllegalArgumentException("You supplied a jar file but no main class");
+            }
+
+            File file = new File(jarFile);
+            ClassLoader userJarLoader;
+            try {
+                userJarLoader = Reflections.loadJar(file);
+            } catch (MalformedURLException e) {
+                throw new RuntimeException("Failed to load user jar " + file, e);
+            }
+            Class<?>[] typeArgs = getFunctionTypes(file, functionConfig);
 
             // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
             // implements SerDe class
@@ -538,34 +538,47 @@ public class CmdFunctions extends CmdBase {
 
         protected FunctionDetails convert(FunctionConfig functionConfig)
                 throws IOException {
+
+            Class<?>[] typeArgs = null;
+            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+
+                File file = new File(jarFile);
+                try {
+                    Reflections.loadJar(file);
+                } catch (MalformedURLException e) {
+                    throw new RuntimeException("Failed to load user jar " + file, e);
+                }
+                typeArgs = getFunctionTypes(file, functionConfig);
+            }
+
             FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
             // Setup source
             Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
             topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
             SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                sourceSpecBuilder.setClassName(PulsarSource.class.getName());
-            }
             functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
             sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
             if (functionConfig.getSubscriptionType() != null) {
                 sourceSpecBuilder
                         .setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType()));
             }
+            if (typeArgs != null) {
+                sourceSpecBuilder.setTypeClassName(typeArgs[0].getName());
+            }
             functionDetailsBuilder.setSource(sourceSpecBuilder);
 
             // Setup sink
             SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-            }
             if (functionConfig.getOutput() != null) {
                 sinkSpecBuilder.setTopic(functionConfig.getOutput());
             }
             if (functionConfig.getOutputSerdeClassName() != null) {
                 sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
             }
+            if (typeArgs != null) {
+                sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
+            }
             functionDetailsBuilder.setSink(sinkSpecBuilder);
 
             if (functionConfig.getTenant() != null) {
@@ -587,7 +600,7 @@ public class CmdFunctions extends CmdBase {
                 functionDetailsBuilder.setRuntime(convertRuntime(functionConfig.getRuntime()));
             }
             if (!functionConfig.getUserConfig().isEmpty()) {
-                functionDetailsBuilder.putAllUserConfig(functionConfig.getUserConfig());
+                functionDetailsBuilder.setUserConfig(new Gson().toJson(functionConfig.getUserConfig()));
             }
             if (functionConfig.getProcessingGuarantees() != null) {
                 functionDetailsBuilder.setProcessingGuarantees(
@@ -595,6 +608,7 @@ public class CmdFunctions extends CmdBase {
             }
             functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
             functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
+
             return functionDetailsBuilder.build();
         }
 
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 01d8291..7af6144 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -117,14 +117,14 @@ public interface Context {
      * Get a map of all user-defined key/value configs for the function
      * @return The full map of user-defined config values
      */
-    Map<String, String> getUserConfigMap();
+    Map<String, Object> getUserConfigMap();
 
     /**
      * Get any user-defined key/value
      * @param key The key
      * @return The Optional value specified by the user for that key.
      */
-    Optional<String> getUserConfigValue(String key);
+    Optional<Object> getUserConfigValue(String key);
 
     /**
      * Get any user-defined key/value or a default value if none is present
@@ -132,7 +132,7 @@ public interface Context {
      * @param defaultValue
      * @return Either the user config value associated with a given key or a supplied default value
      */
-    String getUserConfigValueOrDefault(String key, String defaultValue);
+    Object getUserConfigValueOrDefault(String key, String defaultValue);
 
     /**
      * Record a user defined metric
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 563e8e3..7b76df8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -93,6 +96,7 @@ class ContextImpl implements Context {
     @Getter
     @Setter
     private StateContextImpl stateContext;
+    private Map<String, Object> userConfigs;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        ClassLoader classLoader, Consumer inputConsumer) {
@@ -109,6 +113,8 @@ class ContextImpl implements Context {
         producerConfiguration.setBatchingEnabled(true);
         producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         producerConfiguration.setMaxPendingMessages(1000000);
+        userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
+                new TypeToken<Map<String, Object>>(){}.getType());
     }
 
     public void setCurrentMessageContext(MessageId messageId, String topicName) {
@@ -181,18 +187,18 @@ class ContextImpl implements Context {
     }
 
     @Override
-    public Optional<String> getUserConfigValue(String key) {
-        return Optional.ofNullable(config.getFunctionDetails().getUserConfigOrDefault(key, null));
+    public Optional<Object> getUserConfigValue(String key) {
+        return Optional.ofNullable(userConfigs.getOrDefault(key, null));
     }
 
     @Override
-    public String getUserConfigValueOrDefault(String key, String defaultValue) {
+    public Object getUserConfigValueOrDefault(String key, String defaultValue) {
         return getUserConfigValue(key).orElse(defaultValue);
     }
 
     @Override
-    public Map<String, String> getUserConfigMap() {
-        return config.getFunctionDetails().getUserConfigMap();
+    public Map<String, Object> getUserConfigMap() {
+        return userConfigs;
     }
 
     @Override
@@ -221,6 +227,10 @@ class ContextImpl implements Context {
             }
         }
 
+        if (StringUtils.isEmpty(serDeClassName)) {
+            serDeClassName = DefaultSerDe.class.getName();
+        }
+
         if (!publishSerializers.containsKey(serDeClassName)) {
             SerDe serDe;
             if (serDeClassName.equals(DefaultSerDe.class.getName())) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414d..8a4c7af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,19 +19,7 @@
 
 package org.apache.pulsar.functions.instance;
 
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
 import io.netty.buffer.ByteBuf;
-
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -55,15 +43,23 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.processors.MessageProcessor;
+import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
 /**
  * A function container implemented using java thread.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index 33b699a..ad963f9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -65,7 +65,8 @@ abstract class MessageProcessorBase implements MessageProcessor {
 
         org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource();
         Object object;
-        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
+        // If the source class name is not set, default to the Pulsar source
+        if (sourceSpec.getClassName().isEmpty()) {
 
             PulsarConfig pulsarConfig = new PulsarConfig();
             pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
@@ -80,7 +81,7 @@ abstract class MessageProcessorBase implements MessageProcessor {
             Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
 
             object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
+                    PulsarSource.class.getName(),
                     PulsarSource.class.getClassLoader(), params, paramTypes);
 
         } else {
@@ -147,7 +148,9 @@ abstract class MessageProcessorBase implements MessageProcessor {
     public void close() {
 
         try {
-            this.source.close();
+            if (this.source != null) {
+                this.source.close();
+            }
         } catch (Exception e) {
             log.warn("Failed to close source {}", this.source, e);
         }
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index d1a2496..cda7229 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -39,7 +39,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xcb\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12:\n\nuserConfig\x18\x07 \x03(\x0b\x32&.proto.FunctionDetails.UserConfigEntry\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.Fu [...]
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"\xf0\x02\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x12\x10\n\x08logTopic\x18\x05 \x01(\t\x12\x39\n\x14processingGuarantees\x18\x06 \x01(\x0e\x32\x1b.proto.ProcessingGuarantees\x12\x12\n\nuserConfig\x18\x07 \x01(\t\x12/\n\x07runtime\x18\x08 \x01(\x0e\x32\x1e.proto.FunctionDetails.Runtime\x12\x0f\n\x07\x61uto [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -63,8 +63,8 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1180,
-  serialized_end=1259,
+  serialized_start=1135,
+  serialized_end=1214,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -86,8 +86,8 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=1261,
-  serialized_end=1305,
+  serialized_start=1216,
+  serialized_end=1260,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -116,49 +116,12 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   ],
   containing_type=None,
   options=None,
-  serialized_start=454,
-  serialized_end=485,
+  serialized_start=363,
+  serialized_end=394,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
 
-_FUNCTIONDETAILS_USERCONFIGENTRY = _descriptor.Descriptor(
-  name='UserConfigEntry',
-  full_name='proto.FunctionDetails.UserConfigEntry',
-  filename=None,
-  file=DESCRIPTOR,
-  containing_type=None,
-  fields=[
-    _descriptor.FieldDescriptor(
-      name='key', full_name='proto.FunctionDetails.UserConfigEntry.key', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-    _descriptor.FieldDescriptor(
-      name='value', full_name='proto.FunctionDetails.UserConfigEntry.value', index=1,
-      number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
-  ],
-  extensions=[
-  ],
-  nested_types=[],
-  enum_types=[
-  ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
-  is_extendable=False,
-  syntax='proto3',
-  extension_ranges=[],
-  oneofs=[
-  ],
-  serialized_start=403,
-  serialized_end=452,
-)
-
 _FUNCTIONDETAILS = _descriptor.Descriptor(
   name='FunctionDetails',
   full_name='proto.FunctionDetails',
@@ -210,8 +173,8 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
-      number=7, type=11, cpp_type=10, label=3,
-      has_default_value=False, default_value=[],
+      number=7, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
@@ -253,7 +216,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   ],
   extensions=[
   ],
-  nested_types=[_FUNCTIONDETAILS_USERCONFIGENTRY, ],
+  nested_types=[],
   enum_types=[
     _FUNCTIONDETAILS_RUNTIME,
   ],
@@ -264,7 +227,7 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=26,
-  serialized_end=485,
+  serialized_end=394,
 )
 
 
@@ -301,8 +264,8 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=668,
-  serialized_end=729,
+  serialized_start=600,
+  serialized_end=661,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -327,14 +290,21 @@ _SOURCESPEC = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=2,
+      name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
+      number=5, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3,
       number=3, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=3,
+      name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
       number=4, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -352,8 +322,8 @@ _SOURCESPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=488,
-  serialized_end=729,
+  serialized_start=397,
+  serialized_end=661,
 )
 
 
@@ -379,15 +349,22 @@ _SINKSPEC = _descriptor.Descriptor(
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='topic', full_name='proto.SinkSpec.topic', index=2,
-      number=4, type=9, cpp_type=9, label=1,
+      name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
+      number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
-      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=3,
-      number=5, type=9, cpp_type=9, label=1,
+      name='topic', full_name='proto.SinkSpec.topic', index=3,
+      number=3, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
+      number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
@@ -404,8 +381,8 @@ _SINKSPEC = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=731,
-  serialized_end=816,
+  serialized_start=663,
+  serialized_end=771,
 )
 
 
@@ -435,8 +412,8 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=818,
-  serialized_end=864,
+  serialized_start=773,
+  serialized_end=819,
 )
 
 
@@ -487,8 +464,8 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=867,
-  serialized_end=1028,
+  serialized_start=822,
+  serialized_end=983,
 )
 
 
@@ -525,8 +502,8 @@ _INSTANCE = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1030,
-  serialized_end=1111,
+  serialized_start=985,
+  serialized_end=1066,
 )
 
 
@@ -563,13 +540,11 @@ _ASSIGNMENT = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1113,
-  serialized_end=1178,
+  serialized_start=1068,
+  serialized_end=1133,
 )
 
-_FUNCTIONDETAILS_USERCONFIGENTRY.containing_type = _FUNCTIONDETAILS
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
-_FUNCTIONDETAILS.fields_by_name['userConfig'].message_type = _FUNCTIONDETAILS_USERCONFIGENTRY
 _FUNCTIONDETAILS.fields_by_name['runtime'].enum_type = _FUNCTIONDETAILS_RUNTIME
 _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
 _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
@@ -593,19 +568,11 @@ DESCRIPTOR.enum_types_by_name['SubscriptionType'] = _SUBSCRIPTIONTYPE
 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
 
 FunctionDetails = _reflection.GeneratedProtocolMessageType('FunctionDetails', (_message.Message,), dict(
-
-  UserConfigEntry = _reflection.GeneratedProtocolMessageType('UserConfigEntry', (_message.Message,), dict(
-    DESCRIPTOR = _FUNCTIONDETAILS_USERCONFIGENTRY,
-    __module__ = 'Function_pb2'
-    # @@protoc_insertion_point(class_scope:proto.FunctionDetails.UserConfigEntry)
-    ))
-  ,
   DESCRIPTOR = _FUNCTIONDETAILS,
   __module__ = 'Function_pb2'
   # @@protoc_insertion_point(class_scope:proto.FunctionDetails)
   ))
 _sym_db.RegisterMessage(FunctionDetails)
-_sym_db.RegisterMessage(FunctionDetails.UserConfigEntry)
 
 SourceSpec = _reflection.GeneratedProtocolMessageType('SourceSpec', (_message.Message,), dict(
 
@@ -660,8 +627,6 @@ _sym_db.RegisterMessage(Assignment)
 
 DESCRIPTOR.has_options = True
 DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_FUNCTIONDETAILS_USERCONFIGENTRY.has_options = True
-_FUNCTIONDETAILS_USERCONFIGENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index a841152..3ef1ebe 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -25,6 +25,7 @@
 
 import time
 import os
+import json
 
 import pulsar
 import util
@@ -59,6 +60,9 @@ class ContextImpl(pulsar.Context):
     self.current_message_id = None
     self.current_input_topic_name = None
     self.current_start_time = None
+    # userConfig arrives as a JSON string; decode it once and fall back to an empty map (not a list) when unset.
+    raw_user_config = instance_config.function_details.userConfig
+    self.user_config = json.loads(raw_user_config) if raw_user_config else {}
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
@@ -94,13 +98,13 @@ class ContextImpl(pulsar.Context):
     return self.log
 
   def get_user_config_value(self, key):
-    if key in self.instance_config.function_details.userConfig:
-      return str(self.instance_config.function_details.userConfig[key])
+    if key in self.user_config:
+      return self.user_config[key]
     else:
       return None
   
   def get_user_config_map(self):
-    return self.instance_config.function_details.userConfig
+    return self.user_config
 
   def record_metric(self, metric_name, metric_value):
     if not metric_name in self.accumulated_metrics:
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index f2763a9..7454132 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -116,9 +116,7 @@ def main():
   else:
     function_details.autoAck = False
   if args.user_config != None and len(args.user_config) != 0:
-    user_config = json.loads(args.user_config)
-    for (key, value) in user_config.items():
-      function_details.userConfig[str(key)] = str(value)
+    function_details.userConfig = args.user_config
 
   pulsar_client = pulsar.Client(args.pulsar_serviceurl)
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 01e3852..e97b692 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 public class PublishFunction implements Function<String, Void> {
     @Override
     public Void process(String input, Context context) {
-        String publishTopic = context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "persistent://sample/standalone/ns1/publish");
         String output = String.format("%s!", input);
         context.publish(publishTopic, output, DefaultSerDe.class.getName());
         return null;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index 6ff3dd8..fb3ceb0 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -28,11 +28,11 @@ public class UserConfigFunction implements Function<String, Void> {
     @Override
     public Void process(String input, Context context) {
         String key = "config-key";
-        Optional<String> maybeValue = context.getUserConfigValue(key);
+        Optional<Object> maybeValue = context.getUserConfigValue(key);
         Logger LOG = context.getLogger();
 
         if (maybeValue.isPresent()) {
-            String value = maybeValue.get();
+            String value = (String) maybeValue.get();
             LOG.info("The config value is {}", value);
         } else {
             LOG.error("No value present for the key {}", key);
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index f4b1fb8..3c1414e 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -45,7 +45,7 @@ message FunctionDetails {
     string className = 4;
     string logTopic = 5;
     ProcessingGuarantees processingGuarantees = 6;
-    map<string,string> userConfig = 7;
+    string userConfig = 7;
     Runtime runtime = 8;
     bool autoAck = 9;
     int32 parallelism = 10;
@@ -57,6 +57,7 @@ message SourceSpec {
     string className = 1;
     // map in json format
     string configs = 2;
+    string typeClassName = 5;
 
     // configs used only when source feeds into functions
     SubscriptionType subscriptionType = 3;
@@ -67,6 +68,7 @@ message SinkSpec {
     string className = 1;
     // map in json format
     string configs = 2;
+    string typeClassName = 5;
 
     // configs used only when functions output to sink
     string topic = 3;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index e0330bb..f19dc26 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -100,7 +100,7 @@ public class JavaInstanceMain {
     @Parameter(names = "--auto_ack", description = "Enable Auto Acking?\n")
     protected String autoAck = "true";
 
-    @Parameter(names = "--source_classname", description = "The source classname", required = true)
+    @Parameter(names = "--source_classname", description = "The source classname")
     protected String sourceClassname;
 
     @Parameter(names = "--source_configs", description = "The source configs")
@@ -149,13 +149,13 @@ public class JavaInstanceMain {
             functionDetailsBuilder.setAutoAck(false);
         }
         if (userConfig != null && !userConfig.isEmpty()) {
-            Type type = new TypeToken<Map<String, String>>(){}.getType();
-            Map<String, String> userConfigMap = new Gson().fromJson(userConfig, type);
-            functionDetailsBuilder.putAllUserConfig(userConfigMap);
+            functionDetailsBuilder.setUserConfig(userConfig);
         }
 
         SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
-        sourceDetailsBuilder.setClassName(sourceClassname);
+        if (sourceClassname != null) {
+            sourceDetailsBuilder.setClassName(sourceClassname);
+        }
         if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
             sourceDetailsBuilder.setConfigs(sourceConfigs);
         }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3aae26f..10d04d1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -134,10 +134,10 @@ class ProcessRuntime implements Runtime {
         args.add(pulsarServiceUrl);
         args.add("--max_buffered_tuples");
         args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-        Map<String, String> userConfig = instanceConfig.getFunctionDetails().getUserConfigMap();
+        String userConfig = instanceConfig.getFunctionDetails().getUserConfig();
         if (userConfig != null && !userConfig.isEmpty()) {
             args.add("--user_config");
-            args.add(new Gson().toJson(userConfig));
+            args.add(userConfig);
         }
         instancePort = findAvailablePort();
         args.add("--port");
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index cb7d0a2..04bb771 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -74,7 +74,7 @@ public class FunctionConfig {
 
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
-    private Map<String, String> userConfig = new HashMap<>();
+    private Map<String, Object> userConfig = new HashMap<>();
     private SubscriptionType subscriptionType;
     private Runtime runtime;
     private boolean autoAck;

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.