You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2019/05/15 06:29:52 UTC

[pulsar] branch master updated: Added an explicit field in the function details for componenttype (#4250)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ad4c9f3  Added an explicit field in the function details for componenttype (#4250)
ad4c9f3 is described below

commit ad4c9f3608cb228bbb59bade7e4e12ccd14e2fb3
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue May 14 23:29:45 2019 -0700

    Added an explicit field in the function details for componenttype (#4250)
    
    * Added an explicit field in the function details for componenttype
    
    * Fixed unit tests
    
    * Updated the definition of the Python pb file
    
    * Added license
    
    * Took feedback into account
    
    * Added unit test
---
 .../pulsar/functions/instance/ContextImpl.java     |   6 +-
 .../pulsar/functions/instance/InstanceUtils.java   |  19 +-
 .../functions/instance/JavaInstanceRunnable.java   |   3 +-
 .../instance/stats/ComponentStatsManager.java      |   5 +-
 .../instance/src/main/python/Function_pb2.py       | 366 +++++++++++++--------
 .../pulsar/functions/instance/ContextImplTest.java |   5 +-
 .../functions/instance/InstanceUtilsTest.java      |  77 +++++
 .../proto/src/main/proto/Function.proto            |   7 +
 .../functions/runtime/KubernetesRuntime.java       |   3 +-
 ...{ComponentType.java => ComponentTypeUtils.java} |  27 +-
 .../functions/utils/FunctionConfigUtils.java       |   2 +
 .../pulsar/functions/utils/SinkConfigUtils.java    |   2 +
 .../pulsar/functions/utils/SourceConfigUtils.java  |   2 +
 .../functions/worker/rest/api/ComponentImpl.java   | 237 +++++++------
 .../functions/worker/rest/api/FunctionsImpl.java   |   3 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  14 +-
 .../functions/worker/rest/api/SourceImpl.java      |  14 +-
 .../worker/rest/api/FunctionsImplTest.java         |   5 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |   3 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  13 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   9 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   9 +-
 22 files changed, 500 insertions(+), 331 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index f0d5ade..5bf8c59 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -35,10 +35,10 @@ import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.instance.stats.FunctionStatsManager;
 import org.apache.pulsar.functions.instance.stats.SinkStatsManager;
 import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
@@ -91,11 +91,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
     }
 
-    private final ComponentType componentType;
+    private final Function.FunctionDetails.ComponentType componentType;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
-                       ComponentType componentType, ComponentStatsManager statsManager) {
+                       Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 5cae6a8..e73f1ce 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,9 +20,6 @@ package org.apache.pulsar.functions.instance;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 
 import lombok.experimental.UtilityClass;
 
@@ -31,7 +28,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.Reflections;
 
 import net.jodah.typetools.TypeResolver;
@@ -89,23 +85,26 @@ public class InstanceUtils {
         }
     }
 
-    public ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+    public Function.FunctionDetails.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+        if (functionDetails.getComponentType() != Function.FunctionDetails.ComponentType.UNKNOWN) {
+            return functionDetails.getComponentType();
+        }
         Function.SourceSpec sourceSpec = functionDetails.getSource();
         Function.SinkSpec sinkSpec = functionDetails.getSink();
         if (sourceSpec.getInputSpecsCount() == 0) {
-            return SOURCE;
+            return Function.FunctionDetails.ComponentType.SOURCE;
         }
         // Now its between sink and function
 
         if (!isEmpty(sinkSpec.getBuiltin())) {
             // if its built in, its a sink
-            return SINK;
+            return Function.FunctionDetails.ComponentType.SINK;
         }
 
         if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
-            return FUNCTION;
+            return Function.FunctionDetails.ComponentType.FUNCTION;
         }
-        return SINK;
+        return Function.FunctionDetails.ComponentType.SINK;
     }
 
     public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
@@ -119,7 +118,7 @@ public class InstanceUtils {
                 functionDetails.getName());
     }
 
