You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/01 16:52:42 UTC

[pulsar] branch master updated: Refactor and consolidate Utils classes (#3952)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f70043  Refactor and consolidate Utils classes (#3952)
6f70043 is described below

commit 6f70043183e2c3e7259ba515266a97caa711b321
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Apr 1 11:52:36 2019 -0500

    Refactor and consolidate Utils classes (#3952)
    
    * Consolidating utils classes
    
    * further refactoring
    
    * adding missing class
    
    * add license header
---
 .../org/apache/pulsar/broker/PulsarService.java    |   4 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    | 100 +++++++--------
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |   4 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |   4 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |   4 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |   3 +-
 .../pulsar/functions/instance/ContextImpl.java     |  11 +-
 .../pulsar/functions/instance/InstanceUtils.java   |  16 +--
 .../functions/instance/JavaInstanceRunnable.java   |  20 ++-
 .../apache/pulsar/functions/instance/Utils.java    |  57 ---------
 .../instance/stats/ComponentStatsManager.java      |   4 +-
 .../pulsar/functions/source/PulsarRecord.java      |   4 +-
 .../pulsar/functions/instance/ContextImplTest.java |   5 +-
 .../pulsar/functions/instance/UtilsTest.java       |  55 ---------
 .../auth/KubernetesSecretsTokenAuthProvider.java   |   4 +-
 .../functions/runtime/KubernetesRuntime.java       |  14 +--
 .../pulsar/functions/runtime/LocalRunner.java      |   4 +-
 .../pulsar/functions/runtime/ProcessRuntime.java   |   6 +-
 .../pulsar/functions/runtime/RuntimeSpawner.java   |   4 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     |   9 +-
 .../pulsar/functions/runtime/ThreadRuntime.java    |   4 +-
 .../functions/runtime/KubernetesRuntimeTest.java   |   4 +-
 .../functions/runtime/ProcessRuntimeTest.java      |   4 +-
 .../{package-info.java => ComponentType.java}      |  21 +++-
 .../utils/{Utils.java => FunctionCommon.java}      |  89 +++++++++++---
 .../functions/utils/FunctionConfigUtils.java       |  18 +--
 .../functions/utils/FunctionDetailsUtils.java      |  71 -----------
 .../apache/pulsar/functions/utils/Reflections.java |   6 +-
 .../pulsar/functions/utils/SinkConfigUtils.java    |  12 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  12 +-
 .../apache/pulsar/functions/utils/StateUtils.java  |  40 ------
 .../pulsar/functions/utils/ValidatorUtils.java     |   6 +-
 .../utils/functioncache/package-info.java          |  23 ----
 .../{UtilsTest.java => FunctionCommonTest.java}    |  52 +++++++-
 .../pulsar/functions/worker/FunctionActioner.java  |  42 +++++--
 .../functions/worker/FunctionRuntimeManager.java   |  25 ++--
 .../pulsar/functions/worker/MembershipManager.java |  13 +-
 .../pulsar/functions/worker/SchedulerManager.java  |   9 +-
 .../org/apache/pulsar/functions/worker/Worker.java |   4 +-
 .../pulsar/functions/worker/WorkerService.java     |  15 +--
 .../worker/{Utils.java => WorkerUtils.java}        |  48 +++-----
 .../functions/worker/rest/api/ComponentImpl.java   |  39 +++---
 .../functions/worker/rest/api/FunctionsImpl.java   |   4 +-
 .../functions/worker/rest/api/FunctionsImplV2.java |   7 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |   4 +-
 .../functions/worker/rest/api/SourceImpl.java      |   4 +-
 .../functions/worker/rest/api/WorkerImpl.java      |   9 +-
 .../worker/FunctionRuntimeManagerTest.java         |  10 +-
 .../functions/worker/SchedulerManagerTest.java     |  36 +++---
 .../apache/pulsar/functions/worker/UtilsTest.java  |  43 -------
 .../worker/request/ServiceRequestManagerTest.java  |   4 +-
 .../worker/rest/api/FunctionsImplTest.java         |   8 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  92 +++++++-------
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  94 +++++++--------
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 134 +++++++++++----------
 .../rest/api/v3/SourceApiV3ResourceTest.java       | 134 +++++++++++----------
 56 files changed, 639 insertions(+), 833 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 400d207..9cd6be8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -94,7 +94,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
@@ -1036,7 +1036,7 @@ public class PulsarService implements AutoCloseable {
             URI dlogURI;
             try {
                 // initializing dlog namespace for function worker
-                dlogURI = Utils.initializeDlogNamespace(
+                dlogURI = WorkerUtils.initializeDlogNamespace(
                         internalConf.getZookeeperServers(),
                         internalConf.getLedgersRootPath());
             } catch (IOException ioe) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index abd9684..cd14acd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.instance.InstanceUtils;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -504,63 +504,63 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
 
         int totalMsgs = 10;
@@ -587,63 +587,63 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_sink_sink_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_sink_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
 
 
@@ -714,63 +714,63 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_written_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_written_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_source_source_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_source_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_source_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
     }
 
@@ -870,77 +870,77 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, Double.NaN);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
 
 
@@ -1028,77 +1028,77 @@ public class PulsarFunctionE2ETest {
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_received_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_user_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_user_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_process_latency_ms");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_process_latency_ms_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_system_exceptions_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, 0.0);
         m = metrics.get("pulsar_function_last_invocation");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertTrue(m.value > 0.0);
         m = metrics.get("pulsar_function_processed_successfully_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
         m = metrics.get("pulsar_function_processed_successfully_total_1min");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
         assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
-        assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
         assertEquals(m.value, (double) totalMsgs);
 
         // delete functions
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index a37786f..aba83a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -226,7 +226,7 @@ public class PulsarFunctionTlsTest {
 
         File file = new File(jarFile);
         try {
-            Utils.loadJar(file);
+            FunctionCommon.loadJar(file);
         } catch (MalformedURLException e) {
             throw new RuntimeException("Failed to load user jar " + file, e);
         }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index a64c38c..4df3312 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -58,7 +58,7 @@ import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
@@ -70,7 +70,7 @@ import org.testng.annotations.Test;
  * Unit test of {@link CmdFunctions}.
  */
 @Slf4j
-@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class, Utils.class})
+@PrepareForTest({ CmdFunctions.class, Reflections.class, StorageClientBuilder.class, FunctionCommon.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 public class CmdFunctionsTest {
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2bb3877..5d8a303 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -120,7 +120,7 @@ public class TestCmdSinks {
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
         }
         JAR_FILE_PATH = file.getFile();
-        Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 77e26a5..ac2dab9 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -42,7 +42,6 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.*;
-import org.mockito.Mockito;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -101,7 +100,7 @@ public class TestCmdSources {
         mockStatic(CmdFunctions.class);
         PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
-        Thread.currentThread().setContextClassLoader(Utils.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SourceConfig getSourceConfig() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5400b41..bb70b41 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -44,8 +44,8 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -54,7 +54,6 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -98,11 +97,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
         userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
     }
-    private final Utils.ComponentType componentType;
+    private final ComponentType componentType;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
-                       Utils.ComponentType componentType, ComponentStatsManager statsManager) {
+                       ComponentType componentType, ComponentStatsManager statsManager) {
         this.config = config;
         this.logger = logger;
         this.publishProducers = new HashMap<>();
@@ -335,7 +334,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                         .sendTimeout(0, TimeUnit.SECONDS)
                         .topic(topicName)
                         .properties(InstanceUtils.getProperties(componentType,
-                                FunctionDetailsUtils.getFullyQualifiedName(
+                                FunctionCommon.getFullyQualifiedName(
                                         this.config.getFunctionDetails().getTenant(),
                                         this.config.getFunctionDetails().getNamespace(),
                                         this.config.getFunctionDetails().getName()),
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 9db47cf..5cae6a8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.functions.instance;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 
 import lombok.experimental.UtilityClass;
 
@@ -31,11 +31,11 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.Reflections;
 
 import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -89,7 +89,7 @@ public class InstanceUtils {
         }
     }
 
-    public Utils.ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
+    public ComponentType calculateSubjectType(Function.FunctionDetails functionDetails) {
         Function.SourceSpec sourceSpec = functionDetails.getSource();
         Function.SinkSpec sinkSpec = functionDetails.getSink();
         if (sourceSpec.getInputSpecsCount() == 0) {
@@ -109,7 +109,7 @@ public class InstanceUtils {
     }
 
     public static String getDefaultSubscriptionName(String tenant, String namespace, String name) {
-        return FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+        return FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
     }
 
     public static String getDefaultSubscriptionName(Function.FunctionDetails functionDetails) {
@@ -119,7 +119,7 @@ public class InstanceUtils {
                 functionDetails.getName());
     }
 
-    public static Map<String, String> getProperties(Utils.ComponentType componentType,
+    public static Map<String, String> getProperties(ComponentType componentType,
                                                     String fullyQualifiedName, int instanceId) {
         Map<String, String> properties = new HashMap<>();
         switch (componentType) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 8a60537..6b265ae 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -66,10 +66,9 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.StateUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
@@ -80,7 +79,6 @@ import java.io.FileNotFoundException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -131,7 +129,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private InstanceCache instanceCache;
 
-    private final Utils.ComponentType componentType;
+    private final ComponentType componentType;
 
     private final Map<String, String> properties;
 
@@ -156,13 +154,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 instanceConfig.getFunctionDetails().getName(),
                 String.valueOf(instanceConfig.getInstanceId()),
                 instanceConfig.getClusterName(),
-                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
+                FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())
         };
 
         this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
 
         this.properties = InstanceUtils.getProperties(this.componentType,
-                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+                FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
                 this.instanceConfig.getInstanceId());
 
         // Declare function local collector registry so that it will not clash with other function instances'
@@ -176,7 +174,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
      */
     JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
         // initialize the thread context
-        ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+        ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
         ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
         ThreadContext.put("instance", instanceConfig.getInstanceName());
 
@@ -276,7 +274,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 }
             }
         } catch (Throwable t) {
-            log.error("[{}] Uncaught exception in Java Instance", Utils.getFullyQualifiedInstanceId(
+            log.error("[{}] Uncaught exception in Java Instance", FunctionCommon.getFullyQualifiedInstanceId(
                     instanceConfig.getFunctionDetails().getTenant(),
                     instanceConfig.getFunctionDetails().getNamespace(),
                     instanceConfig.getFunctionDetails().getName(),
@@ -366,7 +364,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             return;
         }
 
-        String tableNs = StateUtils.getStateNamespace(
+        String tableNs = FunctionCommon.getStateNamespace(
             instanceConfig.getFunctionDetails().getTenant(),
             instanceConfig.getFunctionDetails().getNamespace()
         );
@@ -586,7 +584,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
                 !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
             logAppender = new LogAppender(client, instanceConfig.getFunctionDetails().getLogTopic(),
-                    FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+                    FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
             logAppender.start();
         }
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
deleted file mode 100644
index 7149bfe..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/Utils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
-
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * Utils used for instance.
- */
-@Slf4j
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class Utils {
-
-    public static final long getSequenceId(MessageId messageId) {
-        MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl)
-                ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
-                : messageId);
-        long ledgerId = msgId.getLedgerId();
-        long entryId = msgId.getEntryId();
-
-        // Combine ledger id and entry id to form offset
-        // Use less than 32 bits to represent entry id since it will get
-        // rolled over way before overflowing the max int range
-        long offset = (ledgerId << 28) | entryId;
-        return offset;
-    }
-
-    public static final MessageId getMessageId(long sequenceId) {
-        // Demultiplex ledgerId and entryId from offset
-        long ledgerId = sequenceId >>> 28;
-        long entryId = sequenceId & 0x0F_FF_FF_FFL;
-
-        return new MessageIdImpl(ledgerId, entryId, -1);
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
index 0c9b9af..081556c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java
@@ -23,7 +23,7 @@ import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.common.TextFormat;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
 
 import java.io.IOException;
 import java.io.StringWriter;
@@ -58,7 +58,7 @@ public abstract class ComponentStatsManager implements AutoCloseable {
     public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
                                   String[] metricsLabels,
                                   ScheduledExecutorService scheduledExecutorService,
-                                  Utils.ComponentType componentType) {
+                                  ComponentType componentType) {
         switch (componentType) {
             case FUNCTION:
                 return new FunctionStatsManager(collectorRegistry, metricsLabels, scheduledExecutorService);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index dc5a08a..c03ed9f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -29,7 +29,7 @@ import lombok.ToString;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.functions.instance.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 @Builder
 @Getter
@@ -66,7 +66,7 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> {
 
     @Override
     public Optional<Long> getRecordSequence() {
-        return Optional.of(Utils.getSequenceId(message.getMessageId()));
+        return Optional.of(FunctionCommon.getSequenceId(message.getMessageId()));
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index df243cf..a898e54 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -27,14 +27,13 @@ import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.mockito.Matchers;
 import org.slf4j.Logger;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -78,7 +77,7 @@ public class ContextImplTest {
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
-                Utils.ComponentType.FUNCTION, null);
+                ComponentType.FUNCTION, null);
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
deleted file mode 100644
index ba83c89..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/UtilsTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link Utils}.
- */
-public class UtilsTest {
-
-    @Test
-    public void testGetSequenceId() {
-        long lid = 12345L;
-        long eid = 34566L;
-        MessageIdImpl id = mock(MessageIdImpl.class);
-        when(id.getLedgerId()).thenReturn(lid);
-        when(id.getEntryId()).thenReturn(eid);
-
-        assertEquals((lid << 28) | eid, Utils.getSequenceId(id));
-    }
-
-    @Test
-    public void testGetMessageId() {
-        long lid = 12345L;
-        long eid = 34566L;
-        long sequenceId = (lid << 28) | eid;
-
-        MessageIdImpl id = (MessageIdImpl) Utils.getMessageId(sequenceId);
-        assertEquals(lid, id.getLedgerId());
-        assertEquals(eid, id.getEntryId());
-    }
-
-}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index ff2cd57..e1cd3e8 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.util.Collections;
 import java.util.Optional;
@@ -112,7 +112,7 @@ public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAut
 
     @Override
     public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+        String fqfn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
 
         String secretName = new String(functionAuthData.getData());
         Actions.Action deleteSecrets = Actions.Action.builder()
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index fb51b36..103980f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -62,8 +62,8 @@ import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -370,7 +370,7 @@ public class KubernetesRuntime implements Runtime {
         final V1Service service = createService();
         log.info("Submitting the following service to k8 {}", coreClient.getApiClient().getJSON().serialize(service));
 
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+        String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
 
         Actions.Action createService = Actions.Action.builder()
                 .actionName(String.format("Submitting service for function %s", fqfn))
@@ -447,7 +447,7 @@ public class KubernetesRuntime implements Runtime {
 
         log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
 
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+        String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
 
         Actions.Action createStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Submitting statefulset for function %s", fqfn))
@@ -495,7 +495,7 @@ public class KubernetesRuntime implements Runtime {
         options.setGracePeriodSeconds(5L);
         options.setPropagationPolicy("Foreground");
 
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+        String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
         Actions.Action deleteStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Deleting statefulset for function %s", fqfn))
                 .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
@@ -644,7 +644,7 @@ public class KubernetesRuntime implements Runtime {
         final V1DeleteOptions options = new V1DeleteOptions();
         options.setGracePeriodSeconds(0L);
         options.setPropagationPolicy("Foreground");
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
+        String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
         String serviceName = createJobName(instanceConfig.getFunctionDetails());
 
         Actions.Action deleteService = Actions.Action.builder()
@@ -846,7 +846,7 @@ public class KubernetesRuntime implements Runtime {
 
     private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
         final Map<String, String> labels = new HashMap<>();
-        Utils.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
+        ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
         String component;
         switch (componentType) {
             case FUNCTION:
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 717659e..98950e7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -47,7 +47,7 @@ import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.utils.io.Connectors;
 
 import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
-import static org.apache.pulsar.functions.utils.Utils.*;
+import static org.apache.pulsar.functions.utils.FunctionCommon.*;
 
 @Slf4j
 public class LocalRunner {
@@ -187,7 +187,7 @@ public class LocalRunner {
                 instanceConfig.setFunctionId(UUID.randomUUID().toString());
                 instanceConfig.setInstanceId(i + instanceIdOffset);
                 instanceConfig.setMaxBufferedTuples(1024);
-                instanceConfig.setPort(Utils.findAvailablePort());
+                instanceConfig.setPort(FunctionCommon.findAvailablePort());
                 instanceConfig.setClusterName("local");
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 734925a..c202ad9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -37,7 +37,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -85,7 +85,7 @@ class ProcessRuntime implements Runtime {
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
-        this.metricsPort = Utils.findAvailablePort();
+        this.metricsPort = FunctionCommon.findAvailablePort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig);
@@ -201,7 +201,7 @@ class ProcessRuntime implements Runtime {
             // forcibly kill after timeout
             if (process.isAlive()) {
                 log.warn("Process for instance {} did not exit within timeout. Forcibly killing process...",
-                        Utils.getFullyQualifiedInstanceId(
+                        FunctionCommon.getFullyQualifiedInstanceId(
                                 instanceConfig.getFunctionDetails().getTenant(),
                                 instanceConfig.getFunctionDetails().getNamespace(),
                                 instanceConfig.getFunctionDetails().getName(), instanceConfig.getInstanceId()));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index 5bb3ab1..206745a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
@@ -127,7 +127,7 @@ public class RuntimeSpawner implements AutoCloseable {
     public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
         return this.getFunctionStatus(instanceId).thenApply(msg -> {
             try {
-                return Utils.printJson(msg);
+                return FunctionCommon.printJson(msg);
             } catch (IOException e) {
                 throw new RuntimeException(
                         instanceConfig.getFunctionDetails().getName() + " Exception parsing getStatus", e);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index b866f46..8a5ab37 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -27,22 +27,17 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.function.Supplier;
 
-import lombok.Builder;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
@@ -247,7 +242,7 @@ public class RuntimeUtils {
         return String.format(
                 "%s/%s",
                 logDirectory,
-                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
+                FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
     }
 
     public static String getPrometheusMetrics(int metricsPort) throws IOException{
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 8aff81a..83e3782 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -31,9 +31,9 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 
 /**
  * A function container implemented using java thread.
@@ -104,7 +104,7 @@ class ThreadRuntime implements Runtime {
         log.info("ThreadContainer starting function with instance config {}", instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
                 String.format("%s-%s",
-                        FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+                        FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
                         instanceConfig.getInstanceId()));
         this.fnThread.start();
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index c7a9832..b1c3dfb 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -291,7 +291,7 @@ public class KubernetesRuntimeTest {
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + extraDepsEnv
                 + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
-                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
                 + " -Xmx" + String.valueOf(RESOURCES.getRam())
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 5460fe3..e2c0499 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
@@ -269,7 +269,7 @@ public class ProcessRuntimeTest {
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + extraDepsEnv
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
-                + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
                 + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
                 + " --jar " + userJarFile + " --instance_id "
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
similarity index 69%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
index 7cf7893..a99edc2 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/package-info.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ComponentType.java
@@ -16,8 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.functions.utils;
 
-/**
- * Helper classes.
- */
-package org.apache.pulsar.functions.utils;
\ No newline at end of file
+public enum ComponentType { // The three component kinds (function, source, sink), each carrying a display name.
+    FUNCTION("Function"),
+    SOURCE("Source"),
+    SINK("Sink");
+
+    private final String componentName; // Display name passed by the constant; this is what toString() returns.
+
+    ComponentType(String componentName) {
+        this.componentName = componentName;
+    }
+
+    @Override
+    public String toString() { // Returns the display name (e.g. "Function"), not the constant name (FUNCTION).
+        return componentName;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
similarity index 81%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
rename to pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 38002a1..2212734 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.functions.utils;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.ParameterizedType;
@@ -31,7 +33,11 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.UUID;
 
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
@@ -56,7 +62,7 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
  */
 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class Utils {
+public class FunctionCommon {
 
     public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
@@ -235,6 +241,16 @@ public class Utils {
         return null;
     }
 
+    public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
+        // Copies the content at destPkgUrl into targetFile; try-with-resources closes the remote channel and the file stream even if the copy fails.
+        try (ReadableByteChannel rbc = Channels.newChannel(new URL(destPkgUrl).openStream());
+             FileOutputStream fos = new FileOutputStream(targetFile)) {
+            log.info("Downloading function package from {} to {} ...", destPkgUrl, targetFile.getAbsoluteFile());
+            fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
+        }
+        log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
+    }
+
     /**
      * Load a jar.
      *
@@ -266,14 +282,8 @@ public class Utils {
             }
             return file;
         } else if (destPkgUrl.startsWith("http")) {
-            URL website = new URL(destPkgUrl);
             File tempFile = File.createTempFile("function", ".tmp");
-            ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-            log.info("Downloading function package from {} to {} ...", destPkgUrl, tempFile.getAbsoluteFile());
-            try (FileOutputStream fos = new FileOutputStream(tempFile)) {
-                fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-            }
-            log.info("Downloading function package from {} to {} completed!", destPkgUrl, tempFile.getAbsoluteFile());
+            downloadFromHttpUrl(destPkgUrl, tempFile);
             return tempFile;
         } else {
             throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
@@ -351,20 +361,61 @@ public class Utils {
         return String.format("%s/%s/%s:%d", tenant, namespace, functionName, instanceId);
     }
 
-    public enum ComponentType {
-        FUNCTION("Function"),
-        SOURCE("Source"),
-        SINK("Sink");
+    public static final long getSequenceId(MessageId messageId) {
+        MessageIdImpl msgId = (MessageIdImpl) ((messageId instanceof TopicMessageIdImpl) // use the wrapped id when given a TopicMessageIdImpl
+                ? ((TopicMessageIdImpl) messageId).getInnerMessageId()
+                : messageId);
+        long ledgerId = msgId.getLedgerId();
+        long entryId = msgId.getEntryId();
+
+        // Pack both ids into one long: the ledger id is shifted left by 28 bits and the entry id is OR-ed in below it.
+        // Only the low 28 bits are reserved for the entry id, on the expectation that a ledger gets rolled over
+        // long before it holds 2^28 entries; a larger entry id would overlap the ledger-id bits.
+        long offset = (ledgerId << 28) | entryId;
+        return offset;
+    }
+
+    public static final MessageId getMessageId(long sequenceId) {
+        // Inverse of getSequenceId: split the packed offset back into ledger id and entry id.
+        long ledgerId = sequenceId >>> 28; // bits above the low 28, via unsigned shift
+        long entryId = sequenceId & 0x0F_FF_FF_FFL; // low 28 bits
 
-        private final String componentName;
+        return new MessageIdImpl(ledgerId, entryId, -1); // third argument is always -1 (presumably the partition index; confirm against MessageIdImpl)
+    }
 
-        ComponentType(String componentName) {
-            this.componentName = componentName;
+    public static byte[] toByteArray(Object obj) throws IOException { // Serializes obj with ObjectOutputStream and returns the bytes.
+        byte[] bytes = null;
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(obj);
+            oos.flush(); // push any buffered data into bos before it is read back
+            bytes = bos.toByteArray();
         }
+        return bytes;
+    }
 
-        @Override
-        public String toString() {
-            return componentName;
-        }
+    public static String getUniquePackageName(String packageName) {
+        return String.format("%s-%s", UUID.randomUUID().toString(), packageName); // "<random UUID>-<packageName>"
+    }
+
+    /**
+     * Convert pulsar tenant and namespace to state storage namespace: "tenant_namespace" with every '-' replaced by '_'.
+     *
+     * @param tenant pulsar tenant
+     * @param namespace pulsar namespace
+     * @return state storage namespace
+     */
+    public static String getStateNamespace(String tenant, String namespace) {
+        return String.format("%s_%s", tenant, namespace)
+            .replace("-", "_");
+    }
+
+    public static String getFullyQualifiedName(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails) {
+        // Delegates to the (tenant, namespace, name) overload, which joins the three parts with '/'.
+        return getFullyQualifiedName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+    }
+
+    public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
+        return String.format("%s/%s/%s", tenant, namespace, functionName); // "tenant/namespace/functionName"
     }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 1a41eb5..aeb26c3 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -39,7 +39,7 @@ import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.functions.utils.Utils.loadJar;
+import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
 
 public class FunctionConfigUtils {
     public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader classLoader)
@@ -48,7 +48,7 @@ public class FunctionConfigUtils {
         Class<?>[] typeArgs = null;
         if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
             if (classLoader != null) {
-                typeArgs = Utils.getFunctionTypes(functionConfig, classLoader);
+                typeArgs = FunctionCommon.getFunctionTypes(functionConfig, classLoader);
             }
         }
 
@@ -159,11 +159,11 @@ public class FunctionConfigUtils {
             functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
         }
         if (functionConfig.getRuntime() != null) {
-            functionDetailsBuilder.setRuntime(Utils.convertRuntime(functionConfig.getRuntime()));
+            functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
         }
         if (functionConfig.getProcessingGuarantees() != null) {
             functionDetailsBuilder.setProcessingGuarantees(
-                    Utils.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
+                    FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
         }
 
         if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
@@ -234,7 +234,7 @@ public class FunctionConfigUtils {
         functionConfig.setNamespace(functionDetails.getNamespace());
         functionConfig.setName(functionDetails.getName());
         functionConfig.setParallelism(functionDetails.getParallelism());
-        functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
         for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
             ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -278,8 +278,8 @@ public class FunctionConfigUtils {
         if (!isEmpty(functionDetails.getLogTopic())) {
             functionConfig.setLogTopic(functionDetails.getLogTopic());
         }
-        functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime()));
-        functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));
+        functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         if (functionDetails.hasRetryDetails()) {
             functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
             if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
@@ -355,7 +355,7 @@ public class FunctionConfigUtils {
     }
 
     private static void doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
-        Class<?>[] typeArgs = Utils.getFunctionTypes(functionConfig, clsLoader);
+        Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionConfig, clsLoader);
         // inputs use default schema, so there is no check needed there
 
         // Check if the Input serialization/deserialization class exists in jar or already loaded and that it
@@ -566,7 +566,7 @@ public class FunctionConfigUtils {
             ClassLoader classLoader = null;
             if (org.apache.commons.lang3.StringUtils.isNotBlank(functionPkgUrl)) {
                 try {
-                    classLoader = Utils.extractClassLoader(functionPkgUrl);
+                    classLoader = FunctionCommon.extractClassLoader(functionPkgUrl);
                 } catch (Exception e) {
                     throw new IllegalArgumentException("Corrupted Jar File", e);
                 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
deleted file mode 100644
index a9787ba..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.utils;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-public class FunctionDetailsUtils {
-
-    public static String getFullyQualifiedName(FunctionDetails FunctionDetails) {
-        return getFullyQualifiedName(FunctionDetails.getTenant(), FunctionDetails.getNamespace(), FunctionDetails.getName());
-    }
-
-    public static String getFullyQualifiedName(String tenant, String namespace, String functionName) {
-        return String.format("%s/%s/%s", tenant, namespace, functionName);
-    }
-
-    public static String extractTenantFromFQN(String fullyQualifiedName) {
-        return fullyQualifiedName.split("/")[0];
-    }
-
-    public static String extractNamespaceFromFQN(String fullyQualifiedName) {
-        return fullyQualifiedName.split("/")[1];
-    }
-
-    public static String extractFunctionNameFromFQN(String fullyQualifiedName) {
-        return fullyQualifiedName.split("/")[2];
-    }
-
-    public static String getDownloadFileName(FunctionDetails FunctionDetails,
-                                             Function.PackageLocationMetaData packageLocation) {
-        if (!StringUtils.isEmpty(packageLocation.getOriginalFileName())) {
-            return packageLocation.getOriginalFileName();
-        }
-        String[] hierarchy = FunctionDetails.getClassName().split("\\.");
-        String fileName;
-        if (hierarchy.length <= 0) {
-            fileName = FunctionDetails.getClassName();
-        } else if (hierarchy.length == 1) {
-            fileName =  hierarchy[0];
-        } else {
-            fileName = hierarchy[hierarchy.length - 2];
-        }
-        switch (FunctionDetails.getRuntime()) {
-            case JAVA:
-                return fileName + ".jar";
-            case PYTHON:
-                return fileName + ".py";
-            default:
-                throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
-        }
-    }
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 7ba22aa..1d89738 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -165,7 +165,7 @@ public class Reflections {
 
     public static Object createInstance(String userClassName, java.io.File jar) {
         try {
-            return createInstance(userClassName, Utils.loadJar(jar));
+            return createInstance(userClassName, FunctionCommon.loadJar(jar));
         } catch (Exception ex) {
             return null;
         }
@@ -180,7 +180,7 @@ public class Reflections {
      */
     public static boolean classExistsInJar(java.io.File jar, String fqcn) {
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
             Class.forName(fqcn, false, loader);
             loader.close();
             return true;
@@ -214,7 +214,7 @@ public class Reflections {
     public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
         boolean ret = false;
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) Utils.loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
             if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
                 ret = true;
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 81554c4..72a9e6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -44,8 +44,8 @@ import java.util.*;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
 
 @Slf4j
 public class SinkConfigUtils {
@@ -204,7 +204,7 @@ public class SinkConfigUtils {
         sinkConfig.setNamespace(functionDetails.getNamespace());
         sinkConfig.setName(functionDetails.getName());
         sinkConfig.setParallelism(functionDetails.getParallelism());
-        sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        sinkConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
         for (Map.Entry<String, Function.ConsumerSpec> input : functionDetails.getSource().getInputSpecsMap().entrySet()) {
             ConsumerConfig consumerConfig = new ConsumerConfig();
@@ -306,11 +306,11 @@ public class SinkConfigUtils {
             ClassLoader jarClassLoader = null;
             ClassLoader narClassLoader = null;
             try {
-                jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+                jarClassLoader = FunctionCommon.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
             }
             try {
-                narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+                narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
             }
             if (jarClassLoader == null && narClassLoader == null) {
@@ -332,7 +332,7 @@ public class SinkConfigUtils {
         } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
             throw new IllegalArgumentException("Class-name must be present for archive with file-url");
         } else {
-            classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            classLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             if (classLoader == null) {
                 throw new IllegalArgumentException("Sink Package is not provided");
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 9a32085..9f71d63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -41,8 +41,8 @@ import java.nio.file.Path;
 import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 
 public class SourceConfigUtils {
 
@@ -146,7 +146,7 @@ public class SourceConfigUtils {
         sourceConfig.setNamespace(functionDetails.getNamespace());
         sourceConfig.setName(functionDetails.getName());
         sourceConfig.setParallelism(functionDetails.getParallelism());
-        sourceConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        sourceConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
         Function.SourceSpec sourceSpec = functionDetails.getSource();
         if (!StringUtils.isEmpty(sourceSpec.getClassName())) {
             sourceConfig.setClassName(sourceSpec.getClassName());
@@ -217,11 +217,11 @@ public class SourceConfigUtils {
             ClassLoader jarClassLoader = null;
             ClassLoader narClassLoader = null;
             try {
-                jarClassLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+                jarClassLoader = FunctionCommon.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
             }
             try {
-                narClassLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+                narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             } catch (Exception e) {
             }
             if (jarClassLoader == null && narClassLoader == null) {
@@ -243,7 +243,7 @@ public class SourceConfigUtils {
         } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
             throw new IllegalArgumentException("Class-name must be present for archive with file-url");
         } else {
-            classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            classLoader = FunctionCommon.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
             if (classLoader == null) {
                 throw new IllegalArgumentException("Source Package is not provided");
             }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java
deleted file mode 100644
index 8eabf3a..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/StateUtils.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.utils;
-
-/**
- * Utils for state store.
- */
-public final class StateUtils {
-
-    private StateUtils() {}
-
-    /**
-     * Convert pulsar tenant and namespace to state storage namespace.
-     *
-     * @param tenant pulsar tenant
-     * @param namespace pulsar namespace
-     * @return state storage namespace
-     */
-    public static String getStateNamespace(String tenant, String namespace) {
-        return String.format("%s_%s", tenant, namespace)
-            .replace("-", "_");
-    }
-
-}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 72051e5..93a6a72 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -35,7 +35,7 @@ public class ValidatorUtils {
             // If it's empty, we use the default schema and no need to validate
             // If it's built-in, no need to validate
         } else {
-            Utils.implementsClass(schemaType, Schema.class, clsLoader);
+            FunctionCommon.implementsClass(schemaType, Schema.class, clsLoader);
             validateSchemaType(schemaType, typeArg, clsLoader, input);
         }
     }
@@ -54,13 +54,13 @@ public class ValidatorUtils {
         if (isEmpty(inputSerializer)) return;
         if (inputSerializer.equals(DEFAULT_SERDE)) return;
         try {
-            Class<?> serdeClass = Utils.loadClass(inputSerializer, clsLoader);
+            Class<?> serdeClass = FunctionCommon.loadClass(inputSerializer, clsLoader);
         } catch (ClassNotFoundException e) {
             throw new IllegalArgumentException(
                     String.format("The input serialization/deserialization class %s does not exist",
                             inputSerializer));
         }
-        Utils.implementsClass(inputSerializer, SerDe.class, clsLoader);
+        FunctionCommon.implementsClass(inputSerializer, SerDe.class, clsLoader);
 
         SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
         if (serDe == null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java
deleted file mode 100644
index eef11b3..0000000
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A cache system for managing function library dependencies.
- */
-package org.apache.pulsar.functions.utils.functioncache;
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
similarity index 55%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
rename to pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
index 72dd26e..2fbb6c0 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/UtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionCommonTest.java
@@ -19,46 +19,86 @@
 
 package org.apache.pulsar.functions.utils;
 
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.io.File;
+import java.util.UUID;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
 /**
  * Unit test of {@link Exceptions}.
  */
-public class UtilsTest {
+public class FunctionCommonTest {
 
     @Test
     public void testValidateLocalFileUrl() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         try {
             // eg: fileLocation : /dir/fileName.jar (invalid)
-            Utils.extractClassLoader(fileLocation);
+            FunctionCommon.extractClassLoader(fileLocation);
             Assert.fail("should fail with invalid url: without protocol");
         } catch (IllegalArgumentException ie) {
             // Ok.. expected exception
         }
         String fileLocationWithProtocol = "file://" + fileLocation;
         // eg: fileLocation : file:///dir/fileName.jar (valid)
-        Utils.extractClassLoader(fileLocationWithProtocol);
+        FunctionCommon.extractClassLoader(fileLocationWithProtocol);
         // eg: fileLocation : file:/dir/fileName.jar (valid)
         fileLocationWithProtocol = "file:" + fileLocation;
-        Utils.extractClassLoader(fileLocationWithProtocol);
+        FunctionCommon.extractClassLoader(fileLocationWithProtocol);
     }
 
     @Test
     public void testValidateHttpFileUrl() throws Exception {
 
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
-        Utils.extractClassLoader(jarHttpUrl);
+        FunctionCommon.extractClassLoader(jarHttpUrl);
 
         jarHttpUrl = "http://_invalidurl_.com";
         try {
             // eg: fileLocation : /dir/fileName.jar (invalid)
-            Utils.extractClassLoader(jarHttpUrl);
+            FunctionCommon.extractClassLoader(jarHttpUrl);
             Assert.fail("should fail with invalid url: without protocol");
         } catch (Exception ie) {
             // Ok.. expected exception
         }
     }
+
+    @Test
+    public void testDownloadFile() throws Exception {
+        String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
+        String testDir = FunctionCommonTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); // code-source location of this test class, used as the download directory
+        File pkgFile = new File(testDir, UUID.randomUUID().toString()); // random name, presumably to avoid clashing with an existing file
+        FunctionCommon.downloadFromHttpUrl(jarHttpUrl, pkgFile);
+        Assert.assertTrue(pkgFile.exists()); // only existence is checked; the content is not inspected
+        pkgFile.delete(); // NOTE(review): not in a finally block, so the file is left behind if the assertion above fails
+    }
+
+    @Test
+    public void testGetSequenceId() {
+        long lid = 12345L; // ledger id
+        long eid = 34566L; // entry id
+        MessageIdImpl id = mock(MessageIdImpl.class); // mock that stubs only the two getters below
+        when(id.getLedgerId()).thenReturn(lid);
+        when(id.getEntryId()).thenReturn(eid);
+
+        assertEquals((lid << 28) | eid, FunctionCommon.getSequenceId(id)); // expected: ledger id shifted left 28 bits, OR-ed with the entry id
+    }
+
+    @Test
+    public void testGetMessageId() {
+        long lid = 12345L; // ledger id
+        long eid = 34566L; // entry id
+        long sequenceId = (lid << 28) | eid; // pack: ledger id shifted left 28 bits, OR-ed with the entry id
+
+        MessageIdImpl id = (MessageIdImpl) FunctionCommon.getMessageId(sequenceId); // the test relies on the result being a MessageIdImpl
+        assertEquals(lid, id.getLedgerId()); // unpacking must recover the ledger id
+        assertEquals(eid, id.getEntryId()); // unpacking must recover the entry id
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 246834e..b137555 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.File;
@@ -69,8 +69,8 @@ import static org.apache.pulsar.common.functions.Utils.FILE;
 import static org.apache.pulsar.common.functions.Utils.HTTP;
 import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.Utils.getSinkType;
-import static org.apache.pulsar.functions.utils.Utils.getSourceType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 
 @Data
 @Setter
@@ -97,6 +97,7 @@ public class FunctionActioner {
         this.pulsarAdmin = pulsarAdmin;
     }
 
+
     public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         try {
             FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
@@ -127,7 +128,7 @@ public class FunctionActioner {
                     pkgDir.mkdirs();
                     File pkgFile = new File(
                             pkgDir,
-                            new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
+                            new File(getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
                     downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
                     packageFile = pkgFile.getAbsolutePath();
                 }
@@ -175,7 +176,7 @@ public class FunctionActioner {
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(1024);
-        instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
+        instanceConfig.setPort(FunctionCommon.findAvailablePort());
         instanceConfig.setClusterName(clusterName);
         instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
         return instanceConfig;
@@ -208,9 +209,9 @@ public class FunctionActioner {
                 downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
 
         if(downloadFromHttp) {
-            Utils.downloadFromHttpUrl(pkgLocationPath, new FileOutputStream(tempPkgFile));
+            FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
         } else {
-            Utils.downloadFromBookkeeper(
+            WorkerUtils.downloadFromBookkeeper(
                     dlogNamespace,
                     new FileOutputStream(tempPkgFile),
                     pkgLocationPath);
@@ -249,7 +250,7 @@ public class FunctionActioner {
                         Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
             } catch (IOException e) {
                 log.warn("Failed to delete package for function: {}",
-                        FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
+                        FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
             }
         }
     }
@@ -270,7 +271,7 @@ public class FunctionActioner {
 
     public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
         FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
-        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
+        String fqfn = FunctionCommon.getFullyQualifiedName(details);
         log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
 
         if (functionRuntimeInfo.getRuntimeSpawner() != null) {
@@ -460,4 +461,27 @@ public class FunctionActioner {
         }
     }
 
+    /**
+     * Chooses the local file name for a function package: the original upload name when one is
+     * recorded, otherwise a class-name segment plus ".jar" (JAVA) or ".py" (PYTHON).
+     */
+    private static String getDownloadFileName(FunctionDetails functionDetails,
+                                             Function.PackageLocationMetaData packageLocation) {
+        if (!org.apache.commons.lang.StringUtils.isEmpty(packageLocation.getOriginalFileName())) {
+            return packageLocation.getOriginalFileName();
+        }
+        String className = functionDetails.getClassName();
+        String[] hierarchy = className.split("\\.");
+        // Dotted names use the second-to-last segment (as before); a single segment is used whole.
+        String fileName = hierarchy.length == 0 ? className
+                : hierarchy[Math.max(hierarchy.length - 2, 0)];
+        switch (functionDetails.getRuntime()) {
+            case JAVA:
+                return fileName + ".jar";
+            case PYTHON:
+                return fileName + ".py";
+            default:
+                throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 8c79f83..4618e88 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionInstanceId;
 import org.apache.pulsar.functions.utils.Reflections;
 
@@ -321,7 +322,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         final String workerId = this.workerConfig.getWorkerId();
 
         if (assignedWorkerId.equals(workerId)) {
-            stopFunction(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), true);
+            stopFunction(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
             return;
         } else {
             // query other worker
@@ -361,7 +362,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             Assignment assignment = assignments.iterator().next();
             final String assignedWorkerId = assignment.getWorkerId();
             final String workerId = this.workerConfig.getWorkerId();
-            String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
             if (assignedWorkerId.equals(workerId)) {
                 stopFunction(fullyQualifiedInstanceId, true);
             } else {
@@ -386,7 +387,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             for (Assignment assignment : assignments) {
                 final String assignedWorkerId = assignment.getWorkerId();
                 final String workerId = this.workerConfig.getWorkerId();
-                String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
                 if (assignedWorkerId.equals(workerId)) {
                     stopFunction(fullyQualifiedInstanceId, true);
                 } else {
@@ -424,7 +425,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
         Map<String, Assignment> assignments = workerIdToAssignments.get(workerId);
         if (assignments != null) {
             assignments.values().forEach(assignment -> {
-                String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
                 try {
                     stopFunction(fullyQualifiedInstanceId, false);
                 } catch (Exception e) {
@@ -478,10 +479,10 @@ public class FunctionRuntimeManager implements AutoCloseable{
         // If I am running worker
         if (assignedWorkerId.equals(workerId)) {
             FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
-                    org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                    FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
             RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
             if (runtimeSpawner != null) {
-                return Utils.getFunctionInstanceStats(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
+                return WorkerUtils.getFunctionInstanceStats(FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), functionRuntimeInfo, instanceId).getMetrics();
             }
             return new FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData();
         } else {
@@ -595,7 +596,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             existingAssignmentMap.putAll(entry);
         }
 
-        if (existingAssignmentMap.containsKey(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
+        if (existingAssignmentMap.containsKey(FunctionCommon.getFullyQualifiedInstanceId(newAssignment.getInstance()))) {
             updateAssignment(newAssignment);
         } else {
             addAssignment(newAssignment);
@@ -603,7 +604,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     private void updateAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
         Assignment existingAssignment = this.findAssignment(assignment);
         // potential updates need to happen
 
@@ -728,7 +729,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
 
     @VisibleForTesting
     void deleteAssignment(Assignment assignment) {
-        String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
         Map<String, Assignment> assignmentMap = this.workerIdToAssignments.get(assignment.getWorkerId());
         if (assignmentMap != null) {
             if (assignmentMap.containsKey(fullyQualifiedInstanceId)) {
@@ -751,7 +752,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
     }
 
     private void startFunctionInstance(Assignment assignment) {
-        String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+        String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
         FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId);
 
         if (functionRuntimeInfo == null) {
@@ -776,7 +777,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
      */
     private Assignment findAssignment(String tenant, String namespace, String functionName, int instanceId) {
         String fullyQualifiedInstanceId
-                = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
+                = FunctionCommon.getFullyQualifiedInstanceId(tenant, namespace, functionName, instanceId);
         for (Map.Entry<String, Map<String, Assignment>> entry : this.workerIdToAssignments.entrySet()) {
             Map<String, Assignment> assignmentMap = entry.getValue();
             Assignment existingAssignment = assignmentMap.get(fullyQualifiedInstanceId);
@@ -802,7 +803,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
             this.workerIdToAssignments.put(assignment.getWorkerId(), new HashMap<>());
         }
         this.workerIdToAssignments.get(assignment.getWorkerId()).put(
-                org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()),
+                FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()),
                 assignment);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index efe702f..f78513a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -46,7 +46,8 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+
 import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
 
 /**
@@ -179,7 +180,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
         List<Function.FunctionMetaData> functionMetaDataList = functionMetaDataManager.getAllFunctionMetaData();
         Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
         for (Function.FunctionMetaData entry : functionMetaDataList) {
-            functionMetaDataMap.put(FunctionDetailsUtils.getFullyQualifiedName(entry.getFunctionDetails()), entry);
+            functionMetaDataMap.put(FunctionCommon.getFullyQualifiedName(entry.getFunctionDetails()), entry);
         }
         Map<String, Map<String, Function.Assignment>> currentAssignments = functionRuntimeManager.getCurrentAssignments();
         Map<String, Function.Assignment> assignmentMap = new HashMap<>();
@@ -192,9 +193,9 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
         Iterator<Map.Entry<Function.Instance, Long>> it = unsignedFunctionDurations.entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<Function.Instance, Long> entry = it.next();
-            String fullyQualifiedFunctionName = FunctionDetailsUtils.getFullyQualifiedName(
+            String fullyQualifiedFunctionName = FunctionCommon.getFullyQualifiedName(
                     entry.getKey().getFunctionMetaData().getFunctionDetails());
-            String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(entry.getKey());
+            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(entry.getKey());
             //remove functions that don't exist anymore
             if (!functionMetaDataMap.containsKey(fullyQualifiedFunctionName)) {
                 it.remove();
@@ -262,7 +263,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
             if (currentTimeMs - unassignedDurationMs > this.workerConfig.getRescheduleTimeoutMs()) {
                 needSchedule.add(instance);
                 // remove assignment from failed node
-                Function.Assignment assignment = assignmentMap.get(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(instance));
+                Function.Assignment assignment = assignmentMap.get(FunctionCommon.getFullyQualifiedInstanceId(instance));
                 if (assignment != null) {
                     needRemove.add(assignment);
                 }
@@ -284,7 +285,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
 
     private PulsarAdmin getPulsarAdminClient() {
         if (this.pulsarAdminClient == null) {
-            this.pulsarAdminClient = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
+            this.pulsarAdminClient = WorkerUtils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl(),
                     workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                     workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index c281b15..2e25f0f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.common.base.Stopwatch;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -49,7 +46,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -256,7 +253,7 @@ public class SchedulerManager implements AutoCloseable {
 
     private void publishNewAssignment(Assignment assignment, boolean deleted) {
         try {
-            String fullyQualifiedInstanceId = org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+            String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
             // publish empty message with instance-id key so, compactor can delete and skip delivery of this instance-id
             // message
             producer.newMessage().key(fullyQualifiedInstanceId)
@@ -272,7 +269,7 @@ public class SchedulerManager implements AutoCloseable {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
             for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
-                functionInstances.put(Utils.getFullyQualifiedInstanceId(instance), instance);
+                functionInstances.put(FunctionCommon.getFullyQualifiedInstanceId(instance), instance);
             }
         }
         return functionInstances;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index db84462..b9814b3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -74,7 +74,7 @@ public class Worker {
     private static URI initialize(WorkerConfig workerConfig)
             throws InterruptedException, PulsarAdminException, IOException {
         // initializing pulsar functions namespace
-        PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+        PulsarAdmin admin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                 workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                 workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
         InternalConfigurationData internalConf;
@@ -141,7 +141,7 @@ public class Worker {
         // initialize the dlog namespace
         // TODO: move this as part of pulsar cluster initialization later
         try {
-            return Utils.initializeDlogNamespace(
+            return WorkerUtils.initializeDlogNamespace(
                     internalConf.getZookeeperServers(),
                     internalConf.getLedgersRootPath());
         } catch (IOException ioe) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index b035fb8..c3211b2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -24,7 +24,6 @@ import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -37,24 +36,16 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
-import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
-import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
-import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
-import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
-import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
-import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -101,14 +92,14 @@ public class WorkerService {
                       AuthorizationService authorizationService) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
 
-        this.brokerAdmin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
+        this.brokerAdmin = WorkerUtils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
                 workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                 workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
         
         final String functionWebServiceUrl = StringUtils.isNotBlank(workerConfig.getFunctionWebServiceUrl())
                 ? workerConfig.getFunctionWebServiceUrl()
                 : workerConfig.getWorkerWebAddress(); 
-        this.functionAdmin = Utils.getPulsarAdminClient(functionWebServiceUrl,
+        this.functionAdmin = WorkerUtils.getPulsarAdminClient(functionWebServiceUrl,
                 workerConfig.getClientAuthenticationPlugin(), workerConfig.getClientAuthenticationParameters(),
                 workerConfig.getTlsTrustCertsFilePath(), workerConfig.isTlsAllowInsecureConnection());
 
@@ -121,7 +112,7 @@ public class WorkerService {
 
         // create the dlog namespace for storing function packages
         this.dlogUri = dlogUri;
-        DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig);
+        DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
         try {
             this.dlogNamespace = NamespaceBuilder.newBuilder()
                     .conf(dlogConf)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
similarity index 91%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index 3c1fa4c..3332f5c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -18,19 +18,7 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.*;
-import java.net.URI;
-import java.net.URL;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
 import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.distributedlog.AppendOnlyStreamWriter;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.DistributedLogManager;
@@ -49,25 +37,23 @@ import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
 import org.apache.zookeeper.KeeperException.Code;
 
-@Slf4j
-public final class Utils {
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
-    private Utils(){}
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-    public static byte[] toByteArray(Object obj) throws IOException {
-        byte[] bytes = null;
-        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
-             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
-            oos.writeObject(obj);
-            oos.flush();
-            bytes = bos.toByteArray();
-        }
-        return bytes;
-    }
+@Slf4j
+public final class WorkerUtils {
 
-    public static String getUniquePackageName(String packageName) {
-        return String.format("%s-%s", UUID.randomUUID().toString(), packageName);
-    }
+    private WorkerUtils(){}
 
     public static void uploadFileToBookkeeper(String packagePath, File sourceFile, Namespace dlogNamespace) throws IOException {
         FileInputStream uploadedInputStream = new FileInputStream(sourceFile);
@@ -104,12 +90,6 @@ public final class Utils {
             }
         }
     }
-    
-    public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
-        URL website = new URL(destPkgUrl);
-        ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-        outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-    }
 
     public static void downloadFromBookkeeper(Namespace namespace,
                                               File outputFile,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index fd75da3..31070af 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -73,10 +73,10 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.join;
-import static org.apache.pulsar.functions.utils.Utils.*;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.FunctionCommon.*;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -108,14 +108,15 @@ import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.StateUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.RestException;
@@ -169,7 +170,7 @@ public abstract class ComponentImpl {
             // If I am running worker
             if (assignedWorkerId.equals(workerId)) {
                 FunctionRuntimeInfo functionRuntimeInfo = worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(
-                        org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
+                        FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()));
                 if (functionRuntimeInfo == null) {
                     return notRunning(assignedWorkerId, "");
                 }
@@ -446,20 +447,20 @@ public abstract class ComponentImpl {
                         sinkOrSource.getName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(sinkOrSource.getName());
                 log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
-                Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
+                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), sinkOrSource, worker().getDlogNamespace());
             } else if (isPkgUrlProvided) {
                 File file = extractFileFromPkg(functionPkgUrl);
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         file.getName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(file.getName());
                 log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
-                Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace());
+                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), file, worker().getDlogNamespace());
             } else {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                         fileDetail.getFileName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
                 log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
-                Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
+                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             }
         } else {
             // For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk
@@ -471,7 +472,7 @@ public abstract class ComponentImpl {
                 packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
                 log.info("Uploading {} package to {}", componentType, packageLocationMetaDataBuilder.getPackagePath());
-                Utils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
+                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
             }
         }
         return packageLocationMetaDataBuilder;
@@ -651,7 +652,7 @@ public abstract class ComponentImpl {
 
         // delete state table
         if (null != worker().getStateStoreAdminClient()) {
-            final String tableNs = StateUtils.getStateNamespace(tenant, namespace);
+            final String tableNs = getStateNamespace(tenant, namespace);
             final String tableName = componentName;
             try {
                 FutureUtils.result(worker().getStateStoreAdminClient().deleteStream(tableNs, tableName));
@@ -1339,7 +1340,7 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
-        String tableNs = StateUtils.getStateNamespace(tenant, namespace);
+        String tableNs = getStateNamespace(tenant, namespace);
         String tableName = functionName;
 
         String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
@@ -1389,7 +1390,7 @@ public abstract class ComponentImpl {
         // Upload to bookkeeper
         try {
             log.info("Uploading function package to {}", path);
-            Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
+            WorkerUtils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
         } catch (IOException e) {
             log.error("Error uploading file {}", path, e);
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
@@ -1414,7 +1415,7 @@ public abstract class ComponentImpl {
                         throw new IllegalArgumentException("invalid file url path: " + path);
                     }
                 } else {
-                    Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
+                    WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
                 }
             }
         };
@@ -1514,7 +1515,7 @@ public abstract class ComponentImpl {
                                                                             final ComponentType componentType) throws Exception {
         File tmpFile = File.createTempFile("functions", null);
         tmpFile.deleteOnExit();
-        Utils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
+        WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), tmpFile, packageLocationMetaData.getPackagePath());
         return validateUpdateRequestParams(tenant, namespace, componentName,
                 null, componentConfigJson, componentType, null, tmpFile);
     }
@@ -1659,7 +1660,7 @@ public abstract class ComponentImpl {
             return SinkConfigUtils.convert(sinkConfig, sinkDetails);
         }
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-        org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
+        FunctionCommon.mergeJson(functionDetailsJson, functionDetailsBuilder);
         if (isNotBlank(functionPkgUrl)) {
             // set package-url if present
             functionDetailsBuilder.setPackageUrl(functionPkgUrl);
@@ -1728,7 +1729,7 @@ public abstract class ComponentImpl {
 
         // validate function class-type
         Object functionObject = createInstance(functionDetailsBuilder.getClassName(), classLoader);
-        Class<?>[] typeArgs = org.apache.pulsar.functions.utils.Utils.getFunctionTypes(functionObject, false);
+        Class<?>[] typeArgs = FunctionCommon.getFunctionTypes(functionObject, false);
 
         if (!(functionObject instanceof org.apache.pulsar.functions.api.Function)
                 && !(functionObject instanceof java.util.function.Function)) {
@@ -1832,7 +1833,7 @@ public abstract class ComponentImpl {
 
     public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
         return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName),
-                Utils.getUniquePackageName(Codec.encode(fileName)));
+                getUniquePackageName(Codec.encode(fileName)));
     }
 
     public boolean isAuthorizedRole(String tenant, String namespace, String clientRole,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04628d6..93642f6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -197,7 +197,7 @@ public class FunctionsImpl extends ComponentImpl {
     }
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Utils.ComponentType.FUNCTION);
+        super(workerServiceSupplier, ComponentType.FUNCTION);
     }
 
     /**
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 49d1156..aed383e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -61,7 +62,7 @@ public class FunctionsImplV2 {
 
         Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
                 functionName);
-        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(functionMetaData.getFunctionDetails());
+        String functionDetailsJson = FunctionCommon.printJson(functionMetaData.getFunctionDetails());
         return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
     }
 
@@ -71,7 +72,7 @@ public class FunctionsImplV2 {
         org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
                 functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
 
-        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
+        String jsonResponse = FunctionCommon.printJson(toProto(functionInstanceStatus, instanceId));
         return Response.status(Response.Status.OK).entity(jsonResponse).build();
     }
 
@@ -82,7 +83,7 @@ public class FunctionsImplV2 {
         functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
                 toProto(functionInstanceStatus.getStatus(),
                         String.valueOf(functionInstanceStatus.getInstanceId()))));
-        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
+        String jsonResponse = FunctionCommon.printJson(functionStatusList);
         return Response.status(Response.Status.OK).entity(jsonResponse).build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index acd924b..e4ab6d3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
@@ -204,7 +204,7 @@ public class SinkImpl extends ComponentImpl {
     }
 
     public SinkImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Utils.ComponentType.SINK);
+        super(workerServiceSupplier, ComponentType.SINK);
     }
 
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index e5ed28e..44b36dc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -26,8 +26,8 @@ import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentType;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
@@ -206,7 +206,7 @@ public class SourceImpl extends ComponentImpl {
     }
 
     public SourceImpl(Supplier<WorkerService> workerServiceSupplier) {
-        super(workerServiceSupplier, Utils.ComponentType.SOURCE);
+        super(workerServiceSupplier, ComponentType.SOURCE);
     }
 
     public SourceStatus getSourceStatus(final String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
index 79f4df2..041ac26 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/WorkerImpl.java
@@ -24,10 +24,11 @@ import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -163,9 +164,9 @@ public class WorkerImpl {
                 int parallelism = functionDetails.getParallelism();
                 for (int i = 0; i < parallelism; ++i) {
                     FunctionStats.FunctionInstanceStats functionInstanceStats =
-                            Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
+                            WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo, i);
                     WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
-                    workerFunctionInstanceStats.setName(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(
+                    workerFunctionInstanceStats.setName(FunctionCommon.getFullyQualifiedInstanceId(
                             functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), i
                     ));
                     workerFunctionInstanceStats.setMetrics(functionInstanceStats.getMetrics());
@@ -173,7 +174,7 @@ public class WorkerImpl {
                 }
             } else {
                 FunctionStats.FunctionInstanceStats functionInstanceStats =
-                        Utils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
+                        WorkerUtils.getFunctionInstanceStats(fullyQualifiedInstanceName, functionRuntimeInfo,
                                 functionRuntimeInfo.getFunctionInstance().getInstanceId());
                 WorkerFunctionInstanceStats workerFunctionInstanceStats = new WorkerFunctionInstanceStats();
                 workerFunctionInstanceStats.setName(fullyQualifiedInstanceName);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 91fec23..b009567 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.KubernetesRuntime;
 import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -219,7 +219,7 @@ public class FunctionRuntimeManagerTest {
         functionRuntimeManager.processAssignment(assignment1);
         functionRuntimeManager.processAssignment(assignment2);
 
-        functionRuntimeManager.deleteAssignment(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()));
+        functionRuntimeManager.deleteAssignment(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()));
         verify(functionRuntimeManager, times(0)).setAssignment(any(Function.Assignment.class));
         verify(functionRuntimeManager, times(1)).deleteAssignment(any(String.class));
 
@@ -551,16 +551,16 @@ public class FunctionRuntimeManagerTest {
         List<Message<byte[]>> messageList = new LinkedList<>();
         Message message1 = spy(new MessageImpl("foo", MessageId.latest.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer(assignment1.toByteArray()), null));
-        doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance())).when(message1).getKey();
 
         Message message2 = spy(new MessageImpl("foo", MessageId.latest.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer(assignment2.toByteArray()), null));
-        doReturn(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance())).when(message2).getKey();
 
         // delete function2
         Message message3 = spy(new MessageImpl("foo", MessageId.latest.toString(),
                 new HashMap<>(), Unpooled.copiedBuffer("".getBytes()), null));
-        doReturn(Utils.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
+        doReturn(FunctionCommon.getFullyQualifiedInstanceId(assignment3.getInstance())).when(message3).getKey();
 
         messageList.add(message1);
         messageList.add(message2);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 465729e..9f4fa2e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -34,7 +34,7 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -155,7 +155,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -201,7 +201,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -248,7 +248,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
 
@@ -315,9 +315,9 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         //TODO: delete this assignment
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -374,7 +374,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -406,7 +406,7 @@ public class SchedulerManagerTest {
         Assert.assertEquals(assignments, assignment2);
 
         // updating assignments
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
 
         // scale up
 
@@ -485,7 +485,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -539,9 +539,9 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_3));
 
         // updating assignments
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
 
         // scale down
 
@@ -666,7 +666,7 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
 
         currentAssignments.put("worker-1", assignmentEntry1);
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
@@ -717,9 +717,9 @@ public class SchedulerManagerTest {
         assertTrue(allAssignments.contains(assignment2_3));
 
         // updating assignments
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
-        currentAssignments.get("worker-1").put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_1.getInstance()), assignment2_1);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_2.getInstance()), assignment2_2);
+        currentAssignments.get("worker-1").put(FunctionCommon.getFullyQualifiedInstanceId(assignment2_3.getInstance()), assignment2_3);
 
         // update field
 
@@ -806,11 +806,11 @@ public class SchedulerManagerTest {
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
-        assignmentEntry1.put(org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
+        assignmentEntry1.put(FunctionCommon.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1);
         currentAssignments.put("worker-1", assignmentEntry1);
 
         Map<String, Function.Assignment> assignmentEntry2 = new HashMap<>();
-        assignmentEntry2.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
+        assignmentEntry2.put(FunctionCommon.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2);
         currentAssignments.put("worker-2", assignmentEntry2);
 
         doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
deleted file mode 100644
index 77f95d0..0000000
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/UtilsTest.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.UUID;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link Utils}.
- */
-public class UtilsTest {
-
-    @Test
-    public void testDownloadFile() throws Exception {
-        String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
-        String testDir = UtilsTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
-        File pkgFile = new File(testDir, UUID.randomUUID().toString());
-        Utils.downloadFromHttpUrl(jarHttpUrl, new FileOutputStream(pkgFile));
-        Assert.assertTrue(pkgFile.exists());
-        pkgFile.delete();
-    }
-
-}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
index 6e9993e..91fa0e6 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/request/ServiceRequestManagerTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.functions.proto.Request.ServiceRequest;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -37,7 +37,7 @@ import org.testng.annotations.Test;
 /**
  * Unit test of {@link ServiceRequestManager}.
  */
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
 public class ServiceRequestManagerTest {
 
     private final Producer producer;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index f774180..17a3086 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -38,7 +38,7 @@ import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -53,7 +53,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.eq;
@@ -197,9 +197,9 @@ public class FunctionsImplTest {
         FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
         doReturn(runtimeSpawner).when(functionRuntimeInfo).getRuntimeSpawner();
 
-        FunctionStats.FunctionInstanceStats instanceStats1 = Utils
+        FunctionStats.FunctionInstanceStats instanceStats1 = WorkerUtils
                 .getFunctionInstanceStats("public/default/test", functionRuntimeInfo, 0);
-        FunctionStats.FunctionInstanceStats instanceStats2 = Utils
+        FunctionStats.FunctionInstanceStats instanceStats2 = WorkerUtils
                 .getFunctionInstanceStats("public/default/test", functionRuntimeInfo, 1);
 
         FunctionStats functionStats = new FunctionStats();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 2926c93..5072448 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -75,8 +75,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.mergeJson;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -92,7 +92,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link FunctionApiV2Resource}.
  */
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 @Slf4j
 public class FunctionApiV2ResourceTest {
@@ -524,10 +524,10 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterFunctionUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
 
-            Utils.uploadFileToBookkeeper(
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -543,9 +543,9 @@ public class FunctionApiV2ResourceTest {
     @Test
     public void testRegisterFunctionSuccess() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                     any(Namespace.class),
                     any(InputStream.class),
                     anyString());
@@ -590,9 +590,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testRegisterFunctionFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                     any(Namespace.class),
                     any(InputStream.class),
                     anyString());
@@ -615,9 +615,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testRegisterFunctionInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                     any(Namespace.class),
                     any(InputStream.class),
                     anyString());
@@ -705,9 +705,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingPackage() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -729,9 +729,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingInputTopic() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -753,9 +753,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingClassName() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -777,9 +777,9 @@ public class FunctionApiV2ResourceTest {
     @Test
     public void testUpdateFunctionChangedParallelism() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -801,9 +801,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
     public void testUpdateFunctionChangedInputs() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -825,9 +825,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
     public void testUpdateFunctionChangedOutput() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             Map<String, String> someOtherInput = new HashMap<>();
             someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
             testUpdateFunctionMissingArguments(
@@ -948,9 +948,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateFunctionUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -966,9 +966,9 @@ public class FunctionApiV2ResourceTest {
 
     @Test
     public void testUpdateFunctionSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
@@ -1025,9 +1025,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testUpdateFunctionFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                     any(Namespace.class),
                     any(InputStream.class),
                     anyString());
@@ -1050,9 +1050,9 @@ public class FunctionApiV2ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testUpdateFunctionInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                     any(Namespace.class),
                     any(InputStream.class),
                     anyString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 871ff5f..3b139e8 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -75,9 +75,9 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
-import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
+import static org.apache.pulsar.functions.utils.ComponentType.FUNCTION;
+import static org.apache.pulsar.functions.utils.ComponentType.SINK;
+import static org.apache.pulsar.functions.utils.ComponentType.SOURCE;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -93,7 +93,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link FunctionApiV2Resource}.
  */
-@PrepareForTest(Utils.class)
+@PrepareForTest(WorkerUtils.class)
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
 @Slf4j
 public class FunctionApiV3ResourceTest {
@@ -521,10 +521,10 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterFunctionUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
 
-            Utils.uploadFileToBookkeeper(
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -540,9 +540,9 @@ public class FunctionApiV3ResourceTest {
     @Test
     public void testRegisterFunctionSuccess() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
@@ -587,9 +587,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testRegisterFunctionFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
@@ -612,9 +612,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testRegisterFunctionInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
@@ -702,9 +702,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingPackage() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
@@ -726,9 +726,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingInputTopic() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                     tenant,
                     namespace,
@@ -750,9 +750,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingClassName() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
@@ -774,9 +774,9 @@ public class FunctionApiV3ResourceTest {
     @Test
     public void testUpdateFunctionChangedParallelism() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
@@ -798,9 +798,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
     public void testUpdateFunctionChangedInputs() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
@@ -822,9 +822,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
     public void testUpdateFunctionChangedOutput() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
             Map<String, String> someOtherInput = new HashMap<>();
             someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
             testUpdateFunctionMissingArguments(
@@ -945,9 +945,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateFunctionUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -963,9 +963,9 @@ public class FunctionApiV3ResourceTest {
 
     @Test
     public void testUpdateFunctionSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadToBookeeper(
             any(Namespace.class),
             any(InputStream.class),
             anyString());
@@ -1022,9 +1022,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testUpdateFunctionFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
@@ -1047,9 +1047,9 @@ public class FunctionApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testUpdateFunctionInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadToBookeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadToBookeeper(
                 any(Namespace.class),
                 any(InputStream.class),
                 anyString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index b723411..5112d88 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -38,11 +38,13 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -88,7 +90,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SinkApiV3Resource}.
  */
-@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
+@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SinkApiV3ResourceTest {
@@ -174,7 +176,7 @@ public class SinkApiV3ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SinkImpl(() -> mockedWorkerService));
-        Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(any());
+        Mockito.doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -455,9 +457,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterSinkUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -473,9 +475,9 @@ public class SinkApiV3ResourceTest {
 
     @Test
     public void testRegisterSinkSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -493,9 +495,9 @@ public class SinkApiV3ResourceTest {
 
     @Test
     public void testRegisterSinkConflictingFields() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -535,9 +537,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
     public void testRegisterSinkFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -560,9 +562,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testRegisterSinkInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -644,9 +646,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSinkMissingPackage() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSinkMissingArguments(
                 tenant,
@@ -667,9 +669,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSinkMissingInputs() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSinkMissingArguments(
                 tenant,
@@ -690,9 +692,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
     public void testUpdateSinkDifferentInputs() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             Map<String, String> inputTopics = new HashMap<>();
             inputTopics.put("DifferntTopic", DEFAULT_SERDE);
@@ -714,9 +716,9 @@ public class SinkApiV3ResourceTest {
 
     @Test
     public void testUpdateSinkDifferentParallelism() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
         testUpdateSinkMissingArguments(
                 tenant,
@@ -744,15 +746,15 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
-        doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+        FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -814,15 +816,15 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
-        doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+        FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -854,9 +856,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateSinkUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -872,9 +874,9 @@ public class SinkApiV3ResourceTest {
 
     @Test
     public void testUpdateSinkSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -909,15 +911,15 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSinkType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
-        doReturn(ATLEAST_ONCE).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
+        FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
@@ -944,9 +946,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
     public void testUpdateSinkFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -969,9 +971,9 @@ public class SinkApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testUpdateSinkInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -1307,9 +1309,9 @@ public class SinkApiV3ResourceTest {
                 FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        doReturn(ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         List<String> sinkList = listDefaultSinks();
         assertEquals(functions, sinkList);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index adaa606..332a360 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -41,11 +41,13 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.ComponentType;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
@@ -87,7 +89,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SourceApiV3Resource}.
  */
-@PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
+@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 @Slf4j
 public class SourceApiV3ResourceTest {
@@ -169,7 +171,7 @@ public class SourceApiV3ResourceTest {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SourceImpl(() -> mockedWorkerService));
-        Mockito.doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
+        Mockito.doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -424,9 +426,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterSourceUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -441,9 +443,9 @@ public class SourceApiV3ResourceTest {
     }
 
     public void testRegisterSourceSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -461,9 +463,9 @@ public class SourceApiV3ResourceTest {
 
     @Test
     public void testRegisterSourceConflictingFields() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -504,9 +506,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
     public void testRegisterSourceFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -529,9 +531,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
     public void testRegisterSourceInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -616,9 +618,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSourceMissingPackage() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                 tenant,
@@ -640,9 +642,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSourceMissingTopicName() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                     tenant,
@@ -664,9 +666,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testUpdateSourceNegativeParallelism() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                     tenant,
@@ -688,9 +690,9 @@ public class SourceApiV3ResourceTest {
     @Test
     public void testUpdateSourceChangedParallelism() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                 tenant,
@@ -712,9 +714,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Destination topics differ")
     public void testUpdateSourceChangedTopic() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                     tenant,
@@ -736,9 +738,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testUpdateSourceZeroParallelism() throws IOException {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
             testUpdateSourceMissingArguments(
                     tenant,
@@ -773,12 +775,12 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -843,12 +845,12 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -879,9 +881,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateSourceUploadFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doThrow(new IOException("upload failure")).when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doThrow(new IOException("upload failure")).when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -896,9 +898,9 @@ public class SourceApiV3ResourceTest {
 
     @Test
     public void testUpdateSourceSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
+        mockStatic(WorkerUtils.class);
+        doNothing().when(WorkerUtils.class);
+        WorkerUtils.uploadFileToBookkeeper(
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
@@ -933,12 +935,12 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
-        mockStatic(org.apache.pulsar.functions.utils.Utils.class);
-        doReturn(String.class).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.getSourceType(anyString(), any(NarClassLoader.class));
+        mockStatic(FunctionCommon.class);
+        doReturn(String.class).when(FunctionCommon.class);
+        FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
 
-        doReturn(mock(NarClassLoader.class)).when(org.apache.pulsar.functions.utils.Utils.class);
-        org.apache.pulsar.functions.utils.Utils.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
+        doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
+        FunctionCommon.extractNarClassLoader(any(Path.class), anyString(), any(File.class));
 
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
@@ -967,9 +969,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
     public void testUpdateSourceFailure() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -992,9 +994,9 @@ public class SourceApiV3ResourceTest {
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
     public void testUpdateSourceInterrupted() throws Exception {
         try {
-            mockStatic(Utils.class);
-            doNothing().when(Utils.class);
-            Utils.uploadFileToBookkeeper(
+            mockStatic(WorkerUtils.class);
+            doNothing().when(WorkerUtils.class);
+            WorkerUtils.uploadFileToBookkeeper(
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
@@ -1321,9 +1323,9 @@ public class SourceApiV3ResourceTest {
                 FunctionDetails.newBuilder().setName("test-3").build()).build();
         functionMetaDataList.add(f3);
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
-        doReturn(org.apache.pulsar.functions.utils.Utils.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
+        doReturn(ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
+        doReturn(ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
+        doReturn(ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
         List<String> sourceList = listDefaultSources();
         assertEquals(functions, sourceList);