-    public static Map<String, String> getProperties(ComponentType componentType,
+    public static Map<String, String> getProperties(Function.FunctionDetails.ComponentType componentType,
                                                     String fullyQualifiedName, int instanceId) {
         Map<String, String> properties = new HashMap<>();
         switch (componentType) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 6b265ae..b6c4fc8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -66,7 +66,6 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
@@ -129,7 +128,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private InstanceCache instanceCache;
 
-    private final ComponentType componentType;
+    private final org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType;
 
     private final Map<String, String> properties;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index b03fbf0..716fded 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -20,10 +20,9 @@ package org.apache.pulsar.functions.instance.stats;
 
 import com.google.common.collect.EvictingQueue;
 import io.prometheus.client.CollectorRegistry;
-import io.prometheus.client.exporter.common.TextFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.proto.Function;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -58,7 +57,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
     public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
                                   String[] metricsLabels,
                                   ScheduledExecutorService scheduledExecutorService,
-                                  ComponentType componentType) {
+                                  Function.FunctionDetails.ComponentType componentType) {
         switch (componentType) {
             case FUNCTION:
                 return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
diff --git a/pulsar-functions/instance/src/main/python/Function_pb2.py b/pulsar-functions/instance/src/main/python/Function_pb2.py
index 8187e09..409f749 100644
--- a/pulsar-functions/instance/src/main/python/Function_pb2.py
+++ b/pulsar-functions/instance/src/main/python/Function_pb2.py
@@ -17,6 +17,7 @@
 # under the License.
 #
 
+# -*- coding: utf-8 -*-
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: Function.proto
 
@@ -27,7 +28,6 @@ from google.protobuf import descriptor as _descriptor
 from google.protobuf import message as _message
 from google.protobuf import reflection as _reflection
 from google.protobuf import symbol_database as _symbol_database
-from google.protobuf import descriptor_pb2
 # @@protoc_insertion_point(imports)
 
 _sym_db = _symbol_database.Default()
@@ -39,7 +39,8 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   name='Function.proto',
   package='proto',
   syntax='proto3',
-  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\xe8\x03\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
+  serialized_options=_b('\n!org.apache.pulsar.functions.protoB\010Function'),
+  serialized_pb=_b('\n\x0e\x46unction.proto\x12\x05proto\"3\n\tResources\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x01\x12\x0b\n\x03ram\x18\x02 \x01(\x03\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x03\"B\n\x0cRetryDetails\x12\x19\n\x11maxMessageRetries\x18\x01 \x01(\x05\x12\x17\n\x0f\x64\x65\x61\x64LetterTopic\x18\x02 \x01(\t\"\x85\x05\n\x0f\x46unctionDetails\x12\x0e\n\x06tenant\x18\x01 \x01(\t\x12\x11\n\tnamespace\x18\x02 \x01(\t\x12\x0c\n\x04name\x18\x03 \x01(\t\x12\x11\n\tclassName\x18\x04 \x01(\t\x1 [...]
 )
 
 _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
@@ -50,21 +51,21 @@ _PROCESSINGGUARANTEES = _descriptor.EnumDescriptor(
   values=[
     _descriptor.EnumValueDescriptor(
       name='ATLEAST_ONCE', index=0, number=0,
-      options=None,
+      serialized_options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
       name='ATMOST_ONCE', index=1, number=1,
-      options=None,
+      serialized_options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
       name='EFFECTIVELY_ONCE', index=2, number=2,
-      options=None,
+      serialized_options=None,
       type=None),
   ],
   containing_type=None,
-  options=None,
-  serialized_start=2022,
-  serialized_end=2101,
+  serialized_options=None,
+  serialized_start=2302,
+  serialized_end=2381,
 )
 _sym_db.RegisterEnumDescriptor(_PROCESSINGGUARANTEES)
 
@@ -77,17 +78,17 @@ _SUBSCRIPTIONTYPE = _descriptor.EnumDescriptor(
   values=[
     _descriptor.EnumValueDescriptor(
       name='SHARED', index=0, number=0,
-      options=None,
+      serialized_options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
       name='FAILOVER', index=1, number=1,
-      options=None,
+      serialized_options=None,
       type=None),
   ],
   containing_type=None,
-  options=None,
-  serialized_start=2103,
-  serialized_end=2147,
+  serialized_options=None,
+  serialized_start=2383,
+  serialized_end=2427,
 )
 _sym_db.RegisterEnumDescriptor(_SUBSCRIPTIONTYPE)
 
@@ -100,17 +101,17 @@ _FUNCTIONSTATE = _descriptor.EnumDescriptor(
   values=[
     _descriptor.EnumValueDescriptor(
       name='RUNNING', index=0, number=0,
-      options=None,
+      serialized_options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
       name='STOPPED', index=1, number=1,
-      options=None,
+      serialized_options=None,
       type=None),
   ],
   containing_type=None,
-  options=None,
-  serialized_start=2149,
-  serialized_end=2190,
+  serialized_options=None,
+  serialized_start=2429,
+  serialized_end=2470,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONSTATE)
 
@@ -132,20 +133,54 @@ _FUNCTIONDETAILS_RUNTIME = _descriptor.EnumDescriptor(
   values=[
     _descriptor.EnumValueDescriptor(
       name='JAVA', index=0, number=0,
-      options=None,
+      serialized_options=None,
       type=None),
     _descriptor.EnumValueDescriptor(
       name='PYTHON', index=1, number=1,
-      options=None,
+      serialized_options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='GO', index=2, number=3,
+      serialized_options=None,
       type=None),
   ],
   containing_type=None,
-  options=None,
-  serialized_start=604,
-  serialized_end=635,
+  serialized_options=None,
+  serialized_start=687,
+  serialized_end=726,
 )
 _sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_RUNTIME)
 
+_FUNCTIONDETAILS_COMPONENTTYPE = _descriptor.EnumDescriptor(
+  name='ComponentType',
+  full_name='proto.FunctionDetails.ComponentType',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='UNKNOWN', index=0, number=0,
+      serialized_options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='FUNCTION', index=1, number=1,
+      serialized_options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='SOURCE', index=2, number=2,
+      serialized_options=None,
+      type=None),
+    _descriptor.EnumValueDescriptor(
+      name='SINK', index=3, number=3,
+      serialized_options=None,
+      type=None),
+  ],
+  containing_type=None,
+  serialized_options=None,
+  serialized_start=728,
+  serialized_end=792,
+)
+_sym_db.RegisterEnumDescriptor(_FUNCTIONDETAILS_COMPONENTTYPE)
+
 
 _RESOURCES = _descriptor.Descriptor(
   name='Resources',
@@ -160,28 +195,28 @@ _RESOURCES = _descriptor.Descriptor(
       has_default_value=False, default_value=float(0),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='ram', full_name='proto.Resources.ram', index=1,
       number=2, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='disk', full_name='proto.Resources.disk', index=2,
       number=3, type=3, cpp_type=2, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
@@ -205,21 +240,21 @@ _RETRYDETAILS = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='deadLetterTopic', full_name='proto.RetryDetails.deadLetterTopic', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
@@ -243,127 +278,142 @@ _FUNCTIONDETAILS = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='namespace', full_name='proto.FunctionDetails.namespace', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='name', full_name='proto.FunctionDetails.name', index=2,
       number=3, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='className', full_name='proto.FunctionDetails.className', index=3,
       number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='logTopic', full_name='proto.FunctionDetails.logTopic', index=4,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='processingGuarantees', full_name='proto.FunctionDetails.processingGuarantees', index=5,
       number=6, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='userConfig', full_name='proto.FunctionDetails.userConfig', index=6,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='secretsMap', full_name='proto.FunctionDetails.secretsMap', index=7,
       number=16, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='runtime', full_name='proto.FunctionDetails.runtime', index=8,
       number=8, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='autoAck', full_name='proto.FunctionDetails.autoAck', index=9,
       number=9, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='parallelism', full_name='proto.FunctionDetails.parallelism', index=10,
       number=10, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='source', full_name='proto.FunctionDetails.source', index=11,
       number=11, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='sink', full_name='proto.FunctionDetails.sink', index=12,
       number=12, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='resources', full_name='proto.FunctionDetails.resources', index=13,
       number=13, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='packageUrl', full_name='proto.FunctionDetails.packageUrl', index=14,
       number=14, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='retryDetails', full_name='proto.FunctionDetails.retryDetails', index=15,
       number=15, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='runtimeFlags', full_name='proto.FunctionDetails.runtimeFlags', index=16,
+      number=17, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='componentType', full_name='proto.FunctionDetails.componentType', index=17,
+      number=18, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
     _FUNCTIONDETAILS_RUNTIME,
+    _FUNCTIONDETAILS_COMPONENTTYPE,
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
   serialized_start=147,
-  serialized_end=635,
+  serialized_end=792,
 )
 
 
@@ -380,21 +430,21 @@ _CONSUMERSPEC_RECEIVERQUEUESIZE = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=788,
-  serialized_end=822,
+  serialized_start=945,
+  serialized_end=979,
 )
 
 _CONSUMERSPEC = _descriptor.Descriptor(
@@ -410,42 +460,42 @@ _CONSUMERSPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='serdeClassName', full_name='proto.ConsumerSpec.serdeClassName', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='isRegexPattern', full_name='proto.ConsumerSpec.isRegexPattern', index=2,
       number=3, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='receiverQueueSize', full_name='proto.ConsumerSpec.receiverQueueSize', index=3,
       number=4, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[_CONSUMERSPEC_RECEIVERQUEUESIZE, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=638,
-  serialized_end=822,
+  serialized_start=795,
+  serialized_end=979,
 )
 
 
@@ -462,28 +512,28 @@ _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.SourceSpec.TopicsToSerDeClassNameEntry.value', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  serialized_options=_b('8\001'),
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1205,
-  serialized_end=1266,
+  serialized_start=1362,
+  serialized_end=1423,
 )
 
 _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
@@ -499,28 +549,28 @@ _SOURCESPEC_INPUTSPECSENTRY = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.SourceSpec.InputSpecsEntry.value', index=1,
       number=2, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  serialized_options=_b('8\001'),
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1268,
-  serialized_end=1338,
+  serialized_start=1425,
+  serialized_end=1495,
 )
 
 _SOURCESPEC = _descriptor.Descriptor(
@@ -536,91 +586,91 @@ _SOURCESPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SourceSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SourceSpec.typeClassName', index=2,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='subscriptionType', full_name='proto.SourceSpec.subscriptionType', index=3,
       number=3, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='topicsToSerDeClassName', full_name='proto.SourceSpec.topicsToSerDeClassName', index=4,
       number=4, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001')), file=DESCRIPTOR),
+      serialized_options=_b('\030\001'), file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='inputSpecs', full_name='proto.SourceSpec.inputSpecs', index=5,
       number=10, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='timeoutMs', full_name='proto.SourceSpec.timeoutMs', index=6,
       number=6, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='topicsPattern', full_name='proto.SourceSpec.topicsPattern', index=7,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=_descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001')), file=DESCRIPTOR),
+      serialized_options=_b('\030\001'), file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SourceSpec.builtin', index=8,
       number=8, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='subscriptionName', full_name='proto.SourceSpec.subscriptionName', index=9,
       number=9, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='cleanupSubscription', full_name='proto.SourceSpec.cleanupSubscription', index=10,
       number=11, type=8, cpp_type=7, label=1,
       has_default_value=False, default_value=False,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY, _SOURCESPEC_INPUTSPECSENTRY, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=825,
-  serialized_end=1338,
+  serialized_start=982,
+  serialized_end=1495,
 )
 
 
@@ -637,63 +687,63 @@ _SINKSPEC = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='configs', full_name='proto.SinkSpec.configs', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='typeClassName', full_name='proto.SinkSpec.typeClassName', index=2,
       number=5, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='topic', full_name='proto.SinkSpec.topic', index=3,
       number=3, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='serDeClassName', full_name='proto.SinkSpec.serDeClassName', index=4,
       number=4, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='builtin', full_name='proto.SinkSpec.builtin', index=5,
       number=6, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='schemaType', full_name='proto.SinkSpec.schemaType', index=6,
       number=7, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1341,
-  serialized_end=1486,
+  serialized_start=1498,
+  serialized_end=1643,
 )
 
 
@@ -710,28 +760,28 @@ _PACKAGELOCATIONMETADATA = _descriptor.Descriptor(
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='originalFileName', full_name='proto.PackageLocationMetaData.originalFileName', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1488,
-  serialized_end=1560,
+  serialized_start=1645,
+  serialized_end=1717,
 )
 
 
@@ -748,28 +798,28 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY = _descriptor.Descriptor(
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='value', full_name='proto.FunctionMetaData.InstanceStatesEntry.value', index=1,
       number=2, type=14, cpp_type=8, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
+  serialized_options=_b('8\001'),
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1795,
-  serialized_end=1870,
+  serialized_start=2013,
+  serialized_end=2088,
 )
 
 _FUNCTIONMETADATA = _descriptor.Descriptor(
@@ -785,49 +835,94 @@ _FUNCTIONMETADATA = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='packageLocation', full_name='proto.FunctionMetaData.packageLocation', index=1,
       number=2, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='version', full_name='proto.FunctionMetaData.version', index=2,
       number=3, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='createTime', full_name='proto.FunctionMetaData.createTime', index=3,
       number=4, type=4, cpp_type=4, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='instanceStates', full_name='proto.FunctionMetaData.instanceStates', index=4,
       number=5, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='functionAuthSpec', full_name='proto.FunctionMetaData.functionAuthSpec', index=5,
+      number=6, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[_FUNCTIONMETADATA_INSTANCESTATESENTRY, ],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1563,
-  serialized_end=1870,
+  serialized_start=1720,
+  serialized_end=2088,
+)
+
+
+_FUNCTIONAUTHENTICATIONSPEC = _descriptor.Descriptor(
+  name='FunctionAuthenticationSpec',
+  full_name='proto.FunctionAuthenticationSpec',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='data', full_name='proto.FunctionAuthenticationSpec.data', index=0,
+      number=1, type=12, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b(""),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='provider', full_name='proto.FunctionAuthenticationSpec.provider', index=1,
+      number=2, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=_b("").decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto3',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=2090,
+  serialized_end=2150,
 )
 
 
@@ -844,28 +939,28 @@ _INSTANCE = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='instanceId', full_name='proto.Instance.instanceId', index=1,
       number=2, type=5, cpp_type=1, label=1,
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1872,
-  serialized_end=1953,
+  serialized_start=2152,
+  serialized_end=2233,
 )
 
 
@@ -882,28 +977,28 @@ _ASSIGNMENT = _descriptor.Descriptor(
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='workerId', full_name='proto.Assignment.workerId', index=1,
       number=2, type=9, cpp_type=9, label=1,
       has_default_value=False, default_value=_b("").decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      options=None, file=DESCRIPTOR),
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
   nested_types=[],
   enum_types=[
   ],
-  options=None,
+  serialized_options=None,
   is_extendable=False,
   syntax='proto3',
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1955,
-  serialized_end=2020,
+  serialized_start=2235,
+  serialized_end=2300,
 )
 
 _FUNCTIONDETAILS.fields_by_name['processingGuarantees'].enum_type = _PROCESSINGGUARANTEES
@@ -912,7 +1007,9 @@ _FUNCTIONDETAILS.fields_by_name['source'].message_type = _SOURCESPEC
 _FUNCTIONDETAILS.fields_by_name['sink'].message_type = _SINKSPEC
 _FUNCTIONDETAILS.fields_by_name['resources'].message_type = _RESOURCES
 _FUNCTIONDETAILS.fields_by_name['retryDetails'].message_type = _RETRYDETAILS
+_FUNCTIONDETAILS.fields_by_name['componentType'].enum_type = _FUNCTIONDETAILS_COMPONENTTYPE
 _FUNCTIONDETAILS_RUNTIME.containing_type = _FUNCTIONDETAILS
+_FUNCTIONDETAILS_COMPONENTTYPE.containing_type = _FUNCTIONDETAILS
 _CONSUMERSPEC_RECEIVERQUEUESIZE.containing_type = _CONSUMERSPEC
 _CONSUMERSPEC.fields_by_name['receiverQueueSize'].message_type = _CONSUMERSPEC_RECEIVERQUEUESIZE
 _SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.containing_type = _SOURCESPEC
@@ -926,6 +1023,7 @@ _FUNCTIONMETADATA_INSTANCESTATESENTRY.containing_type = _FUNCTIONMETADATA
 _FUNCTIONMETADATA.fields_by_name['functionDetails'].message_type = _FUNCTIONDETAILS
 _FUNCTIONMETADATA.fields_by_name['packageLocation'].message_type = _PACKAGELOCATIONMETADATA
 _FUNCTIONMETADATA.fields_by_name['instanceStates'].message_type = _FUNCTIONMETADATA_INSTANCESTATESENTRY
+_FUNCTIONMETADATA.fields_by_name['functionAuthSpec'].message_type = _FUNCTIONAUTHENTICATIONSPEC
 _INSTANCE.fields_by_name['functionMetaData'].message_type = _FUNCTIONMETADATA
 _ASSIGNMENT.fields_by_name['instance'].message_type = _INSTANCE
 DESCRIPTOR.message_types_by_name['Resources'] = _RESOURCES
@@ -936,6 +1034,7 @@ DESCRIPTOR.message_types_by_name['SourceSpec'] = _SOURCESPEC
 DESCRIPTOR.message_types_by_name['SinkSpec'] = _SINKSPEC
 DESCRIPTOR.message_types_by_name['PackageLocationMetaData'] = _PACKAGELOCATIONMETADATA
 DESCRIPTOR.message_types_by_name['FunctionMetaData'] = _FUNCTIONMETADATA
+DESCRIPTOR.message_types_by_name['FunctionAuthenticationSpec'] = _FUNCTIONAUTHENTICATIONSPEC
 DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
 DESCRIPTOR.message_types_by_name['Assignment'] = _ASSIGNMENT
 DESCRIPTOR.enum_types_by_name['ProcessingGuarantees'] = _PROCESSINGGUARANTEES
@@ -1031,6 +1130,13 @@ FunctionMetaData = _reflection.GeneratedProtocolMessageType('FunctionMetaData',
 _sym_db.RegisterMessage(FunctionMetaData)
 _sym_db.RegisterMessage(FunctionMetaData.InstanceStatesEntry)
 
+FunctionAuthenticationSpec = _reflection.GeneratedProtocolMessageType('FunctionAuthenticationSpec', (_message.Message,), dict(
+  DESCRIPTOR = _FUNCTIONAUTHENTICATIONSPEC,
+  __module__ = 'Function_pb2'
+  # @@protoc_insertion_point(class_scope:proto.FunctionAuthenticationSpec)
+  ))
+_sym_db.RegisterMessage(FunctionAuthenticationSpec)
+
 Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
   DESCRIPTOR = _INSTANCE,
   __module__ = 'Function_pb2'
@@ -1046,16 +1152,10 @@ Assignment = _reflection.GeneratedProtocolMessageType('Assignment', (_message.Me
 _sym_db.RegisterMessage(Assignment)
 
 
-DESCRIPTOR.has_options = True
-DESCRIPTOR._options = _descriptor._ParseOptions(descriptor_pb2.FileOptions(), _b('\n!org.apache.pulsar.functions.protoB\010Function'))
-_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY.has_options = True
-_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SOURCESPEC_INPUTSPECSENTRY.has_options = True
-_SOURCESPEC_INPUTSPECSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
-_SOURCESPEC.fields_by_name['topicsToSerDeClassName'].has_options = True
-_SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
-_SOURCESPEC.fields_by_name['topicsPattern'].has_options = True
-_SOURCESPEC.fields_by_name['topicsPattern']._options = _descriptor._ParseOptions(descriptor_pb2.FieldOptions(), _b('\030\001'))
-_FUNCTIONMETADATA_INSTANCESTATESENTRY.has_options = True
-_FUNCTIONMETADATA_INSTANCESTATESENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
+DESCRIPTOR._options = None
+_SOURCESPEC_TOPICSTOSERDECLASSNAMEENTRY._options = None
+_SOURCESPEC_INPUTSPECSENTRY._options = None
+_SOURCESPEC.fields_by_name['topicsToSerDeClassName']._options = None
+_SOURCESPEC.fields_by_name['topicsPattern']._options = None
+_FUNCTIONMETADATA_INSTANCESTATESENTRY._options = None
 # @@protoc_insertion_point(module_scope)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 7eb7aae..43a5649 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -31,7 +31,6 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.mockito.Matchers;
 import org.slf4j.Logger;
 import org.testng.annotations.BeforeMethod;
@@ -42,11 +41,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -89,7 +86,7 @@ public class ContextImplTest {
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
-                ComponentType.FUNCTION, null);
+                FunctionDetails.ComponentType.FUNCTION, null);
         context.setCurrentMessageContext(new Record<String>() {
             @Override
             public String getValue() {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java
new file mode 100644
index 0000000..698df6c
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/InstanceUtilsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+public class InstanceUtilsTest {
+
+    /**
+     * Checks that calculateSubjectType returns SOURCE for a source spec with no input topics, and that an explicitly set componentType overrides that result.
+     */
+    @Test
+    public void testCalculateSubjectTypeForSource() {
+        FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+        // a source spec with no input topics is expected to resolve to SOURCE
+        builder.setSource(Function.SourceSpec.newBuilder().build());
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+        // an explicitly set componentType must take precedence over the type derived from the specs
+        builder.setComponentType(FunctionDetails.ComponentType.SINK);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+        builder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+    }
+
+    /**
+     * Checks that calculateSubjectType returns FUNCTION for an input topic with no sink class name, and that an explicitly set componentType overrides that result.
+     */
+    @Test
+    public void testCalculateSubjectTypeForFunction() {
+        FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+        // an input topic without a sink class name is expected to resolve to FUNCTION
+        builder.setSource(Function.SourceSpec.newBuilder().putInputSpecs("topic", Function.ConsumerSpec.newBuilder().build()).build());
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+        // an explicitly set componentType must take precedence over the type derived from the specs
+        builder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+        builder.setComponentType(FunctionDetails.ComponentType.SINK);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+    }
+
+    /**
+     * Checks that calculateSubjectType returns SINK for an input topic plus a sink class name, and that an explicitly set componentType overrides that result.
+     */
+    @Test
+    public void testCalculateSubjectTypeForSink() {
+        FunctionDetails.Builder builder = FunctionDetails.newBuilder();
+        // an input topic together with a sink class name is expected to resolve to SINK
+        builder.setSource(Function.SourceSpec.newBuilder().putInputSpecs("topic", Function.ConsumerSpec.newBuilder().build()).build());
+        builder.setSink(Function.SinkSpec.newBuilder().setClassName("something").build());
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SINK);
+        // an explicitly set componentType must take precedence over the type derived from the specs
+        builder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.SOURCE);
+        builder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+        assertEquals(InstanceUtils.calculateSubjectType(builder.build()), FunctionDetails.ComponentType.FUNCTION);
+    }
+}
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index c435c09..34b6c90 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -51,6 +51,12 @@ message FunctionDetails {
         PYTHON = 1;
         GO = 3;
     }
+    enum ComponentType {
+        UNKNOWN = 0;
+        FUNCTION = 1;
+        SOURCE = 2;
+        SINK = 3;
+    }
     string tenant = 1;
     string namespace = 2;
     string name = 3;
@@ -68,6 +74,7 @@ message FunctionDetails {
     string packageUrl = 14; //present only if function submitted with package-url
     RetryDetails retryDetails = 15;
     string runtimeFlags = 17;
+    ComponentType componentType = 18;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 3b10092..4c1e3d9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -62,7 +62,6 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.io.IOException;
@@ -853,7 +852,7 @@ public class KubernetesRuntime implements Runtime {
 
     private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
         final Map<String, String> labels = new HashMap<>();
-        ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
+        Function.FunctionDetails.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
         String component;
         switch (componentType) {
             case FUNCTION:
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
similarity index 63%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
index a99edc2..240eb63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentTypeUtils.java
@@ -16,21 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils;
-
-public enum ComponentType {
-    FUNCTION("Function"),
-    SOURCE("Source"),
-    SINK("Sink");
 
-    private final String componentName;
+package org.apache.pulsar.functions.utils;
 
-    ComponentType(String componentName) {
-        this.componentName = componentName;
-    }
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
-    @Override
-    public String toString() {
-        return componentName;
+public class ComponentTypeUtils {
+    public static String toString(FunctionDetails.ComponentType componentType) {
+        switch (componentType) {
+            case FUNCTION:
+                return "Function";
+            case SOURCE:
+                return "Source";
+            case SINK:
+                return "Sink";
+            default:
+                throw new RuntimeException("Unknown componentType " + componentType);
+        }
     }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index a9e5a00..e207e1e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -227,6 +227,8 @@ public class FunctionConfigUtils {
             functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
         }
 
+        functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.FUNCTION);
+
         return functionDetailsBuilder.build();
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 65cf6f5..40e96b2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -203,6 +203,8 @@ public class SinkConfigUtils {
             functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
         }
 
+        functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SINK);
+
         return functionDetailsBuilder.build();
     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 01ea03a..90d63df 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -142,6 +142,8 @@ public class SourceConfigUtils {
             functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
         }
 
+        functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SOURCE);
+
         return functionDetailsBuilder.build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 7065a49..d9804e5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -66,11 +66,11 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -113,9 +113,6 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
 import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -126,9 +123,9 @@ public abstract class ComponentImpl {
 
     private final AtomicReference<StorageClient> storageClient = new AtomicReference<>();
     protected final Supplier<WorkerService> workerServiceSupplier;
-    protected final ComponentType componentType;
+    protected final Function.FunctionDetails.ComponentType componentType;
 
-    public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, ComponentType componentType) {
+    public ComponentImpl(Supplier<WorkerService> workerServiceSupplier, Function.FunctionDetails.ComponentType componentType) {
         this.workerServiceSupplier = workerServiceSupplier;
         this.componentType = componentType;
     }
@@ -307,13 +304,13 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
         }
         if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
+            throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
         }
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -337,7 +334,7 @@ public abstract class ComponentImpl {
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
             log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
-                    componentName, clientRole, componentType);
+                    componentName, clientRole, ComponentTypeUtils.toString(componentType));
             throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
         } catch (PulsarAdminException.NotFoundException e) {
             log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
@@ -350,8 +347,8 @@ public abstract class ComponentImpl {
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", componentType, componentName));
+            log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionDetails functionDetails = null;
@@ -369,7 +366,7 @@ public abstract class ComponentImpl {
                     try {
                         componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
                     } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
                     }
                     functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                             componentConfigJson, componentType, componentPackageFile);
@@ -380,19 +377,19 @@ public abstract class ComponentImpl {
                     functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                             componentConfigJson, componentType, componentPackageFile);
                     if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(componentType + " Package is not provided");
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
                     }
                 }
             } catch (Exception e) {
-                log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+                log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
                 throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
 
             try {
                 worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
             } catch (Exception e) {
-                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
-                throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
+                log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+                throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
             }
 
             // function state
@@ -418,10 +415,10 @@ public abstract class ComponentImpl {
                                             .build());
                         }
                     } catch (Exception e) {
-                        log.error("Error caching authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
+                        log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
 
 
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
                     }
                 }
             }
@@ -431,7 +428,7 @@ public abstract class ComponentImpl {
                 packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
                         functionPkgUrl, fileDetail, componentPackageFile);
             } catch (Exception e) {
-                log.error("Failed process {} {}/{}/{} package: ", componentType, tenant, namespace, componentName, e);
+                log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
                 throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
             }
 
@@ -461,7 +458,7 @@ public abstract class ComponentImpl {
             // For externally managed schedulers, the pkgUrl/builtin stuff should be copied to bk
             if (isBuiltin) {
                 File sinkOrSource;
-                if (componentType.equals(SOURCE)) {
+                if (componentType == FunctionDetails.ComponentType.SOURCE) {
                     String archiveName = functionDetails.getSource().getBuiltin();
                     sinkOrSource = worker().getConnectorsManager().getSourceArchive(archiveName).toFile();
                 } else {
@@ -471,13 +468,13 @@ public abstract class ComponentImpl {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         sinkOrSource.getName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
-                log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                 WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
             } else if (isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         uploadedInputStreamAsFile.getName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
-                log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                 WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)
                     || functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) {
@@ -485,13 +482,13 @@ public abstract class ComponentImpl {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         fileName));
                 packageLocationMetaDataBuilder.setOriginalFileName(fileName);
-                log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                 WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             } else {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         fileDetail.getFileName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
-                log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                 WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             }
         } else {
@@ -506,7 +503,7 @@ public abstract class ComponentImpl {
             } else {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
-                log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
+                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                 WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             }
         }
@@ -536,13 +533,13 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
         }
         if (componentName == null) {
-            throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
+            throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
         }
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
 
             }
@@ -554,7 +551,7 @@ public abstract class ComponentImpl {
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", componentType, componentName));
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         String mergedComponentConfigJson;
@@ -563,11 +560,11 @@ public abstract class ComponentImpl {
         FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
 
         if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        if (componentType.equals(FUNCTION)) {
+        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
             FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
@@ -582,7 +579,7 @@ public abstract class ComponentImpl {
             } catch (Exception e) {
                 throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
-        } else if (componentType.equals(SOURCE)) {
+        } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
             SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
@@ -627,7 +624,7 @@ public abstract class ComponentImpl {
                     try {
                         componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
                     } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
                     }
                     functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                             mergedComponentConfigJson, componentType, componentPackageFile);
@@ -637,7 +634,7 @@ public abstract class ComponentImpl {
                     try {
                         componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
                     } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), componentType, functionPkgUrl));
+                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
                     }
                     functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                             mergedComponentConfigJson, componentType, componentPackageFile);
@@ -651,7 +648,7 @@ public abstract class ComponentImpl {
                     functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                             mergedComponentConfigJson, componentType, componentPackageFile);
                     if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(componentType + " Package is not provided");
+                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
                     }
                 } else {
 
@@ -664,16 +661,16 @@ public abstract class ComponentImpl {
                             mergedComponentConfigJson, componentType, componentPackageFile);
                 }
             } catch (Exception e) {
-                log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+                log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
                 throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
 
             try {
                 worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
             } catch (Exception e) {
-                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
+                log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
                 throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
-                        componentType, componentName, e.getMessage()));
+                        ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
             }
 
             // merge from existing metadata
@@ -707,8 +704,8 @@ public abstract class ComponentImpl {
                             functionMetaDataBuilder.clearFunctionAuthSpec();
                         }
                     } catch (Exception e) {
-                        log.error("Error updating authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
-                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+                        log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
+                        throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
                     }
                 }
             }
@@ -719,7 +716,7 @@ public abstract class ComponentImpl {
                     packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
                             functionPkgUrl, fileDetail, componentPackageFile);
                 } catch (Exception e) {
-                    log.error("Failed process {} {}/{}/{} package: ", componentType, tenant, namespace, componentName, e);
+                    log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
                     throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
                 }
             } else {
@@ -750,7 +747,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -776,20 +773,20 @@ public abstract class ComponentImpl {
         try {
             validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid deregister {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} to deregister does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
 
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
@@ -803,11 +800,11 @@ public abstract class ComponentImpl {
             }
         } catch (ExecutionException e) {
             log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
-                    componentType, tenant, namespace, componentName, e);
+                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
         } catch (InterruptedException e) {
             log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
-                    componentType, tenant, namespace, componentName, e);
+                    ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
         }
     }
@@ -825,7 +822,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to get {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -837,19 +834,19 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
@@ -891,7 +888,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -903,20 +900,20 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
@@ -930,7 +927,7 @@ public abstract class ComponentImpl {
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to start/stop {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
+            log.error("Failed to start/stop {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
@@ -949,7 +946,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -961,20 +958,20 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -984,7 +981,7 @@ public abstract class ComponentImpl {
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
+            log.error("Failed to restart {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
@@ -1019,7 +1016,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1031,20 +1028,20 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid start/stop {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
@@ -1057,7 +1054,7 @@ public abstract class ComponentImpl {
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to start/stop {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Failed to start/stop {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
@@ -1074,7 +1071,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1086,20 +1083,20 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1108,7 +1105,7 @@ public abstract class ComponentImpl {
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Failed to restart {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
@@ -1126,7 +1123,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1138,20 +1135,20 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1182,7 +1179,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1194,26 +1191,26 @@ public abstract class ComponentImpl {
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Stats request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
 
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
 
         }
         int instanceIdInt = Integer.parseInt(instanceId);
         if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
+            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), componentName, instanceId));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
@@ -1242,7 +1239,7 @@ public abstract class ComponentImpl {
 
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
-                log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, componentType);
+                log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1254,7 +1251,7 @@ public abstract class ComponentImpl {
         try {
             validateListFunctionRequestParams(tenant, namespace);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
+            log.error("Invalid list {} request @ /{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
@@ -1315,7 +1312,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to trigger {}", tenant, namespace,
-                        functionName, clientRole, componentType);
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1430,7 +1427,7 @@ public abstract class ComponentImpl {
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to get state for {}", tenant, namespace,
-                        functionName, clientRole, componentType);
+                        functionName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1549,7 +1546,7 @@ public abstract class ComponentImpl {
     protected void validateGetFunctionInstanceRequestParams(final String tenant,
                                                             final String namespace,
                                                             final String componentName,
-                                                            final ComponentType componentType,
+                                                            final FunctionDetails.ComponentType componentType,
                                                             final String instanceId) throws IllegalArgumentException {
         validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         if (instanceId == null) {
@@ -1557,7 +1554,7 @@ public abstract class ComponentImpl {
         }
     }
 
-    protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, ComponentType componentType)
+    protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, FunctionDetails.ComponentType componentType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1567,11 +1564,11 @@ public abstract class ComponentImpl {
             throw new IllegalArgumentException("Namespace is not provided");
         }
         if (subject == null) {
-            throw new IllegalArgumentException(componentType + " Name is not provided");
+            throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Name is not provided");
         }
     }
 
-    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, ComponentType componentType)
+    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, FunctionDetails.ComponentType componentType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -1581,7 +1578,7 @@ public abstract class ComponentImpl {
             throw new IllegalArgumentException("Namespace is not provided");
         }
         if (subject == null) {
-            throw new IllegalArgumentException(componentType + " Name is not provided");
+            throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Name is not provided");
         }
     }
 
@@ -1627,7 +1624,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
                                                     final String namespace,
                                                     final String componentName,
                                                     final String componentConfigJson,
-                                                    final ComponentType componentType,
+                                                    final FunctionDetails.ComponentType componentType,
                                                     final File componentPackageFile) throws IOException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
@@ -1636,10 +1633,10 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
             throw new IllegalArgumentException("Namespace is not provided");
         }
         if (componentName == null) {
-            throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
+            throw new IllegalArgumentException(String.format("%s Name is not provided", ComponentTypeUtils.toString(componentType)));
         }
 
-        if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
+        if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) && !isEmpty(componentConfigJson)) {
             FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
             // The rest end points take precedence over whatever is there in functionconfig
             functionConfig.setTenant(tenant);
@@ -1649,7 +1646,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
             ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
-        if (componentType.equals(SOURCE)) {
+        if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
             Path archivePath = null;
             SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
             // The rest end points take precedence over whatever is there in sourceconfig
@@ -1671,7 +1668,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
             SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
             return SourceConfigUtils.convert(sourceConfig, sourceDetails);
         }
-        if (componentType.equals(SINK)) {
+        if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
             Path archivePath = null;
             SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
             // The rest end points take precedence over whatever is there in sinkConfig
@@ -1693,7 +1690,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
             SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
             return SinkConfigUtils.convert(sinkConfig, sinkDetails);
         } else {
-            throw new IllegalArgumentException("Unrecognized component type: " + componentType);
+            throw new IllegalArgumentException("Unrecognized component type: " + ComponentTypeUtils.toString(componentType));
         }
     }
 
@@ -1769,7 +1766,7 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
         try {
             if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized get status for {}", tenant, namespace,
-                        componentName, clientRole, componentType);
+                        componentName, clientRole, ComponentTypeUtils.toString(componentType));
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
@@ -1781,20 +1778,20 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
     }
 
@@ -1810,9 +1807,9 @@ private FunctionDetails validateUpdateRequestParams(final String tenant,
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         int parallelism = functionMetaData.getFunctionDetails().getParallelism();
         if (instanceId < 0 || instanceId >= parallelism) {
-            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
             throw new RestException(Status.BAD_REQUEST,
-                    String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
+                    String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), componentName, instanceId));
         }
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 93642f6..bf09fbf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,7 +25,6 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -197,7 +196,7 @@ public class FunctionsImpl extends ComponentImpl {
     }
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.FUNCTION);
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
     }
 
     /**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index c0cac01..e7b9ff1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -207,7 +207,7 @@ public class SinkImpl extends ComponentImpl {
     }
 
     public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SINK);
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
     }
 
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
@@ -270,19 +270,19 @@ public class SinkImpl extends ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index 43bb4e7..a2b59ef 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -27,7 +27,7 @@ import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -209,7 +209,7 @@ public class SourceImpl extends ComponentImpl {
     }
 
     public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, ComponentType.SOURCE);
+        super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
     }
 
     public SourceStatus getSourceStatus(final String tenant,
@@ -268,19 +268,19 @@ public class SourceImpl extends ComponentImpl {
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
             throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
-            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
+            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
         SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
         return config;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 78248e0..dcfa8f1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.functions.runtime.Runtime;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
@@ -62,9 +61,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
@@ -192,7 +189,7 @@ public class FunctionsImplTest {
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
 
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION);    }
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION);    }
 
     @Test
     public void testStatusEmpty() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 1790fe7..17b0234 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -183,7 +182,7 @@ public class FunctionApiV2ResourceTest {
         FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
 
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
 
         this.resource = spy(new FunctionsImplV2(functions));
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index ffbf552..696bd19 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -43,7 +43,6 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -78,9 +77,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -88,7 +84,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
@@ -183,7 +178,7 @@ public class FunctionApiV3ResourceTest {
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.FUNCTION);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
     }
 
     //
@@ -1406,9 +1401,9 @@ public class FunctionApiV3ResourceTest {
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
 
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
 
         List<String> functionList = listDefaultFunctions();
         assertEquals(functions, functionList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 827cabf..59ee08b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -178,7 +177,7 @@ public class SinkApiV3ResourceTest {
 
         this.resource = spy(new SinkImpl(() -> mockedWorkerService));
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.SINK);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK);
     }
 
     //
@@ -1336,9 +1335,9 @@ public class SinkApiV3ResourceTest {
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
 
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
 
         List<String> sinkList = listDefaultSinks();
         assertEquals(functions, sinkList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 93a3cea..c56066b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -173,7 +172,7 @@ public class SourceApiV3ResourceTest {
 
         this.resource = spy(new SourceImpl(() -> mockedWorkerService));
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(ComponentType.SOURCE);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE);
     }
 
     //
@@ -1345,9 +1344,9 @@ public class SourceApiV3ResourceTest {
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(ComponentType.SOURCE);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(ComponentType.FUNCTION);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(ComponentType.SINK);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f1.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SOURCE);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f2.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
+        PowerMockito.when(InstanceUtils.calculateSubjectType(f3.getFunctionDetails())).thenReturn(FunctionDetails.ComponentType.SINK);
 
         List<String> sourceList = listDefaultSources();
         assertEquals(functions, sourceList);