You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/08/31 12:24:51 UTC

[pulsar] branch master updated: [PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dab0d1f389e [PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)
dab0d1f389e is described below

commit dab0d1f389ee66277c8350072af304895619eb9a
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Wed Aug 31 14:24:42 2022 +0200

    [PIP-193] [feature][connectors] Add support for a transform Function in Sinks (#16740)
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |   7 +-
 .../org/apache/pulsar/client/admin/Functions.java  |  36 ++
 .../org/apache/pulsar/common/io/SinkConfig.java    |   3 +
 .../client/admin/internal/FunctionsImpl.java       |  16 +
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  38 +++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |   7 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  19 ++
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |  62 ++++
 .../pulsar/functions/instance/InstanceConfig.java  |   1 +
 .../functions/instance/JavaInstanceRunnable.java   | 198 ++++++++++-
 .../functions/instance/OutputRecordSinkRecord.java |  11 +
 .../functions/instance/SinkSchemaInfoProvider.java |  79 +++++
 .../instance/JavaInstanceRunnableTest.java         |   2 +-
 .../functions/api/examples/RecordFunction.java     |   1 +
 .../org/apache/pulsar/functions/LocalRunner.java   |  14 +-
 .../proto/src/main/proto/Function.proto            |   1 +
 .../functions/runtime/JavaInstanceStarter.java     |  12 +
 .../pulsar/functions/runtime/RuntimeFactory.java   |   1 +
 .../pulsar/functions/runtime/RuntimeSpawner.java   |   3 +
 .../pulsar/functions/runtime/RuntimeUtils.java     |  20 +-
 .../runtime/kubernetes/KubernetesRuntime.java      |  87 ++---
 .../kubernetes/KubernetesRuntimeFactory.java       |   3 +
 .../functions/runtime/process/ProcessRuntime.java  |   2 +
 .../runtime/process/ProcessRuntimeFactory.java     |   3 +
 .../functions/runtime/thread/ThreadRuntime.java    |  35 +-
 .../runtime/thread/ThreadRuntimeFactory.java       |   3 +
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |   1 +
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  44 +--
 .../runtime/process/ProcessRuntimeTest.java        |  23 +-
 .../pulsar/functions/utils/FunctionCommon.java     |  83 ++---
 .../functions/utils/FunctionConfigUtils.java       |   3 +-
 .../pulsar/functions/utils/SinkConfigUtils.java    |  85 ++++-
 .../functions/utils/SinkConfigUtilsTest.java       |  83 ++++-
 .../pulsar/functions/worker/FunctionActioner.java  |  93 ++++--
 .../functions/worker/FunctionRuntimeManager.java   |   7 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 178 ++++++++--
 .../functions/worker/rest/api/FunctionsImpl.java   | 110 +-----
 .../functions/worker/rest/api/SinksImpl.java       | 149 ++++-----
 .../functions/worker/rest/api/SourcesImpl.java     |  89 +----
 .../worker/rest/api/v3/FunctionsApiV3Resource.java |   7 +-
 .../functions/worker/service/api/Component.java    |  15 +-
 .../functions/worker/FunctionActionerTest.java     |  75 +++--
 .../worker/FunctionRuntimeManagerTest.java         |   9 +-
 .../worker/rest/api/FunctionsImplTest.java         |   4 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  60 ++++
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  | 245 +++++++++++---
 .../tests/integration/io/TestLoggingSink.java      |  74 +++++
 .../io/SinkWithTransformFunctionTest.java          | 370 +++++++++++++++++++++
 48 files changed, 1890 insertions(+), 581 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index d5380407491..be9dc8c7b71 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -711,9 +711,12 @@ public class FunctionsBase extends AdminResource {
             @ApiParam(value = "The namespace of a Pulsar Function")
             final @PathParam("namespace") String namespace,
             @ApiParam(value = "The name of a Pulsar Function")
-            final @PathParam("functionName") String functionName) {
+            final @PathParam("functionName") String functionName,
+            @ApiParam(value = "Whether to download the transform-function")
+            final @QueryParam("transform-function") boolean transformFunction) {
 
-        return functions().downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
+        return functions()
+                .downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData(), transformFunction);
     }
 
     @GET
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 5bf289f9b65..55e0855ca55 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -757,6 +757,42 @@ public interface Functions {
     CompletableFuture<Void> downloadFunctionAsync(
             String destinationFile, String tenant, String namespace, String function);
 
+    /**
+     * Download Function Code.
+     *
+     * @param destinationFile
+     *           file where data should be downloaded to
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param transformFunction
+     *            Whether to download the transform function (for sources and sinks)
+     * @throws PulsarAdminException
+     */
+    void downloadFunction(String destinationFile, String tenant, String namespace, String function,
+                          boolean transformFunction) throws PulsarAdminException;
+
+    /**
+     * Download Function Code asynchronously.
+     *
+     * @param destinationFile
+     *           file where data should be downloaded to
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param transformFunction
+     *            Whether to download the transform function (for sources and sinks)
+     */
+    CompletableFuture<Void> downloadFunctionAsync(
+            String destinationFile, String tenant, String namespace, String function, boolean transformFunction);
+
+
     /**
      * Deprecated in favor of getting sources and sinks for their own APIs.
      * <p/>
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index f1844e6090c..316cdd69e07 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -91,4 +91,7 @@ public class SinkConfig {
     // to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
     // interface
     private String customRuntimeOptions;
+    private String transformFunction;
+    private String transformFunctionClassName;
+    private String transformFunctionConfig;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index ff2c22daae5..482846ad8ac 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -606,6 +606,22 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 functions.path(tenant).path(namespace).path(functionName).path("download"));
     }
 
+    @Override
+    public void downloadFunction(String destinationPath, String tenant, String namespace, String functionName,
+                                 boolean transformFunction) throws PulsarAdminException {
+        downloadFile(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download")
+                .queryParam("transform-function", transformFunction));
+    }
+
+    @Override
+    public CompletableFuture<Void> downloadFunctionAsync(
+            String destinationPath, String tenant, String namespace, String functionName, boolean transformFunction) {
+        return downloadFileAsync(destinationPath,
+                functions.path(tenant).path(namespace).path(functionName).path("download")
+                        .queryParam("transform-function", transformFunction));
+    }
+
+
     @Override
     public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
         downloadFile(destinationPath, functions.path("download").queryParam("path", path));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index e98894e9c65..4fc723e5f4c 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -859,6 +859,44 @@ public class CmdFunctionsTest {
         verify(functions, times(1)).updateFunctionWithUrl(any(FunctionConfig.class), anyString(), eq(updateOptions));
     }
 
+    @Test
+    public void testDownloadFunction() throws Exception {
+        cmd.run(new String[] {
+                "download",
+                "--destination-file", JAR_NAME,
+                "--name", FN_NAME,
+                "--tenant", TENANT,
+                "--namespace", NAMESPACE
+        });
+        verify(functions, times(1))
+                .downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, false);
+    }
+
+    @Test
+    public void testDownloadFunctionByPath() throws Exception {
+        cmd.run(new String[] {
+                "download",
+                "--destination-file", JAR_NAME,
+                "--path", PACKAGE_URL
+        });
+        verify(functions, times(1))
+                .downloadFunction(JAR_NAME, PACKAGE_URL);
+    }
+
+    @Test
+    public void testDownloadTransformFunction() throws Exception {
+        cmd.run(new String[] {
+                "download",
+                "--destination-file", JAR_NAME,
+                "--name", FN_NAME,
+                "--tenant", TENANT,
+                "--namespace", NAMESPACE,
+                "--transform-function"
+        });
+        verify(functions, times(1))
+                .downloadFunction(JAR_NAME, TENANT, NAMESPACE, FN_NAME, true);
+    }
+
 
     public static class ConsoleOutputCapturer {
         private ByteArrayOutputStream stdout;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a37197ccc28..43d13bc97dd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1171,6 +1171,10 @@ public class CmdFunctions extends CmdBase {
                 description = "Path or functionPkgUrl to store the content",
                 listConverter = StringConverter.class, required = false, hidden = true)
         protected String path;
+        @Parameter(
+                names = "--transform-function",
+                description = "Download the transform Function of the connector")
+        protected Boolean transformFunction = false;
 
         private void mergeArgs() {
             if (isBlank(destinationFile) && !isBlank(deprecatedDestinationFile)) {
@@ -1195,7 +1199,8 @@ public class CmdFunctions extends CmdBase {
             if (path != null) {
                 getAdmin().functions().downloadFunction(destinationFile, path);
             } else {
-                getAdmin().functions().downloadFunction(destinationFile, tenant, namespace, functionName);
+                getAdmin().functions()
+                        .downloadFunction(destinationFile, tenant, namespace, functionName, transformFunction);
             }
             print("Downloaded successfully");
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index f89dc5b8cc0..7aae6ece799 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -398,6 +398,13 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
                 + "how the secret is fetched by the underlying secrets provider")
         protected String secretsString;
+        @Parameter(names = "--transform-function", description = "Transform function applied before the Sink")
+        protected String transformFunction;
+        @Parameter(names = "--transform-function-classname", description = "The transform function class name")
+        protected String transformFunctionClassName;
+        @Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+                + "applied before the Sink")
+        protected String transformFunctionConfig;
 
         protected SinkConfig sinkConfig;
 
@@ -578,6 +585,18 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setSecrets(secretsMap);
             }
 
+            if (transformFunction != null) {
+                sinkConfig.setTransformFunction(transformFunction);
+            }
+
+            if (transformFunctionClassName != null) {
+                sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
+            }
+
+            if (transformFunctionConfig != null) {
+                sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
+            }
+
             // check if configs are valid
             validateSinkConfigs(sinkConfig);
         }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 7a00f0429fc..1cb86102802 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -83,6 +83,9 @@ public class TestCmdSinks {
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
     private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
+    private static final String TRANSFORM_FUNCTION = "transform";
+    private static final String TRANSFORM_FUNCTION_CLASSNAME = "TransformFunction";
+    private static final String TRANSFORM_FUNCTION_CONFIG = "{\"test_function_config\": \"\"}";
 
     private PulsarAdmin pulsarAdmin;
     private Sinks sink;
@@ -146,6 +149,11 @@ public class TestCmdSinks {
         sinkConfig.setArchive(JAR_FILE_PATH);
         sinkConfig.setResources(new Resources(CPU, RAM, DISK));
         sinkConfig.setConfigs(createSink.parseConfigs(SINK_CONFIG_STRING));
+
+        sinkConfig.setTransformFunction(TRANSFORM_FUNCTION);
+        sinkConfig.setTransformFunctionClassName(TRANSFORM_FUNCTION_CLASSNAME);
+        sinkConfig.setTransformFunctionConfig(TRANSFORM_FUNCTION_CONFIG);
+
         return sinkConfig;
     }
 
@@ -166,6 +174,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -188,6 +199,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -211,6 +225,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -233,6 +250,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -255,6 +275,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -277,6 +300,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -301,6 +327,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -323,6 +352,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 null,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 sinkConfig
         );
     }
@@ -341,6 +373,9 @@ public class TestCmdSinks {
             Long ram,
             Long disk,
             String sinkConfigString,
+            String transformFunction,
+            String transformFunctionClassName,
+            String transformFunctionConfig,
             SinkConfig sinkConfig) throws Exception {
 
         // test create sink
@@ -357,6 +392,9 @@ public class TestCmdSinks {
         createSink.ram = ram;
         createSink.disk = disk;
         createSink.sinkConfigString = sinkConfigString;
+        createSink.transformFunction = transformFunction;
+        createSink.transformFunctionClassName = transformFunctionClassName;
+        createSink.transformFunctionConfig = transformFunctionConfig;
 
         createSink.processArguments();
 
@@ -376,6 +414,9 @@ public class TestCmdSinks {
         updateSink.ram = ram;
         updateSink.disk = disk;
         updateSink.sinkConfigString = sinkConfigString;
+        updateSink.transformFunction = transformFunction;
+        updateSink.transformFunctionClassName = transformFunctionClassName;
+        updateSink.transformFunctionConfig = transformFunctionConfig;
 
         updateSink.processArguments();
 
@@ -395,6 +436,9 @@ public class TestCmdSinks {
         localSinkRunner.ram = ram;
         localSinkRunner.disk = disk;
         localSinkRunner.sinkConfigString = sinkConfigString;
+        localSinkRunner.transformFunction = transformFunction;
+        localSinkRunner.transformFunctionClassName = transformFunctionClassName;
+        localSinkRunner.transformFunctionConfig = transformFunctionConfig;
 
         localSinkRunner.processArguments();
 
@@ -539,6 +583,9 @@ public class TestCmdSinks {
         testSinkConfig.setResources(new Resources(CPU + 1, RAM + 1, DISK + 1));
         testSinkConfig.setConfigs(createSink.parseConfigs("{\"created_at-prime\":\"Mon Jul 02 00:33:15 +0000 2018\", \"otherConfigProperties\":{\"property1.value\":\"value1\",\"property2.value\":\"value2\"}}"));
 
+        testSinkConfig.setTransformFunction(TRANSFORM_FUNCTION + "-prime");
+        testSinkConfig.setTransformFunction(TRANSFORM_FUNCTION_CLASSNAME + "-prime");
+        testSinkConfig.setTransformFunction("{\"test_function_config\": \"prime\"}");
 
         SinkConfig expectedSinkConfig = getSinkConfig();
 
@@ -563,6 +610,9 @@ public class TestCmdSinks {
                 RAM,
                 DISK,
                 SINK_CONFIG_STRING,
+                TRANSFORM_FUNCTION,
+                TRANSFORM_FUNCTION_CLASSNAME,
+                TRANSFORM_FUNCTION_CONFIG,
                 file.getAbsolutePath(),
                 expectedSinkConfig
         );
@@ -583,6 +633,9 @@ public class TestCmdSinks {
             Long ram,
             Long disk,
             String sinkConfigString,
+            String transformFunction,
+            String transformFunctionClassName,
+            String transformFunctionConfig,
             String sinkConfigFile,
             SinkConfig sinkConfig
     ) throws Exception {
@@ -602,6 +655,9 @@ public class TestCmdSinks {
         createSink.ram = ram;
         createSink.disk = disk;
         createSink.sinkConfigString = sinkConfigString;
+        createSink.transformFunction = transformFunction;
+        createSink.transformFunctionClassName = transformFunctionClassName;
+        createSink.transformFunctionConfig = transformFunctionConfig;
         createSink.sinkConfigFile = sinkConfigFile;
 
         createSink.processArguments();
@@ -622,6 +678,9 @@ public class TestCmdSinks {
         updateSink.ram = ram;
         updateSink.disk = disk;
         updateSink.sinkConfigString = sinkConfigString;
+        updateSink.transformFunction = transformFunction;
+        updateSink.transformFunctionClassName = transformFunctionClassName;
+        updateSink.transformFunctionConfig = transformFunctionConfig;
         updateSink.sinkConfigFile = sinkConfigFile;
 
         updateSink.processArguments();
@@ -642,6 +701,9 @@ public class TestCmdSinks {
         localSinkRunner.ram = ram;
         localSinkRunner.disk = disk;
         localSinkRunner.sinkConfigString = sinkConfigString;
+        localSinkRunner.transformFunction = transformFunction;
+        localSinkRunner.transformFunctionClassName = transformFunctionClassName;
+        localSinkRunner.transformFunctionConfig = transformFunctionConfig;
         localSinkRunner.sinkConfigFile = sinkConfigFile;
 
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 71f0c154af6..3aefa6b2dca 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 public class InstanceConfig {
     private int instanceId;
     private String functionId;
+    private String transformFunctionId;
     private String functionVersion;
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 11e443e72e0..bd779e0fccc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,11 +22,14 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFuncti
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
+import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
@@ -42,16 +45,31 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
+import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.StateStore;
 import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
 import org.apache.pulsar.functions.instance.state.InstanceStateManager;
 import org.apache.pulsar.functions.instance.state.StateManager;
@@ -126,6 +144,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     private final Map<String, String> properties;
 
     private final ClassLoader instanceClassLoader;
+    private final ClassLoader componentClassLoader;
     private final ClassLoader functionClassLoader;
 
     // a flog to determine if member variables have been initialized as part of setup().
@@ -135,6 +154,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     // a read write lock for stats operations
     private final ReadWriteLock statsLock = new ReentrantReadWriteLock();
 
+    private Class<?> sinkTypeArg;
+    private final AtomicReference<Schema<?>> sinkSchema = new AtomicReference<>();
+    private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 ClientBuilder clientBuilder,
                                 PulsarClient pulsarClient,
@@ -143,7 +166,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                                 String stateStorageServiceUrl,
                                 SecretsProvider secretsProvider,
                                 FunctionCollectorRegistry collectorRegistry,
-                                ClassLoader functionClassLoader) throws PulsarClientException {
+                                ClassLoader componentClassLoader,
+                                ClassLoader transformFunctionClassLoader) throws PulsarClientException {
         this.instanceConfig = instanceConfig;
         this.clientBuilder = clientBuilder;
         this.client = (PulsarClientImpl) pulsarClient;
@@ -151,7 +175,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         this.stateStorageImplClass = stateStorageImplClass;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.secretsProvider = secretsProvider;
-        this.functionClassLoader = functionClassLoader;
+        this.componentClassLoader = componentClassLoader;
+        this.functionClassLoader = transformFunctionClassLoader != null
+            ? transformFunctionClassLoader
+            : componentClassLoader;
         this.metricsLabels = new String[]{
                 instanceConfig.getFunctionDetails().getTenant(),
                 String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(),
@@ -227,6 +254,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         // start any log topic handler
         setupLogHandler();
 
+        if (!(object instanceof IdentityFunction) && !(sink instanceof PulsarSink)) {
+            sinkSchemaInfoProvider = new SinkSchemaInfoProvider();
+        }
+
         javaInstance = new JavaInstance(contextImpl, object, instanceConfig);
         try {
             Thread.currentThread().setContextClassLoader(functionClassLoader);
@@ -260,8 +291,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
             Thread currentThread = Thread.currentThread();
             Consumer<Throwable> asyncErrorHandler = throwable -> currentThread.interrupt();
-            AsyncResultConsumer asyncResultConsumer =
-                    (record, javaExecutionResult) -> handleResult(record, javaExecutionResult);
+            AsyncResultConsumer asyncResultConsumer = this::handleResult;
 
             while (true) {
                 currentRecord = readInput();
@@ -383,11 +413,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         AbstractSinkRecord<?> sinkRecord;
         if (output instanceof Record) {
-            sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
+            Record record = (Record) output;
+            if (sinkSchemaInfoProvider != null) {
+                // Function and Sink coupled together so we need to encode with the Function Schema
+                // and decode with the Sink schema
+                sinkRecord = encodeWithRecordSchemaAndDecodeWithSinkSchema(srcRecord, record);
+            } else {
+                sinkRecord = new OutputRecordSinkRecord<>(srcRecord, record);
+            }
         } else {
             sinkRecord = new SinkRecord<>(srcRecord, output);
         }
@@ -404,10 +441,50 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
     }
 
+    private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Record srcRecord, Record record) {
+        AbstractSinkRecord<?> sinkRecord;
+        Schema encodingSchema = record.getSchema();
+        boolean isKeyValueSeparated = false;
+        if (encodingSchema instanceof KeyValueSchema) {
+            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) encodingSchema;
+            // If the encoding is SEPARATED, it's easier to encode/decode with INLINE
+            // and rebuild the SEPARATED KeyValueSchema after decoding
+            if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
+                encodingSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema());
+                isKeyValueSeparated = true;
+            }
+        }
+        byte[] encoded = encodingSchema.encode(record.getValue());
+
+        if (sinkSchema.get() == null) {
+            Schema<?> schema = getSinkSchema(record, sinkTypeArg);
+            schema.setSchemaInfoProvider(sinkSchemaInfoProvider);
+            sinkSchema.compareAndSet(null, schema);
+        }
+        Schema<?> schema = sinkSchema.get();
+        SchemaVersion schemaVersion = sinkSchemaInfoProvider.addSchemaIfNeeded(encodingSchema);
+        final byte[] schemaVersionBytes = schemaVersion.bytes();
+        Object decoded = schema.decode(encoded, schemaVersionBytes);
+
+        if (schema instanceof AutoConsumeSchema) {
+            schema = ((AutoConsumeSchema) schema).getInternalSchema(schemaVersionBytes);
+        }
+
+        final Schema<?> finalSchema;
+        if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
+            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) schema;
+            finalSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema(),
+                KeyValueEncodingType.SEPARATED);
+        } else {
+            finalSchema = schema;
+        }
+        return new OutputRecordSinkRecord(srcRecord, record, decoded, finalSchema);
+    }
+
     private Record readInput() throws Exception {
         Record record;
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
-            Thread.currentThread().setContextClassLoader(functionClassLoader);
+            Thread.currentThread().setContextClassLoader(componentClassLoader);
         }
         try {
             record = this.source.read();
@@ -446,7 +523,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
         if (source != null) {
             if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
-                Thread.currentThread().setContextClassLoader(functionClassLoader);
+                Thread.currentThread().setContextClassLoader(componentClassLoader);
             }
             try {
                 source.close();
@@ -461,7 +538,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
         if (sink != null) {
             if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-                Thread.currentThread().setContextClassLoader(functionClassLoader);
+                Thread.currentThread().setContextClassLoader(componentClassLoader);
             }
             try {
                 sink.close();
@@ -769,7 +846,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             } else {
                 object = Reflections.createInstance(
                   sourceSpec.getClassName(),
-                  this.functionClassLoader);
+                  this.componentClassLoader);
             }
         }
 
@@ -783,7 +860,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         this.source = (Source<?>) object;
 
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
-            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
         }
         try {
             if (sourceSpec.getConfigs().isEmpty()) {
@@ -847,17 +924,18 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         } else {
             object = Reflections.createInstance(
                     sinkSpec.getClassName(),
-                    this.functionClassLoader);
+                    this.componentClassLoader);
         }
 
         if (object instanceof Sink) {
             this.sink = (Sink) object;
+            this.sinkTypeArg = TypeResolver.resolveRawArguments(Sink.class, object.getClass())[0];
         } else {
             throw new RuntimeException("Sink does not implement correct interface");
         }
 
         if (componentType == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SINK) {
-            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
         }
         try {
             if (sinkSpec.getConfigs().isEmpty()) {
@@ -881,4 +959,98 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
         }
     }
+
+    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {
+        SchemaType type = getSchemaTypeOrDefault(record, clazz);
+        switch (type) {
+            case NONE:
+                if (ByteBuffer.class.isAssignableFrom(clazz)) {
+                    return (Schema<T>) Schema.BYTEBUFFER;
+                } else {
+                    return (Schema<T>) Schema.BYTES;
+                }
+
+            case AUTO_CONSUME:
+            case AUTO:
+                return (Schema<T>) Schema.AUTO_CONSUME();
+
+            case STRING:
+                return (Schema<T>) Schema.STRING;
+
+            case AVRO:
+                return AvroSchema.of(SchemaDefinition.<T>builder()
+                    .withPojo(clazz).build());
+
+            case JSON:
+                return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
+
+            case KEY_VALUE:
+                return (Schema<T>) Schema.KV_BYTES();
+
+            case PROTOBUF:
+                return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
+
+            case PROTOBUF_NATIVE:
+                return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<>());
+
+            case AUTO_PUBLISH:
+                return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
+
+            default:
+                throw new RuntimeException("Unsupported schema type" + type);
+        }
+    }
+
+    private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> clazz) {
+        if (GenericObject.class.isAssignableFrom(clazz)) {
+            return SchemaType.AUTO_CONSUME;
+        } else if (byte[].class.equals(clazz)
+            || ByteBuf.class.equals(clazz)
+            || ByteBuffer.class.equals(clazz)) {
+            // if sink uses bytes, we should ignore
+            return SchemaType.NONE;
+        } else {
+            Schema<?> schema = record.getSchema();
+            if (schema != null) {
+                if (schema.getSchemaInfo().getType() == SchemaType.NONE) {
+                    return getDefaultSchemaType(clazz);
+                } else {
+                    return schema.getSchemaInfo().getType();
+                }
+            } else {
+                return getDefaultSchemaType(clazz);
+            }
+        }
+    }
+
+    private static SchemaType getDefaultSchemaType(Class<?> clazz) {
+        if (byte[].class.equals(clazz)
+            || ByteBuf.class.equals(clazz)
+            || ByteBuffer.class.equals(clazz)) {
+            return SchemaType.NONE;
+        } else if (GenericObject.class.isAssignableFrom(clazz)) {
+            // the sink is taking generic record/object, so we do auto schema detection
+            return SchemaType.AUTO_CONSUME;
+        } else if (String.class.equals(clazz)) {
+            // If type is String, then we use schema type string, otherwise we fallback on default schema
+            return SchemaType.STRING;
+        } else if (isProtobufClass(clazz)) {
+            return SchemaType.PROTOBUF;
+        } else if (KeyValue.class.equals(clazz)) {
+            return SchemaType.KEY_VALUE;
+        } else {
+            return SchemaType.JSON;
+        }
+    }
+
+    private static boolean isProtobufClass(Class<?> pojoClazz) {
+        try {
+            Class<?> protobufBaseClass = Class.forName("com.google.protobuf.GeneratedMessageV3");
+            return protobufBaseClass.isAssignableFrom(pojoClazz);
+        } catch (ClassNotFoundException | NoClassDefFoundError e) {
+            // If sink does not have protobuf in classpath then it cannot be protobuf
+            return false;
+        }
+    }
+
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
index 5b63c6e8896..233100cd2ee 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/OutputRecordSinkRecord.java
@@ -31,10 +31,21 @@ import org.apache.pulsar.functions.api.Record;
 public class OutputRecordSinkRecord<T> extends AbstractSinkRecord<T> {
 
     private final Record<T> sinkRecord;
+    private final T value;
+    private final Schema<T> schema;
 
     OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord) {
         super(sourceRecord);
         this.sinkRecord = sinkRecord;
+        this.value = sinkRecord.getValue();
+        this.schema = getRecordSchema(sinkRecord);
+    }
+
+    OutputRecordSinkRecord(Record<T> sourceRecord, Record<T> sinkRecord, T value, Schema<T> schema) {
+        super(sourceRecord);
+        this.sinkRecord = sinkRecord;
+        this.value = value;
+        this.schema = schema;
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java
new file mode 100644
index 00000000000..1bd41f226d1
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkSchemaInfoProvider.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * SchemaInfo provider that creates a new schema version for each new schema hash.
+ */
+class SinkSchemaInfoProvider implements SchemaInfoProvider {
+
+  AtomicLong latestVersion = new AtomicLong(0);
+  ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();
+  ConcurrentHashMap<SchemaHash, SchemaVersion> schemaVersions = new ConcurrentHashMap<>();
+
+  /**
+   * Creates a new schema version with the info of the provided schema if the hash of the schema is a new one.
+   *
+   * @param schema schema for which we create a version
+   * @return the version of the schema
+   */
+  public SchemaVersion addSchemaIfNeeded(Schema<?> schema) {
+    SchemaHash schemaHash = SchemaHash.of(schema);
+    return schemaVersions.computeIfAbsent(schemaHash, s -> createNewSchemaInfo(schema.getSchemaInfo()));
+  }
+
+  private SchemaVersion createNewSchemaInfo(SchemaInfo schemaInfo) {
+    long l = latestVersion.incrementAndGet();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.putLong(l);
+    BytesSchemaVersion schemaVersion = BytesSchemaVersion.of(buffer.array());
+    schemaInfos.put(schemaVersion, schemaInfo);
+    return schemaVersion;
+  }
+
+  @Override
+  public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+    return CompletableFuture.completedFuture(schemaInfos.get(BytesSchemaVersion.of(schemaVersion)));
+  }
+
+  @Override
+  public CompletableFuture<SchemaInfo> getLatestSchema() {
+    long l = latestVersion.get();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.putLong(l);
+    SchemaVersion schemaVersion = BytesSchemaVersion.of(buffer.array());
+    return CompletableFuture.completedFuture(schemaInfos.get(schemaVersion));
+  }
+
+  @Override
+  public String getTopicName() {
+    return "__INTERNAL__";
+  }
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 5029a064311..0f8385fc6fa 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -79,7 +79,7 @@ public class JavaInstanceRunnableTest {
         when(clientBuilder.build()).thenReturn(null);
         InstanceConfig config = createInstanceConfig(functionDetails);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, clientBuilder, null, null, null, null, null, null, null);
+                config, clientBuilder, null, null, null, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
index 55adf848da5..512c583e301 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
@@ -38,6 +38,7 @@ public class RecordFunction implements Function<String, Record<String>> {
         return context.newOutputRecordBuilder(Schema.STRING)
                 .destinationTopic(publishTopic)
                 .value(output)
+                .schema(Schema.STRING)
                 .properties(properties)
                 .build();
     }
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 2b9b25ed743..6c7551f8ac3 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -441,7 +441,7 @@ public class LocalRunner implements AutoCloseable {
                 if (builtInSinkClassLoader != null) {
                     functionDetails = SinkConfigUtils.convert(
                             sinkConfig, SinkConfigUtils.validateAndExtractDetails(
-                                    sinkConfig, builtInSinkClassLoader, true));
+                                    sinkConfig, builtInSinkClassLoader, null, true));
                     userCodeClassLoader = builtInSinkClassLoader;
                 } else if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                     File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
@@ -449,7 +449,8 @@ public class LocalRunner implements AutoCloseable {
                             Function.FunctionDetails.ComponentType.SINK,
                             sinkConfig.getClassName(), file, narExtractionDirectory);
                     functionDetails = SinkConfigUtils.convert(
-                            sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
+                            sinkConfig,
+                            SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
                     userCodeClassLoader = sinkClassLoader;
                     userCodeClassLoaderCreated = true;
                 } else if (userCodeFile != null) {
@@ -461,7 +462,8 @@ public class LocalRunner implements AutoCloseable {
                             Function.FunctionDetails.ComponentType.SINK,
                             sinkConfig.getClassName(), file, narExtractionDirectory);
                     functionDetails = SinkConfigUtils.convert(
-                            sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, true));
+                            sinkConfig,
+                            SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
                     userCodeClassLoader = sinkClassLoader;
                     userCodeClassLoaderCreated = true;
                 } else {
@@ -470,7 +472,7 @@ public class LocalRunner implements AutoCloseable {
                     }
                     functionDetails = SinkConfigUtils.convert(
                             sinkConfig, SinkConfigUtils.validateAndExtractDetails(
-                                    sinkConfig, Thread.currentThread().getContextClassLoader(), true));
+                                    sinkConfig, Thread.currentThread().getContextClassLoader(), null, true));
                 }
             } else {
                 throw new IllegalArgumentException("Must specify Function, Source or Sink config");
@@ -576,6 +578,8 @@ public class LocalRunner implements AutoCloseable {
                     instanceConfig,
                     userCodeFile,
                     null,
+                    null,
+                    null,
                     runtimeFactory,
                     instanceLivenessCheck);
             spawners.add(runtimeSpawner);
@@ -680,6 +684,8 @@ public class LocalRunner implements AutoCloseable {
                     instanceConfig,
                     userCodeFile,
                     null,
+                    null,
+                    null,
                     runtimeFactory,
                     instanceLivenessCheck);
             spawners.add(runtimeSpawner);
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 09ff9cfc8eb..e67899abd22 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -211,6 +211,7 @@ message FunctionMetaData {
     uint64 createTime = 4;
     map<int32, FunctionState> instanceStates = 5;
     FunctionAuthenticationSpec functionAuthSpec = 6;
+    PackageLocationMetaData transformFunctionPackageLocation = 7;
 }
 
 message FunctionAuthenticationSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index e65a8e254e2..3978e0f34fd 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -61,6 +61,12 @@ public class JavaInstanceStarter implements AutoCloseable {
             listConverter = StringConverter.class)
     public String jarFile;
 
+    @Parameter(
+            names = "--transform_function_jar",
+            description = "Path to Transform Function Jar\n",
+            listConverter = StringConverter.class)
+    public String transformFunctionJarFile;
+
     @Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
     public int instanceId;
 
@@ -73,6 +79,9 @@ public class JavaInstanceStarter implements AutoCloseable {
     @Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
     public String pulsarServiceUrl;
 
+    @Parameter(names = "--transform_function_id", description = "Transform Function Id\n")
+    public String transformFunctionId;
+
     @Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
     public String clientAuthenticationPlugin;
 
@@ -157,6 +166,7 @@ public class JavaInstanceStarter implements AutoCloseable {
 
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionId(functionId);
+        instanceConfig.setTransformFunctionId(transformFunctionId);
         instanceConfig.setFunctionVersion(functionVersion);
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
@@ -220,6 +230,8 @@ public class JavaInstanceStarter implements AutoCloseable {
                 instanceConfig,
                 jarFile,
                 null, // we really dont use this in thread container
+                transformFunctionJarFile,
+                null, // we really dont use this in thread container
                 containerFactory,
                 expectedHealthCheckInterval * 1000);
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 208420957cc..0ee2c430a72 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -52,6 +52,7 @@ public interface RuntimeFactory extends AutoCloseable {
      */
     Runtime createContainer(
             InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
+            String transformFunctionFile, String originalTransformFunctionFileName,
             Long expectedHealthCheckInterval) throws Exception;
 
     default boolean externallyManaged() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index a2b7bf4c254..4f9e9b672cb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -57,6 +57,8 @@ public class RuntimeSpawner implements AutoCloseable {
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
                           String originalCodeFileName,
+                          String transformFunctionFile,
+                          String originalTransformFunctionFileName,
                           RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
@@ -65,6 +67,7 @@ public class RuntimeSpawner implements AutoCloseable {
         this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
         try {
             this.runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile, originalCodeFileName,
+                    transformFunctionFile, originalTransformFunctionFileName,
                     instanceLivenessCheckFreqMs / 1000);
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index a1101e80c95..e5baa7ae967 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.runtime;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.util.JsonFormat;
 import io.prometheus.client.hotspot.BufferPoolsExports;
@@ -36,6 +37,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +68,7 @@ public class RuntimeUtils {
                                           String extraDependenciesDir, /* extra dependencies for running instances */
                                           String logDirectory,
                                           String originalCodeFileName,
+                                          String originalTransformFunctionFileName,
                                           String pulsarServiceUrl,
                                           String stateStorageServiceUrl,
                                           AuthenticationConfig authConfig,
@@ -85,7 +88,7 @@ public class RuntimeUtils {
         final List<String> cmd = getArgsBeforeCmd(instanceConfig, extraDependenciesDir);
 
         cmd.addAll(getCmd(instanceConfig, instanceFile, extraDependenciesDir, logDirectory,
-                originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
+                originalCodeFileName, originalTransformFunctionFileName, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, shardId, grpcPort, expectedHealthCheckInterval,
                 logConfigFile, secretsProviderClassName, secretsProviderConfig,
                 installUserCodeDependencies, pythonDependencyRepository,
@@ -262,6 +265,7 @@ public class RuntimeUtils {
                                       String extraDependenciesDir, /* extra dependencies for running instances */
                                       String logDirectory,
                                       String originalCodeFileName,
+                                      String originalTransformFunctionFileName,
                                       String pulsarServiceUrl,
                                       String stateStorageServiceUrl,
                                       AuthenticationConfig authConfig,
@@ -332,9 +336,7 @@ public class RuntimeUtils {
             }
 
             if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-                for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-                    args.add(runtimeFlagArg);
-                }
+                Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
             }
             if (instanceConfig.getFunctionDetails().getResources() != null) {
                 Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
@@ -346,12 +348,16 @@ public class RuntimeUtils {
 
             args.add("--jar");
             args.add(originalCodeFileName);
+            if (isNotEmpty(originalTransformFunctionFileName)) {
+                args.add("--transform_function_jar");
+                args.add(originalTransformFunctionFileName);
+                args.add("--transform_function_id");
+                args.add(instanceConfig.getTransformFunctionId());
+            }
         } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
             args.add("python3");
             if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-                for (String runtimeFlagArg : splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
-                    args.add(runtimeFlagArg);
-                }
+                Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
             }
             args.add(instanceFile);
             args.add("--py");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 50dc975075f..f7791e716d9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.runtime.kubernetes;
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.left;
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
@@ -74,7 +75,6 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.Response;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -139,6 +139,7 @@ public class KubernetesRuntime implements Runtime {
     private final String configAdminCLI;
     private final String userCodePkgUrl;
     private final String originalCodeFileName;
+    private final String originalTransformFunctionFileName;
     private final String pulsarAdminUrl;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
     private int percentMemoryPadding;
@@ -173,6 +174,7 @@ public class KubernetesRuntime implements Runtime {
                       String configAdminCLI,
                       String userCodePkgUrl,
                       String originalCodeFileName,
+                      String originalTransformFunctionFileName,
                       String pulsarServiceUrl,
                       String pulsarAdminUrl,
                       String stateStorageServiceUrl,
@@ -203,8 +205,11 @@ public class KubernetesRuntime implements Runtime {
         this.configAdminCLI = configAdminCLI;
         this.userCodePkgUrl = userCodePkgUrl;
         this.downloadDirectory =
-                StringUtils.isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
+                isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
         this.originalCodeFileName = this.downloadDirectory + "/" + originalCodeFileName;
+        this.originalTransformFunctionFileName = isNotEmpty(originalTransformFunctionFileName)
+                ? this.downloadDirectory + "/" + originalTransformFunctionFileName
+                : originalTransformFunctionFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.percentMemoryPadding = percentMemoryPadding;
@@ -263,6 +268,7 @@ public class KubernetesRuntime implements Runtime {
                         extraDependenciesDir,
                         logDirectory,
                         this.originalCodeFileName,
+                        this.originalTransformFunctionFileName,
                         pulsarServiceUrl,
                         stateStorageServiceUrl,
                         authConfig,
@@ -844,49 +850,29 @@ public class KubernetesRuntime implements Runtime {
     }
 
     protected List<String> getExecutorCommand() {
-        return Arrays.asList(
-                "sh",
-                "-c",
-                String.join(" ", getDownloadCommand(instanceConfig.getFunctionDetails(), originalCodeFileName))
-                        + " && " + setShardIdEnvironmentVariableCommand()
-                        + " && " + String.join(" ", processArgs)
-        );
+        List<String> cmds =
+                new ArrayList<>(getDownloadCommand(instanceConfig.getFunctionDetails(), originalCodeFileName, false));
+        if (isNotEmpty(originalTransformFunctionFileName)) {
+            cmds.add("&&");
+            cmds.addAll(getDownloadCommand(instanceConfig.getFunctionDetails(),
+                originalTransformFunctionFileName, true));
+        }
+        cmds.add("&&");
+        cmds.add(setShardIdEnvironmentVariableCommand());
+        cmds.add("&&");
+        cmds.addAll(processArgs);
+        return Arrays.asList("sh", "-c", String.join(" ", cmds));
     }
 
-    private List<String> getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath) {
+    private List<String> getDownloadCommand(Function.FunctionDetails functionDetails, String userCodeFilePath,
+                                            boolean transformFunction) {
         return getDownloadCommand(functionDetails.getTenant(), functionDetails.getNamespace(),
-                functionDetails.getName(), userCodeFilePath);
+                functionDetails.getName(), userCodeFilePath, transformFunction);
     }
 
-    private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) {
-
-        // add auth plugin and parameters if necessary
-        if (authenticationEnabled && authConfig != null) {
-            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
-                    && isNotBlank(authConfig.getClientAuthenticationParameters())
-                    && instanceConfig.getFunctionAuthenticationSpec() != null) {
-                return Arrays.asList(
-                        pulsarRootDir + configAdminCLI,
-                        "--auth-plugin",
-                        authConfig.getClientAuthenticationPlugin(),
-                        "--auth-params",
-                        authConfig.getClientAuthenticationParameters(),
-                        "--admin-url",
-                        pulsarAdminUrl,
-                        "functions",
-                        "download",
-                        "--tenant",
-                        tenant,
-                        "--namespace",
-                        namespace,
-                        "--name",
-                        name,
-                        "--destination-file",
-                        userCodeFilePath);
-            }
-        }
-
-        return Arrays.asList(
+    private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath,
+                                            boolean transformFunction) {
+        ArrayList<String> cmd = new ArrayList<>(Arrays.asList(
                 pulsarRootDir + configAdminCLI,
                 "--admin-url",
                 pulsarAdminUrl,
@@ -899,7 +885,26 @@ public class KubernetesRuntime implements Runtime {
                 "--name",
                 name,
                 "--destination-file",
-                userCodeFilePath);
+                userCodeFilePath));
+
+        // add auth plugin and parameters if necessary
+        if (authenticationEnabled && authConfig != null) {
+            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+                    && isNotBlank(authConfig.getClientAuthenticationParameters())
+                    && instanceConfig.getFunctionAuthenticationSpec() != null) {
+                cmd.addAll(Arrays.asList(
+                        "--auth-plugin",
+                        authConfig.getClientAuthenticationPlugin(),
+                        "--auth-params",
+                        authConfig.getClientAuthenticationParameters()));
+            }
+        }
+
+        if (transformFunction) {
+            cmd.add("--transform-function");
+        }
+
+        return cmd;
     }
 
     private static String setShardIdEnvironmentVariableCommand() {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index aa000e789c2..740d6cc0fdc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -269,6 +269,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     @Override
     public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
                                              String originalCodeFileName,
+                                             String transformFunctionPkgUrl,
+                                             String originalTransformFunctionFileName,
                                              Long expectedHealthCheckInterval) throws Exception {
         String instanceFile = null;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
@@ -327,6 +329,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             configAdminCLI,
             codePkgUrl,
             originalCodeFileName,
+            originalTransformFunctionFileName,
             pulsarServiceUrl,
             pulsarAdminUrl,
             stateStorageServiceUri,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 3c17633ea61..678296eeec0 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -82,6 +82,7 @@ class ProcessRuntime implements Runtime {
                    String narExtractionDirectory,
                    String logDirectory,
                    String codeFile,
+                   String transformFunctionFile,
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
                    AuthenticationConfig authConfig,
@@ -133,6 +134,7 @@ class ProcessRuntime implements Runtime {
                     ? extraDependenciesDir : null,
             logDirectory,
             codeFile,
+            transformFunctionFile,
             pulsarServiceUrl,
             stateStorageServiceUrl,
             authConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 36e09bafd34..2324d0ee441 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -193,6 +193,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     @Override
     public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
                                           String originalCodeFileName,
+                                          String transformFunctionFile,
+                                          String originalTransformFunctionFileName,
                                           Long expectedHealthCheckInterval) throws Exception {
         String instanceFile = null;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
@@ -223,6 +225,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
             narExtractionDirectory,
             logDirectory,
             codeFile,
+            transformFunctionFile,
             pulsarServiceUrl,
             stateStorageServiceUrl,
             authConfig,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index f2aa1ca6f98..d1b53673209 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -64,6 +64,7 @@ public class ThreadRuntime implements Runtime {
     private final ThreadGroup threadGroup;
     private final FunctionCacheManager fnCache;
     private final String jarFile;
+    private final String transformFunctionFile;
     private final ClientBuilder clientBuilder;
     private final PulsarClient pulsarClient;
     private final PulsarAdmin pulsarAdmin;
@@ -79,6 +80,7 @@ public class ThreadRuntime implements Runtime {
                   FunctionCacheManager fnCache,
                   ThreadGroup threadGroup,
                   String jarFile,
+                  String transformFunctionFile,
                   PulsarClient client,
                   ClientBuilder clientBuilder,
                   PulsarAdmin pulsarAdmin,
@@ -97,6 +99,7 @@ public class ThreadRuntime implements Runtime {
         this.threadGroup = threadGroup;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
+        this.transformFunctionFile = transformFunctionFile;
         this.clientBuilder = clientBuilder;
         this.pulsarClient = client;
         this.pulsarAdmin = pulsarAdmin;
@@ -110,14 +113,15 @@ public class ThreadRuntime implements Runtime {
     }
 
     private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig,
+                                                      String functionId,
                                                       String jarFile,
                                                       String narExtractionDirectory,
                                                       FunctionCacheManager fnCache,
                                                       Optional<ConnectorsManager> connectorsManager,
-                                                      Optional<FunctionsManager> functionsManager) throws Exception {
-        if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails())) {
-            Function.FunctionDetails.ComponentType componentType =
-                    InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
+                                                      Optional<FunctionsManager> functionsManager,
+                                                      Function.FunctionDetails.ComponentType componentType)
+            throws Exception {
+        if (FunctionCommon.isFunctionCodeBuiltin(instanceConfig.getFunctionDetails(), componentType)) {
             if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && functionsManager.isPresent()) {
                 return functionsManager.get()
                         .getFunction(instanceConfig.getFunctionDetails().getBuiltin())
@@ -134,11 +138,12 @@ public class ThreadRuntime implements Runtime {
                         .getClassLoader();
             }
         }
-        return loadJars(jarFile, instanceConfig, narExtractionDirectory, fnCache);
+        return loadJars(jarFile, instanceConfig, functionId, narExtractionDirectory, fnCache);
     }
 
     private static ClassLoader loadJars(String jarFile,
                                  InstanceConfig instanceConfig,
+                                 String functionId,
                                  String narExtractionDirectory,
                                  FunctionCacheManager fnCache) throws Exception {
         if (jarFile == null) {
@@ -151,7 +156,7 @@ public class ThreadRuntime implements Runtime {
                 log.info("Trying Loading file as NAR file: {}", jarFile);
                 // Let's first try to treat it as a nar archive
                 fnCache.registerFunctionInstanceWithArchive(
-                        instanceConfig.getFunctionId(),
+                        functionId,
                         instanceConfig.getInstanceName(),
                         jarFile, narExtractionDirectory);
                 loadedAsNar = true;
@@ -165,16 +170,16 @@ public class ThreadRuntime implements Runtime {
             log.info("Load file as simple JAR file: {}", jarFile);
             // create the function class loader
             fnCache.registerFunctionInstance(
-                    instanceConfig.getFunctionId(),
+                    functionId,
                     instanceConfig.getInstanceName(),
                     Arrays.asList(jarFile),
                     Collections.emptyList());
         }
 
         log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
-                instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));
+                instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(functionId));
 
-        fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+        fnClassLoader = fnCache.getClassLoader(functionId);
         if (null == fnClassLoader) {
             throw new Exception("No function class loader available.");
         }
@@ -190,8 +195,13 @@ public class ThreadRuntime implements Runtime {
 
         // extract class loader for function
         ClassLoader functionClassLoader =
-                getFunctionClassLoader(instanceConfig, jarFile, narExtractionDirectory, fnCache, connectorsManager,
-                        functionsManager);
+                getFunctionClassLoader(instanceConfig, instanceConfig.getFunctionId(), jarFile, narExtractionDirectory,
+                        fnCache, connectorsManager, functionsManager,
+                        InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails()));
+
+        ClassLoader transformFunctionClassLoader = transformFunctionFile == null ? null : getFunctionClassLoader(
+                instanceConfig, instanceConfig.getTransformFunctionId(), transformFunctionFile, narExtractionDirectory,
+                fnCache, connectorsManager, functionsManager, Function.FunctionDetails.ComponentType.FUNCTION);
 
         // re-initialize JavaInstanceRunnable so that variables in constructor can be re-initialized
         this.javaInstanceRunnable = new JavaInstanceRunnable(
@@ -203,7 +213,8 @@ public class ThreadRuntime implements Runtime {
                 stateStorageServiceUrl,
                 secretsProvider,
                 collectorRegistry,
-                functionClassLoader);
+                functionClassLoader,
+                transformFunctionClassLoader);
 
         log.info("ThreadContainer starting function with instanceId {} functionId {} namespace {}",
                 instanceConfig.getInstanceId(),
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index da8a9bb14dd..64ea2d0f172 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -177,6 +177,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     @Override
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
                                          String originalCodeFileName,
+                                         String transformFunctionFile,
+                                         String originalTransformFunctionFileName,
                                          Long expectedHealthCheckInterval) {
         SecretsProvider secretsProvider = defaultSecretsProvider;
         if (secretsProvider == null) {
@@ -196,6 +198,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
             fnCache,
             threadGroup,
             jarFile,
+            transformFunctionFile,
             pulsarClient,
             clientBuilder,
             pulsarAdmin,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 1651a7ba186..f7cdf707d32 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -191,6 +191,7 @@ public class RuntimeUtilsTest {
                 "extraDependenciesDir", /* extra dependencies for running instances */
                 "logDirectory",
                 "originalCodeFileName",
+                "originalExtraFileName",
                 "pulsarServiceUrl",
                 "stateStorageServiceUrl",
                 AuthenticationConfig.builder().build(),
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index df5c369303c..33451a316e4 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -284,6 +284,7 @@ public class KubernetesRuntimeTest {
 
         config.setFunctionDetails(createFunctionDetails(runtime, addSecrets));
         config.setFunctionId(java.util.UUID.randomUUID().toString());
+        config.setTransformFunctionId(java.util.UUID.randomUUID().toString());
         config.setFunctionVersion("1.0");
         config.setInstanceId(0);
         config.setMaxBufferedTuples(1024);
@@ -304,7 +305,7 @@ public class KubernetesRuntimeTest {
         factory = createKubernetesRuntimeFactory(null, percentMemoryPadding, 1.0, 1.0);
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
 
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
 
         Function.Resources resources = Function.Resources.newBuilder().setRam(ram).build();
 
@@ -364,7 +365,7 @@ public class KubernetesRuntimeTest {
 
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
         factory = createKubernetesRuntimeFactory(null, 10, cpuOverCommitRatio, memoryOverCommitRatio);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         List<String> args = container.getProcessArgs();
 
         // check padding and xmx
@@ -387,7 +388,7 @@ public class KubernetesRuntimeTest {
         List<String> args;
         try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
             systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
-            container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+            container = factory.createContainer(config, userJarFile, userJarFile, userJarFile, userJarFile,30L);
             args = container.getProcessArgs();
         }
 
@@ -400,14 +401,14 @@ public class KubernetesRuntimeTest {
         if (null != depsDir) {
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            totalArgs = 42;
-            portArg = 29;
-            metricsPortArg = 31;
+            totalArgs = 46;
+            portArg = 33;
+            metricsPortArg = 35;
         } else {
             extraDepsEnv = "";
-            portArg = 28;
-            metricsPortArg = 30;
-            totalArgs = 41;
+            portArg = 32;
+            metricsPortArg = 34;
+            totalArgs = 45;
         }
         if (secretsAttached) {
             totalArgs += 4;
@@ -441,8 +442,11 @@ public class KubernetesRuntimeTest {
                 + " --add-opens java.base/sun.net=ALL-UNNAMED"
                 + " -Xmx" + RESOURCES.getRam()
                 + " org.apache.pulsar.functions.instance.JavaInstanceMain"
-                + " --jar " + jarLocation + " --instance_id "
-                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+                + " --jar " + jarLocation
+                + " --transform_function_jar " + jarLocation
+                + " --transform_function_id " +  config.getTransformFunctionId()
+                + " --instance_id " + "$SHARD_ID"
+                + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
@@ -491,7 +495,7 @@ public class KubernetesRuntimeTest {
     }
 
     private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, boolean secretsAttached) throws Exception {
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         List<String> args = container.getProcessArgs();
 
         int totalArgs;
@@ -582,7 +586,7 @@ public class KubernetesRuntimeTest {
 
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
         config.setFunctionDetails(functionDetails);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1StatefulSet spec = container.createStatefulSet();
 
         assertEquals(spec.getMetadata().getLabels().get("tenant"), "tenant");
@@ -741,7 +745,7 @@ public class KubernetesRuntimeTest {
         factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
 
         V1Service serviceSpec = container.createService();
         assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
@@ -760,7 +764,7 @@ public class KubernetesRuntimeTest {
         factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.of(new BasicKubernetesManifestCustomizer()));
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1StatefulSet spec = container.createStatefulSet();
         assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
         assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -801,7 +805,7 @@ public class KubernetesRuntimeTest {
         factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.of(new TestKubernetesCustomManifestCustomizer()));
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1StatefulSet spec = container.createStatefulSet();
         assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
     }
@@ -830,7 +834,7 @@ public class KubernetesRuntimeTest {
     }
 
     private void verifyGolangInstance(InstanceConfig config) throws Exception {
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         List<String> args = container.getProcessArgs();
 
         int totalArgs = 8;
@@ -1007,7 +1011,7 @@ public class KubernetesRuntimeTest {
                 "org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer", configs);
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1StatefulSet spec = container.createStatefulSet();
         assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
         assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -1055,7 +1059,7 @@ public class KubernetesRuntimeTest {
                 "org.apache.pulsar.functions.runtime.kubernetes.BasicKubernetesManifestCustomizer", configs);
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false);
-        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1StatefulSet spec = container.createStatefulSet();
         assertEquals(spec.getMetadata().getAnnotations().get("annotation"), "test");
         assertEquals(spec.getMetadata().getLabels().get("label"), "test");
@@ -1141,7 +1145,7 @@ public class KubernetesRuntimeTest {
                 new DefaultSecretsProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
                 Mockito.mock(FunctionsManager.class), Optional.empty(), Optional.empty());
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true);
-        KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, 30l);
+        KubernetesRuntime container = kubernetesRuntimeFactory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
         V1PodTemplateSpec template = container.createStatefulSet().getSpec().getTemplate();
         Map<String, String> annotations =
                 template.getMetadata().getAnnotations();
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 6d80ec871b6..ea1d6bbc629 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -289,16 +289,16 @@ public class ProcessRuntimeTest {
         List<String> args;
         try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
             systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
-            ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+            ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile,
+                    userJarFile, userJarFile,30L);
             args = container.getProcessArgs();
         }
 
-
         String classpath = javaInstanceJarFile;
         String extraDepsEnv;
         int portArg;
         int metricsPortArg;
-        int totalArgCount = 44;
+        int totalArgCount = 48;
         if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
             totalArgCount += 3;
         }
@@ -306,13 +306,13 @@ public class ProcessRuntimeTest {
             assertEquals(args.size(), totalArgCount);
             extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
             classpath = classpath + ":" + depsDir + "/*";
-            portArg = 27;
-            metricsPortArg = 29;
+            portArg = 31;
+            metricsPortArg = 33;
         } else {
             assertEquals(args.size(), totalArgCount-1);
             extraDepsEnv = "";
-            portArg = 26;
-            metricsPortArg = 28;
+            portArg = 30;
+            metricsPortArg = 32;
         }
         if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
             portArg += 3;
@@ -331,8 +331,11 @@ public class ProcessRuntimeTest {
                 + " -Dio.netty.tryReflectionSetAccessible=true"
                 + " --add-opens java.base/sun.net=ALL-UNNAMED"
                 + " org.apache.pulsar.functions.instance.JavaInstanceMain"
-                + " --jar " + userJarFile + " --instance_id "
-                + config.getInstanceId() + " --function_id " + config.getFunctionId()
+                + " --jar " + userJarFile
+                + " --transform_function_jar " + userJarFile
+                + " --transform_function_id " +  config.getTransformFunctionId()
+                + " --instance_id " + config.getInstanceId()
+                + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
@@ -368,7 +371,7 @@ public class ProcessRuntimeTest {
     }
 
     private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) throws Exception {
-        ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, null, null, null,30l);
         List<String> args = container.getProcessArgs();
 
         int totalArgs = 36;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 49b71afcad7..54b23d5200b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -56,6 +56,7 @@ import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowFunction;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
@@ -103,42 +104,50 @@ public class FunctionCommon {
         return getFunctionTypes(functionClass, isWindowConfigPresent);
     }
 
-    public static Class<?>[] getFunctionTypes(Class userClass, boolean isWindowConfigPresent) {
-        Class<?>[] typeArgs;
+    public static Class<?>[] getFunctionTypes(Class<?> userClass, boolean isWindowConfigPresent) {
+        Class<?> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(classParent, userClass);
         // if window function
         if (isWindowConfigPresent) {
-            if (WindowFunction.class.isAssignableFrom(userClass)) {
-                typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(WindowFunction.class, userClass);
-            } else {
-                typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
+            if (classParent.equals(java.util.function.Function.class)) {
                 if (!typeArgs[0].equals(Collection.class)) {
                     throw new IllegalArgumentException("Window function must take a collection as input");
                 }
-                Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, userClass);
-                Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
-                Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
-                typeArgs[0] = (Class<?>) actualInputType;
+                typeArgs[0] = (Class<?>) unwrapType(classParent, userClass, 0);
+            }
+        }
+        if (typeArgs[1].equals(Record.class)) {
+            typeArgs[1] = (Class<?>) unwrapType(classParent, userClass, 1);
+        }
+
+        return typeArgs;
+    }
+
+    public static Class<?>[] getRawFunctionTypes(Class<?> userClass, boolean isWindowConfigPresent) {
+        Class<?> classParent = getFunctionClassParent(userClass, isWindowConfigPresent);
+        return TypeResolver.resolveRawArguments(classParent, userClass);
+    }
+
+    public static Class<?> getFunctionClassParent(Class<?> userClass, boolean isWindowConfigPresent) {
+        if (isWindowConfigPresent) {
+            if (WindowFunction.class.isAssignableFrom(userClass)) {
+                return WindowFunction.class;
+            } else {
+                return java.util.function.Function.class;
             }
         } else {
             if (Function.class.isAssignableFrom(userClass)) {
-                typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(Function.class, userClass);
+                return Function.class;
             } else {
-                typeArgs = getFunctionTypesUnwrappingRecordIfNeeded(java.util.function.Function.class, userClass);
+                return java.util.function.Function.class;
             }
         }
-
-        return typeArgs;
     }
 
-    private static Class<?>[] getFunctionTypesUnwrappingRecordIfNeeded(Class<?> type, Class<?> subType) {
-        Class<?>[] typeArgs = TypeResolver.resolveRawArguments(type, subType);
-        if (typeArgs[1].equals(Record.class)) {
-            Type genericType = TypeResolver.resolveGenericType(type, subType);
-            Type recordType = ((ParameterizedType) genericType).getActualTypeArguments()[1];
-            Type actualInputType = ((ParameterizedType) recordType).getActualTypeArguments()[0];
-            typeArgs[1] = (Class<?>) actualInputType;
-        }
-        return typeArgs;
+    private static Type unwrapType(Class<?> type, Class<?> subType, int position) {
+        Type genericType = TypeResolver.resolveGenericType(type, subType);
+        Type argType = ((ParameterizedType) genericType).getActualTypeArguments()[position];
+        return ((ParameterizedType) argType).getActualTypeArguments()[0];
     }
 
     public static Object createInstance(String userClassName, ClassLoader classLoader) {
@@ -401,7 +410,7 @@ public class FunctionCommon {
     }
 
     public static ClassLoader getClassLoaderFromPackage(
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType componentType,
+            ComponentType componentType,
             String className,
             File packageFile,
             String narExtractionDirectory) {
@@ -437,11 +446,9 @@ public class FunctionCommon {
                             narClassLoaderException);
                 }
                 try {
-                    if (componentType
-                            == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.FUNCTION) {
-                        connectorClassName = FunctionUtils.getFunctionClass((NarClassLoader) narClassLoader);
-                    } else if (componentType
-                            == org.apache.pulsar.functions.proto.Function.FunctionDetails.ComponentType.SOURCE) {
+                    if (componentType == ComponentType.FUNCTION) {
+                        connectorClassName = FunctionUtils.getFunctionClass(narClassLoader);
+                    } else if (componentType == ComponentType.SOURCE) {
                         connectorClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
                     } else {
                         connectorClassName = ConnectorUtils.getIOSinkClass((NarClassLoader) narClassLoader);
@@ -532,26 +539,28 @@ public class FunctionCommon {
     }
 
     public static boolean isFunctionCodeBuiltin(
-            org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails) {
-        if (functionDetails.hasSource()) {
+            org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetail) {
+        return isFunctionCodeBuiltin(functionDetail, functionDetail.getComponentType());
+    }
+
+    public static boolean isFunctionCodeBuiltin(
+            org.apache.pulsar.functions.proto.Function.FunctionDetailsOrBuilder functionDetails,
+            ComponentType componentType) {
+        if (componentType == ComponentType.SOURCE && functionDetails.hasSource()) {
             org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = functionDetails.getSource();
             if (!isEmpty(sourceSpec.getBuiltin())) {
                 return true;
             }
         }
 
-        if (functionDetails.hasSink()) {
+        if (componentType == ComponentType.SINK && functionDetails.hasSink()) {
             org.apache.pulsar.functions.proto.Function.SinkSpec sinkSpec = functionDetails.getSink();
             if (!isEmpty(sinkSpec.getBuiltin())) {
                 return true;
             }
         }
 
-        if (!isEmpty(functionDetails.getBuiltin())) {
-            return true;
-        }
-
-        return false;
+        return componentType == ComponentType.FUNCTION && !isEmpty(functionDetails.getBuiltin());
     }
 
     public static SubscriptionInitialPosition convertFromFunctionDetailsSubscriptionPosition(
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index d24542185e5..bcd56b1d25f 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -562,7 +562,6 @@ public class FunctionConfigUtils {
         } else if (functionConfig.getGo() != null) {
             functionConfig.setRuntime(FunctionConfig.Runtime.GO);
         }
-
         WindowConfig windowConfig = functionConfig.getWindowConfig();
         if (windowConfig != null) {
             WindowConfigUtils.inferMissingArguments(windowConfig);
@@ -570,7 +569,7 @@ public class FunctionConfigUtils {
         }
     }
 
-    private static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
+    public static ExtractedFunctionDetails doJavaChecks(FunctionConfig functionConfig, ClassLoader clsLoader) {
 
         String functionClassName = functionConfig.getClassName();
         Class functionClass;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 31b147a1caf..2a06339d00a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -22,6 +22,8 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertFromFunctionDetailsSubscriptionPosition;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getFunctionTypes;
+import static org.apache.pulsar.functions.utils.FunctionCommon.getRawFunctionTypes;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.gson.Gson;
@@ -50,9 +52,11 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.config.validation.ConfigValidation;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 @Slf4j
@@ -64,6 +68,7 @@ public class SinkConfigUtils {
     public static class ExtractedSinkDetails {
         private String sinkClassName;
         private String typeArg;
+        private String functionClassName;
     }
 
     public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
@@ -88,7 +93,14 @@ public class SinkConfigUtils {
         } else {
             functionDetailsBuilder.setParallelism(1);
         }
-        functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        if (sinkDetails.getFunctionClassName() != null) {
+            functionDetailsBuilder.setClassName(sinkDetails.getFunctionClassName());
+        } else {
+            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        }
+        if (sinkConfig.getTransformFunctionConfig() != null) {
+            functionDetailsBuilder.setUserConfig(sinkConfig.getTransformFunctionConfig());
+        }
         if (sinkConfig.getProcessingGuarantees() != null) {
             functionDetailsBuilder.setProcessingGuarantees(
                     convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
@@ -224,6 +236,11 @@ public class SinkConfigUtils {
             sinkSpecBuilder.setBuiltin(builtin);
         }
 
+        if (!isEmpty(sinkConfig.getTransformFunction())
+                && sinkConfig.getTransformFunction().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+            functionDetailsBuilder.setBuiltin(sinkConfig.getTransformFunction().replaceFirst("^builtin://", ""));
+        }
+
         if (sinkConfig.getConfigs() != null) {
             sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
         }
@@ -369,11 +386,23 @@ public class SinkConfigUtils {
             }
         }
 
+        if (!isEmpty(functionDetails.getBuiltin())) {
+            sinkConfig.setTransformFunction("builtin://" + functionDetails.getBuiltin());
+        }
+        if (!functionDetails.getClassName().equals(IdentityFunction.class.getName())) {
+            sinkConfig.setTransformFunctionClassName(functionDetails.getClassName());
+        }
+        if (!isEmpty(functionDetails.getUserConfig())) {
+            sinkConfig.setTransformFunctionConfig(functionDetails.getUserConfig());
+        }
+
+
         return sinkConfig;
     }
 
     public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
                                                                  ClassLoader sinkClassLoader,
+                                                                 ClassLoader functionClassLoader,
                                                                  boolean validateConnectorConfig) {
         if (isEmpty(sinkConfig.getTenant())) {
             throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -429,18 +458,47 @@ public class SinkConfigUtils {
                     String.format("Sink class %s not found in class loader", sinkClassName), e);
         }
 
-        // extract type from sink class
-        Class<?> typeArg = getSinkType(sinkClass);
+        String functionClassName = sinkConfig.getTransformFunctionClassName();
+        Class<?> typeArg;
+        ClassLoader inputClassLoader;
+        if (functionClassLoader != null) {
+            // if function class name in sink config is not set, this should be a built-in function
+            // thus we should try to find it class name in the NAR service definition
+            if (functionClassName == null) {
+                try {
+                    functionClassName = FunctionUtils.getFunctionClass(functionClassLoader);
+                } catch (IOException e) {
+                    throw new IllegalArgumentException("Failed to extract function class from archive", e);
+                }
+            }
+            Class functionClass;
+            try {
+                functionClass = functionClassLoader.loadClass(functionClassName);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        String.format("Function class %s not found in class loader", functionClassName), e);
+            }
+            // extract type from transform function class
+            if (!getRawFunctionTypes(functionClass, false)[1].equals(Record.class)) {
+                throw new IllegalArgumentException("Sink transform function output must be of type Record");
+            }
+            typeArg = getFunctionTypes(functionClass, false)[0];
+            inputClassLoader = functionClassLoader;
+        } else {
+            // extract type from sink class
+            typeArg = getSinkType(sinkClass);
+            inputClassLoader = sinkClassLoader;
+        }
 
         if (sinkConfig.getTopicToSerdeClassName() != null) {
            for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
-               ValidatorUtils.validateSerde(serdeClassName, typeArg, sinkClassLoader, true);
+               ValidatorUtils.validateSerde(serdeClassName, typeArg, inputClassLoader, true);
            }
         }
 
         if (sinkConfig.getTopicToSchemaType() != null) {
             for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
-                ValidatorUtils.validateSchema(schemaType, typeArg, sinkClassLoader, true);
+                ValidatorUtils.validateSchema(schemaType, typeArg, inputClassLoader, true);
             }
         }
 
@@ -453,13 +511,13 @@ public class SinkConfigUtils {
                     throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
                 }
                 if (!isEmpty(consumerSpec.getSerdeClassName())) {
-                    ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, sinkClassLoader, true);
+                    ValidatorUtils.validateSerde(consumerSpec.getSerdeClassName(), typeArg, inputClassLoader, true);
                 }
                 if (!isEmpty(consumerSpec.getSchemaType())) {
-                    ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, sinkClassLoader, true);
+                    ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, inputClassLoader, true);
                 }
                 if (consumerSpec.getCryptoConfig() != null) {
-                    ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), sinkClassLoader, false);
+                    ValidatorUtils.validateCryptoKeyReader(consumerSpec.getCryptoConfig(), inputClassLoader, false);
                 }
             }
         }
@@ -469,7 +527,7 @@ public class SinkConfigUtils {
             validateSinkConfig(sinkConfig, (NarClassLoader) sinkClassLoader);
         }
 
-        return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
+        return new ExtractedSinkDetails(sinkClassName, typeArg.getName(), functionClassName);
     }
 
     private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
@@ -615,6 +673,15 @@ public class SinkConfigUtils {
         if (newConfig.getCleanupSubscription() != null) {
             mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
         }
+        if (newConfig.getTransformFunction() != null) {
+            mergedConfig.setTransformFunction(newConfig.getTransformFunction());
+        }
+        if (newConfig.getTransformFunctionClassName() != null) {
+            mergedConfig.setTransformFunctionClassName(newConfig.getTransformFunctionClassName());
+        }
+        if (newConfig.getTransformFunctionConfig() != null) {
+            mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
+        }
 
         return mergedConfig;
     }
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 60f7d30052e..4f316cac7c7 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -70,7 +70,7 @@ public class SinkConfigUtilsTest {
 
         assertThrows(IllegalArgumentException.class, () -> {
             SinkConfigUtils.convert(sinkConfig,
-                    new SinkConfigUtils.ExtractedSinkDetails(null, null));
+                    new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         });
     }
 
@@ -111,7 +111,10 @@ public class SinkConfigUtilsTest {
 
         sinkConfig.setResources(Resources.getDefaultResources());
 
-        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
+
+        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType());
         SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
@@ -122,7 +125,7 @@ public class SinkConfigUtilsTest {
         sinkConfig.setRetainOrdering(true);
         sinkConfig.setRetainKeyOrdering(false);
 
-        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         assertEquals(Function.SubscriptionType.FAILOVER, functionDetails.getSource().getSubscriptionType());
         convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
@@ -132,7 +135,7 @@ public class SinkConfigUtilsTest {
         sinkConfig.setRetainOrdering(false);
         sinkConfig.setRetainKeyOrdering(true);
 
-        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         assertEquals(Function.SubscriptionType.KEY_SHARED, functionDetails.getSource().getSubscriptionType());
         convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
@@ -146,7 +149,7 @@ public class SinkConfigUtilsTest {
         for (Boolean testcase : testcases) {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setRetainOrdering(testcase);
-            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
             SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getRetainOrdering(), testcase != null ? testcase : Boolean.valueOf(false));
         }
@@ -158,7 +161,7 @@ public class SinkConfigUtilsTest {
         for (Boolean testcase : testcases) {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setRetainKeyOrdering(testcase);
-            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
             SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getRetainKeyOrdering(), testcase != null ? testcase : Boolean.valueOf(false));
         }
@@ -176,7 +179,7 @@ public class SinkConfigUtilsTest {
         for (FunctionConfig.ProcessingGuarantees testcase : testcases) {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setProcessingGuarantees(testcase);
-            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
             SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getProcessingGuarantees(), testcase == null ? ATLEAST_ONCE : testcase);
         }
@@ -189,7 +192,7 @@ public class SinkConfigUtilsTest {
         for (Boolean testcase : testcases) {
             SinkConfig sinkConfig = createSinkConfig();
             sinkConfig.setCleanupSubscription(testcase);
-            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+            Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
             SinkConfig result = SinkConfigUtils.convertFromDetails(functionDetails);
             assertEquals(result.getCleanupSubscription(), testcase == null ? Boolean.valueOf(true) : testcase);
         }
@@ -425,6 +428,57 @@ public class SinkConfigUtilsTest {
         );
     }
 
+    @Test
+    public void testMergeDifferentTransformFunction() {
+        SinkConfig sinkConfig = createSinkConfig();
+        String newFunction = "builtin://new";
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("transformFunction", newFunction);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        assertEquals(
+                mergedConfig.getTransformFunction(),
+                newFunction
+        );
+        mergedConfig.setTransformFunction(sinkConfig.getTransformFunction());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTransformFunctionClassName() {
+        SinkConfig sinkConfig = createSinkConfig();
+        String newFunctionClassName = "NewTransformFunction";
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("transformFunctionClassName", newFunctionClassName);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        assertEquals(
+                mergedConfig.getTransformFunctionClassName(),
+                newFunctionClassName
+        );
+        mergedConfig.setTransformFunctionClassName(sinkConfig.getTransformFunctionClassName());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
+    @Test
+    public void testMergeDifferentTransformFunctionConfig() {
+        SinkConfig sinkConfig = createSinkConfig();
+        String newFunctionConfig = "{\"new-key\": \"new-value\"}";
+        SinkConfig newSinkConfig = createUpdatedSinkConfig("transformFunctionConfig", newFunctionConfig);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        assertEquals(
+                mergedConfig.getTransformFunctionConfig(),
+                newFunctionConfig
+        );
+        mergedConfig.setTransformFunctionConfig(sinkConfig.getTransformFunctionConfig());
+        assertEquals(
+                new Gson().toJson(sinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
     @Test
     public void testValidateConfig() {
         SinkConfig sinkConfig = createSinkConfig();
@@ -459,6 +513,9 @@ public class SinkConfigUtilsTest {
         sinkConfig.setCleanupSubscription(true);
         sinkConfig.setArchive("DummyArchive.nar");
         sinkConfig.setCleanupSubscription(true);
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionClassName("Transform");
+        sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
         return sinkConfig;
     }
 
@@ -478,7 +535,7 @@ public class SinkConfigUtilsTest {
     @Test
     public void testPoolMessages() throws IOException {
         SinkConfig sinkConfig = createSinkConfig();
-        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         assertFalse(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
         SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertFalse(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -488,7 +545,7 @@ public class SinkConfigUtilsTest {
                 .poolMessages(true).build());
         sinkConfig.setInputSpecs(inputSpecs);
 
-        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
         assertTrue(functionDetails.getSource().getInputSpecsMap().get("test-input").getPoolMessages());
         convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertTrue(convertedConfig.getInputSpecs().get("test-input").isPoolMessages());
@@ -499,13 +556,13 @@ public class SinkConfigUtilsTest {
         SinkConfig sinkConfig = createSinkConfig();
         sinkConfig.setInputSpecs(null);
         sinkConfig.setTopicsPattern("my-topic-*");
-        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
                 true);
         sinkConfig.setTimeoutMs(null);
-        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
                 true);
         sinkConfig.setTimeoutMs(0L);
-        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(),
+        SinkConfigUtils.validateAndExtractDetails(sinkConfig, this.getClass().getClassLoader(), null,
                 true);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 01b240d7f97..a4f8532dbc6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.functions.Utils.FILE;
 import static org.apache.pulsar.common.functions.Utils.HTTP;
 import static org.apache.pulsar.common.functions.Utils.hasPackageTypePrefix;
@@ -30,9 +31,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.io.MoreFiles;
 import com.google.common.io.RecursiveDeleteOption;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
@@ -106,37 +107,30 @@ public class FunctionActioner {
                     functionDetails.getName(), instanceId);
 
             String packageFile;
+            String transformFunctionPackageFile = null;
 
-            String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
-            boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
+            Function.PackageLocationMetaData pkgLocation = functionMetaData.getPackageLocation();
+            Function.PackageLocationMetaData transformFunctionPkgLocation =
+                    functionMetaData.getTransformFunctionPackageLocation();
 
             if (runtimeFactory.externallyManaged()) {
-                packageFile = pkgLocation;
+                packageFile = pkgLocation.getPackagePath();
+                transformFunctionPackageFile = transformFunctionPkgLocation.getPackagePath();
             } else {
-                if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
-                    URL url = new URL(pkgLocation);
-                    File pkgFile = new File(url.toURI());
-                    packageFile = pkgFile.getAbsolutePath();
-                } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
-                    File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
-                    packageFile = pkgFile.getAbsolutePath();
-                } else {
-                    File pkgDir = new File(workerConfig.getDownloadDirectory(),
-                            getDownloadPackagePath(functionMetaData, instanceId));
-                    pkgDir.mkdirs();
-                    File pkgFile = new File(
-                            pkgDir,
-                            new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
-                                    functionMetaData.getPackageLocation())).getName());
-                    downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
-                    packageFile = pkgFile.getAbsolutePath();
+                packageFile = getPackageFile(functionMetaData, functionDetails, instanceId, pkgLocation,
+                        InstanceUtils.calculateSubjectType(functionDetails));
+                if (!isEmpty(transformFunctionPkgLocation.getPackagePath())) {
+                    transformFunctionPackageFile =
+                            getPackageFile(functionMetaData, functionDetails, instanceId, transformFunctionPkgLocation,
+                                    FunctionDetails.ComponentType.FUNCTION);
                 }
             }
 
             // Setup for batch sources if necessary
             setupBatchSource(functionDetails);
 
-            RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
+            RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(),
+                    packageFile, transformFunctionPackageFile);
             functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
 
             runtimeSpawner.start();
@@ -149,7 +143,38 @@ public class FunctionActioner {
         }
     }
 
-    RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
+    private String getPackageFile(FunctionMetaData functionMetaData, FunctionDetails functionDetails, int instanceId,
+                                  Function.PackageLocationMetaData pkgLocation,
+                                  FunctionDetails.ComponentType componentType)
+            throws URISyntaxException, IOException, ClassNotFoundException, PulsarAdminException {
+        String packagePath = pkgLocation.getPackagePath();
+        boolean isPkgUrlProvided = isFunctionPackageUrlSupported(packagePath);
+        String packageFile;
+        if (isPkgUrlProvided && packagePath.startsWith(FILE)) {
+            URL url = new URL(packagePath);
+            File pkgFile = new File(url.toURI());
+            packageFile = pkgFile.getAbsolutePath();
+        } else if (FunctionCommon.isFunctionCodeBuiltin(functionDetails, componentType)) {
+            File pkgFile = getBuiltinArchive(
+                    componentType,
+                    FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
+            packageFile = pkgFile.getAbsolutePath();
+        } else {
+            File pkgDir = new File(workerConfig.getDownloadDirectory(),
+                    getDownloadPackagePath(functionMetaData, instanceId));
+            pkgDir.mkdirs();
+            File pkgFile = new File(
+                    pkgDir,
+                    new File(getDownloadFileName(functionMetaData.getFunctionDetails(),
+                            pkgLocation)).getName());
+            downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation);
+            packageFile = pkgFile.getAbsolutePath();
+        }
+        return packageFile;
+    }
+
+    RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile,
+                                     String transformFunctionPackageFile) {
         FunctionMetaData functionMetaData = instance.getFunctionMetaData();
         int instanceId = instance.getInstanceId();
 
@@ -170,6 +195,8 @@ public class FunctionActioner {
 
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
                 functionMetaData.getPackageLocation().getOriginalFileName(),
+                transformFunctionPackageFile,
+                functionMetaData.getTransformFunctionPackageLocation().getOriginalFileName(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
         return runtimeSpawner;
@@ -181,6 +208,7 @@ public class FunctionActioner {
         instanceConfig.setFunctionDetails(functionDetails);
         // TODO: set correct function id and version when features implemented
         instanceConfig.setFunctionId(UUID.randomUUID().toString());
+        instanceConfig.setTransformFunctionId(UUID.randomUUID().toString());
         instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
         instanceConfig.setInstanceId(instanceId);
         instanceConfig.setMaxBufferedTuples(1024);
@@ -197,7 +225,8 @@ public class FunctionActioner {
     }
 
     private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData,
-                              int instanceId) throws FileNotFoundException, IOException, PulsarAdminException {
+                              int instanceId, Function.PackageLocationMetaData pkgLocation)
+            throws IOException, PulsarAdminException {
 
         FunctionDetails details = functionMetaData.getFunctionDetails();
         File pkgDir = pkgFile.getParentFile();
@@ -213,12 +242,12 @@ public class FunctionActioner {
                     pkgDir,
                     pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID());
         } while (tempPkgFile.exists() || !tempPkgFile.createNewFile());
-        String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
+        String pkgLocationPath = pkgLocation.getPackagePath();
         boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
         boolean downloadFromPackageManagementService = isPkgUrlProvided && hasPackageTypePrefix(pkgLocationPath);
         log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
                 details.getNamespace(), details.getName(),
-                downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
+                downloadFromHttp ? pkgLocationPath : pkgLocation);
 
         if (downloadFromHttp) {
             FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
@@ -247,7 +276,7 @@ public class FunctionActioner {
             } catch (FileAlreadyExistsException faee) {
                 // file already exists
                 log.warn("Function package has been downloaded from {} and saved at {}",
-                        functionMetaData.getPackageLocation(), pkgFile);
+                        pkgLocation, pkgFile);
             }
         } finally {
             tempPkgFile.delete();
@@ -485,8 +514,9 @@ public class FunctionActioner {
                 File.separatorChar);
     }
 
-    private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
-        if (functionDetails.hasSource()) {
+    private File getBuiltinArchive(FunctionDetails.ComponentType componentType, FunctionDetails.Builder functionDetails)
+            throws IOException, ClassNotFoundException {
+        if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
             if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
                 Connector connector = connectorsManager.getConnector(sourceSpec.getBuiltin());
@@ -501,7 +531,7 @@ public class FunctionActioner {
             }
         }
 
-        if (functionDetails.hasSink()) {
+        if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
             if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
                 Connector connector = connectorsManager.getConnector(sinkSpec.getBuiltin());
@@ -517,7 +547,8 @@ public class FunctionActioner {
             }
         }
 
-        if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
+        if (componentType == FunctionDetails.ComponentType.FUNCTION
+                && !StringUtils.isEmpty(functionDetails.getBuiltin())) {
             return functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
         }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 006437cee3b..c05ffe9ca8c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -758,7 +758,12 @@ public class FunctionRuntimeManager implements AutoCloseable {
                         newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
                         RuntimeSpawner runtimeSpawner = functionActioner.getRuntimeSpawner(
                                 assignment.getInstance(),
-                                assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath());
+                                assignment.getInstance().getFunctionMetaData().getPackageLocation().getPackagePath(),
+                                assignment
+                                        .getInstance()
+                                        .getFunctionMetaData()
+                                        .getTransformFunctionPackageLocation()
+                                        .getPackagePath());
                         // re-initialize if necessary
                         runtimeSpawner.getRuntime().reinitialize();
                         newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index f3ebef89b92..132641c8f01 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,9 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.FunctionCommon.createPkgTempFile;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-import static org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.downloadPackageFile;
 import com.google.common.base.Utf8;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
@@ -42,6 +40,7 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -67,6 +66,7 @@ import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
 import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
@@ -292,16 +292,29 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
     PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
                                                                final String functionPkgUrl,
                                                                final FormDataContentDisposition fileDetail,
-                                                               final File uploadedInputStreamAsFile) throws Exception {
+                                                               final File uploadedInputStreamAsFile)
+            throws Exception {
+        return getFunctionPackageLocation(functionMetaData, functionPkgUrl, fileDetail, uploadedInputStreamAsFile,
+                functionMetaData.getFunctionDetails().getName(), componentType,
+                getFunctionCodeBuiltin(functionMetaData.getFunctionDetails(), componentType));
+    }
+
+    PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
+                                                               final String functionPkgUrl,
+                                                               final FormDataContentDisposition fileDetail,
+                                                               final File uploadedInputStreamAsFile,
+                                                               final String componentName,
+                                                               final FunctionDetails.ComponentType componentType,
+                                                               final String builtin)
+            throws Exception {
         FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
         String tenant = functionDetails.getTenant();
         String namespace = functionDetails.getNamespace();
-        String componentName = functionDetails.getName();
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder();
-        boolean isBuiltin = isFunctionCodeBuiltin(functionDetails);
         boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement();
-        PackageName packageName = PackageName.get(functionDetails.getComponentType().name(),
+        PackageName packageName = PackageName.get(
+                componentType.name(),
                 tenant,
                 namespace,
                 componentName,
@@ -311,7 +324,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
         if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
             // For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk
             // if the function worker image does not include connectors
-            if (isBuiltin) {
+            if (!isEmpty(builtin)) {
                 if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
                     File component;
                     String archiveName;
@@ -328,7 +341,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                             archiveName = functionDetails.getBuiltin();
                             component = worker().getFunctionsManager().getFunctionArchive(archiveName).toFile();
                             break;
-                        }
+                    }
                     packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName,
                             component.getName()));
                     packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
@@ -337,6 +350,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                         worker().getBrokerAdmin().packages().upload(metadata,
                                 packageName.toString(), component.getAbsolutePath());
                     } else {
+                        packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace,
+                                componentName, component.getName()));
                         WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(),
                                 component, worker().getDlogNamespace());
                     }
@@ -344,8 +359,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                             packageLocationMetaDataBuilder.getPackagePath());
                 } else {
                     log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType));
-                    packageLocationMetaDataBuilder
-                            .setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
+                    packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
                 }
             } else if (isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
@@ -398,8 +412,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
             }
         } else {
             // For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk
-            if (isBuiltin) {
-                packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
+            if (!isEmpty(builtin)) {
+                packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
             } else if (isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
             } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)
@@ -500,8 +514,19 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 String.format("Error deleting %s @ /%s/%s/%s",
                         ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
 
-        // clean up component files stored in BK
-        String functionPackagePath = functionMetaData.getPackageLocation().getPackagePath();
+        deleteComponentFromStorage(tenant, namespace, componentName,
+                functionMetaData.getPackageLocation().getPackagePath());
+
+        if (!isEmpty(functionMetaData.getTransformFunctionPackageLocation().getPackagePath())) {
+            deleteComponentFromStorage(tenant, namespace, componentName,
+                    functionMetaData.getTransformFunctionPackageLocation().getPackagePath());
+        }
+
+        deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
+    }
+
+    private void deleteComponentFromStorage(String tenant, String namespace, String componentName,
+                                            String functionPackagePath) {
         if (!functionPackagePath.startsWith(Utils.HTTP)
                 && !functionPackagePath.startsWith(Utils.FILE)
                 && !functionPackagePath.startsWith(Utils.BUILTIN)) {
@@ -509,22 +534,19 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 try {
                     worker().getBrokerAdmin().packages().delete(functionPackagePath);
                 } catch (PulsarAdminException e) {
-                    log.error("{}/{}/{} Failed to cleanup package in package managemanet with url {}", tenant,
-                            namespace, componentName, functionMetaData.getPackageLocation().getPackagePath(), e);
+                    log.error("{}/{}/{} Failed to cleanup package in package management with url {}", tenant,
+                            namespace, componentName, functionPackagePath, e);
                 }
             } else {
                 try {
-                    WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(),
-                            functionMetaData.getPackageLocation().getPackagePath());
+                    WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionPackagePath);
                 } catch (IOException e) {
                     log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName,
-                            functionMetaData.getPackageLocation().getPackagePath(), e);
+                            functionPackagePath, e);
                 }
             }
 
         }
-
-        deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
     }
 
     @Override
@@ -573,8 +595,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
             throw new RestException(Status.NOT_FOUND,
                     String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
         }
-        FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-        return config;
+        return FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
     }
 
     @Override
@@ -1149,7 +1170,7 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
         Reader<byte[]> reader = null;
         Producer<byte[]> producer = null;
         try {
-            if (outputTopic != null && !outputTopic.isEmpty()) {
+            if (!isEmpty(outputTopic)) {
                 reader = worker().getClient().newReader()
                         .topic(outputTopic)
                         .startMessageId(MessageId.latest)
@@ -1420,7 +1441,8 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
 
     @Override
     public StreamingOutput downloadFunction(String tenant, String namespace, String componentName,
-                                            String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
+                                            String clientRole, AuthenticationDataSource clientAuthenticationDataHttps,
+                                            boolean transformFunction) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
@@ -1445,14 +1467,17 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                     String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
         }
 
-        String pkgPath = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName)
-                .getPackageLocation().getPackagePath();
+        FunctionMetaData functionMetaData =
+                functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        String pkgPath = transformFunction
+                ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath()
+                : functionMetaData.getPackageLocation().getPackagePath();
 
         return getStreamingOutput(pkgPath);
     }
 
     private StreamingOutput getStreamingOutput(String pkgPath) {
-        final StreamingOutput streamingOutput = output -> {
+        return output -> {
             if (pkgPath.startsWith(Utils.HTTP)) {
                 URL url = URI.create(pkgPath).toURL();
                 try (InputStream inputStream = url.openStream()) {
@@ -1493,7 +1518,6 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                 WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath);
             }
         };
-        return streamingOutput;
     }
 
     @Override
@@ -1605,23 +1629,26 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
         }
     }
 
-    private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
-        if (functionDetails.hasSource()) {
+    private String getFunctionCodeBuiltin(FunctionDetails functionDetails,
+                                          FunctionDetails.ComponentType componentType) {
+        if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
             if (!isEmpty(sourceSpec.getBuiltin())) {
                 return sourceSpec.getBuiltin();
             }
         }
 
-        if (functionDetails.hasSink()) {
+        if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
             SinkSpec sinkSpec = functionDetails.getSink();
             if (!isEmpty(sinkSpec.getBuiltin())) {
                 return sinkSpec.getBuiltin();
             }
         }
 
-        if (!isEmpty(functionDetails.getBuiltin())) {
-            return functionDetails.getBuiltin();
+        if (componentType == FunctionDetails.ComponentType.FUNCTION) {
+            if (!isEmpty(functionDetails.getBuiltin())) {
+                return functionDetails.getBuiltin();
+            }
         }
 
         return null;
@@ -1837,4 +1864,87 @@ public abstract class ComponentImpl implements Component<PulsarWorkerService> {
                                                   String narExtractionDirectory) {
         return FunctionCommon.getClassLoaderFromPackage(componentType, className, packageFile, narExtractionDirectory);
     }
+
+    static File downloadPackageFile(PulsarWorkerService worker, String packageName)
+            throws IOException, PulsarAdminException {
+        Path tempDirectory;
+        if (worker.getWorkerConfig().getDownloadDirectory() != null) {
+            tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
+        } else {
+            // use the Nar extraction directory as a temporary directory for downloaded files
+            tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
+        }
+        Files.createDirectories(tempDirectory);
+        File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
+        worker.getBrokerAdmin().packages().download(packageName, file.toString());
+        return file;
+    }
+
+    protected File getPackageFile(String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream)
+            throws IOException, PulsarAdminException {
+        File componentPackageFile = null;
+        if (isNotBlank(functionPkgUrl)) {
+            componentPackageFile = getPackageFile(functionPkgUrl);
+        } else if (existingPackagePath.startsWith(Utils.FILE) || existingPackagePath.startsWith(Utils.HTTP)) {
+            try {
+                componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
+                                + "when getting %s package from %s", e.getMessage(),
+                        ComponentTypeUtils.toString(componentType), functionPkgUrl));
+            }
+        } else if (uploadedInputStream != null) {
+            componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+        } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) {
+            componentPackageFile = FunctionCommon.createPkgTempFile();
+            componentPackageFile.deleteOnExit();
+            if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
+                worker().getBrokerAdmin().packages().download(
+                        existingPackagePath,
+                        componentPackageFile.getAbsolutePath());
+            } else {
+                WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
+                        componentPackageFile, existingPackagePath);
+            }
+        }
+        return componentPackageFile;
+    }
+
+    protected File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
+        return downloadPackageFile(worker(), packageName);
+    }
+
+    protected File getPackageFile(String functionPkgUrl) throws IOException, PulsarAdminException {
+        if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
+            return downloadPackageFile(functionPkgUrl);
+        } else {
+            if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+                throw new IllegalArgumentException("Function Package url is not valid."
+                        + "supported url (http/https/file)");
+            }
+            try {
+                return FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
+                                + "when getting %s package from %s", e.getMessage(),
+                        ComponentTypeUtils.toString(componentType), functionPkgUrl), e);
+            }
+        }
+    }
+
+    protected ClassLoader getBuiltinFunctionClassLoader(String archive) {
+        if (!StringUtils.isEmpty(archive)) {
+            if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+                archive = archive.replaceFirst("^builtin://", "");
+
+                FunctionArchive function = worker().getFunctionsManager().getFunction(archive);
+                // check if builtin connector exists
+                if (function == null) {
+                    throw new IllegalArgumentException("Built-in " + componentType + " is not available");
+                }
+                return function.getClassLoader();
+            }
+        }
+        return null;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3e7f0ab7339..799de49d3cc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -28,9 +28,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
@@ -57,7 +54,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.functions.FunctionArchive;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -149,29 +145,13 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
         }
 
         Function.FunctionDetails functionDetails;
-        boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
         File componentPackageFile = null;
         try {
 
             // validate parameters
             try {
-                if (isPkgUrlProvided) {
-                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
-                        componentPackageFile = downloadPackageFile(functionPkgUrl);
-                    } else {
-                        if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
-                            throw new IllegalArgumentException("Function Package url is not valid."
-                                    + "supported url (http/https/file)");
-                        }
-                        try {
-
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
-                                            + "when getting %s package from %s", e.getMessage(),
-                                    ComponentTypeUtils.toString(componentType), functionPkgUrl), e);
-                        }
-                    }
+                if (isNotBlank(functionPkgUrl)) {
+                    componentPackageFile = getPackageFile(functionPkgUrl);
                     functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
                             functionConfig, componentPackageFile);
                 } else {
@@ -339,60 +319,17 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
 
             // validate parameters
             try {
-                if (isNotBlank(functionPkgUrl)) {
-                    if (Utils.hasPackageTypePrefix(functionPkgUrl)) {
-                        componentPackageFile = downloadPackageFile(functionPkgUrl);
-                    } else {
-                        try {
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
-                                            + "when getting %s package from %s", e.getMessage(),
-                                    ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                        }
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
-                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
-                    try {
-                        componentPackageFile = FunctionCommon.extractFileFromPkgURL(
-                                existingComponent.getPackageLocation().getPackagePath());
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(String.format("Encountered error \"%s\" "
-                                        + "when getting %s package from %s", e.getMessage(),
-                                ComponentTypeUtils.toString(componentType), functionPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                            mergedConfig, componentPackageFile);
-                } else if (uploadedInputStream != null) {
-
-                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                            mergedConfig, componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails)
-                            && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
-                                + " Package is not provided");
-                    }
-                } else {
-                    componentPackageFile = FunctionCommon.createPkgTempFile();
-                    componentPackageFile.deleteOnExit();
-                    if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
-                        worker().getBrokerAdmin().packages().download(
-                                existingComponent.getPackageLocation().getPackagePath(),
-                                componentPackageFile.getAbsolutePath());
-                    } else {
-                        WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(),
-                                componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                            mergedConfig, componentPackageFile);
+                componentPackageFile = getPackageFile(
+                        functionPkgUrl,
+                        existingComponent.getPackageLocation().getPackagePath(),
+                        uploadedInputStream);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+                        mergedConfig, componentPackageFile);
+                if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+                        && !isFunctionCodeBuiltin(functionDetails)
+                        && (componentPackageFile == null || fileDetail == null)) {
+                    throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
+                            + " Package is not provided");
                 }
             } catch (Exception e) {
                 log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType),
@@ -821,7 +758,7 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
                 archive = archive.replaceFirst("^builtin://", "");
 
-                FunctionsManager functionsManager = this.worker().getFunctionsManager();
+                FunctionsManager functionsManager = worker().getFunctionsManager();
                 FunctionArchive function = functionsManager.getFunction(archive);
 
                 // check if builtin function exists
@@ -860,23 +797,4 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
             }
         }
     }
-
-    private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        return downloadPackageFile(worker(), packageName);
-    }
-
-    static File downloadPackageFile(PulsarWorkerService worker, String packageName)
-            throws IOException, PulsarAdminException {
-        Path tempDirectory;
-        if (worker.getWorkerConfig().getDownloadDirectory() != null) {
-            tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
-        } else {
-            // use the Nar extraction directory as a temporary directory for downloaded files
-            tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
-        }
-        Files.createDirectories(tempDirectory);
-        File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
-        worker.getBrokerAdmin().packages().download(packageName, file.toString());
-        return file;
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 7b2c5a68ef2..dede6307e76 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -144,29 +143,14 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
                     String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
         }
 
-        Function.FunctionDetails functionDetails = null;
-        boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+        Function.FunctionDetails functionDetails;
         File componentPackageFile = null;
         try {
 
             // validate parameters
             try {
-                if (isPkgUrlProvided) {
-                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
-                        componentPackageFile = downloadPackageFile(sinkPkgUrl);
-                    } else {
-                        if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
-                            throw new IllegalArgumentException(
-                                    "Function Package url is not valid. supported url (http/https/file)");
-                        }
-                        try {
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(
-                                    String.format("Encountered error \"%s\" when getting %s package from %s",
-                                            e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
-                        }
-                    }
+                if (isNotBlank(sinkPkgUrl)) {
+                    componentPackageFile = getPackageFile(sinkPkgUrl);
                     functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
                             sinkConfig, componentPackageFile);
                 } else {
@@ -243,6 +227,12 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             }
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+            String transformFunction = sinkConfig.getTransformFunction();
+            if (isNotBlank(transformFunction)) {
+                setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
+            }
+
             updateRequest(null, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
@@ -329,67 +319,24 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
         }
 
-        Function.FunctionDetails functionDetails = null;
+        Function.FunctionDetails functionDetails;
         File componentPackageFile = null;
         try {
 
             // validate parameters
             try {
-                if (isNotBlank(sinkPkgUrl)) {
-                    if (Utils.hasPackageTypePrefix(sinkPkgUrl)) {
-                        componentPackageFile = downloadPackageFile(sinkPkgUrl);
-                    } else {
-                        try {
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(
-                                    String.format("Encountered error \"%s\" when getting %s package from %s",
-                                            e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
-                        }
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
-                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
-                    try {
-                        componentPackageFile = FunctionCommon
-                                .extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(
-                                String.format("Encountered error \"%s\" when getting %s package from %s",
-                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
-                            mergedConfig, componentPackageFile);
-                } else if (uploadedInputStream != null) {
-
-                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
-                            mergedConfig, componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails)
-                            && (componentPackageFile == null || fileDetail == null)) {
+                componentPackageFile = getPackageFile(
+                        sinkPkgUrl,
+                        existingComponent.getPackageLocation().getPackagePath(),
+                        uploadedInputStream);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+                        mergedConfig, componentPackageFile);
+                if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+                        && !isFunctionCodeBuiltin(functionDetails)
+                        && (componentPackageFile == null || fileDetail == null)) {
                         throw new IllegalArgumentException(
                                 ComponentTypeUtils.toString(componentType) + " Package is not provided");
                     }
-                } else {
-                    componentPackageFile = FunctionCommon.createPkgTempFile();
-                    componentPackageFile.deleteOnExit();
-                    if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
-                        worker().getBrokerAdmin().packages().download(
-                                existingComponent.getPackageLocation().getPackagePath(),
-                                componentPackageFile.getAbsolutePath());
-                    } else {
-                        WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile,
-                                existingComponent.getPackageLocation().getPackagePath());
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
-                            mergedConfig, componentPackageFile);
-                }
             } catch (Exception e) {
                 log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
                         namespace, sinkName, e);
@@ -466,6 +413,12 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
 
             functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
 
+            String transformFunction = mergedConfig.getTransformFunction();
+            if (isNotBlank(transformFunction)
+                    && !transformFunction.equals(existingSinkConfig.getTransformFunction())) {
+                setTransformFunctionPackageLocation(functionMetaDataBuilder, functionDetails, transformFunction);
+            }
+
             updateRequest(existingComponent, functionMetaDataBuilder.build());
         } finally {
             if (componentPackageFile != null && componentPackageFile.exists()) {
@@ -476,6 +429,34 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
         }
     }
 
+    private void setTransformFunctionPackageLocation(Function.FunctionMetaData.Builder functionMetaDataBuilder,
+                                                 Function.FunctionDetails functionDetails, String transformFunction) {
+        File functionPackageFile = null;
+        try {
+            String builtin = null;
+            if (!transformFunction.startsWith(Utils.BUILTIN)) {
+                functionPackageFile = getPackageFile(transformFunction);
+            }
+            Function.PackageLocationMetaData.Builder functionPackageLocation =
+                    getFunctionPackageLocation(functionMetaDataBuilder.build(),
+                            transformFunction, null, functionPackageFile,
+                            functionDetails.getName() + "__sink-function",
+                            Function.FunctionDetails.ComponentType.FUNCTION, builtin);
+            functionMetaDataBuilder.setTransformFunctionPackageLocation(functionPackageLocation);
+        } catch (Exception e) {
+            log.error("Failed process {} {}/{}/{} extra function package: ",
+                    ComponentTypeUtils.toString(componentType), functionDetails.getTenant(),
+                    functionDetails.getNamespace(), functionDetails.getName(), e);
+            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } finally {
+            if (functionPackageFile != null && functionPackageFile.exists()) {
+                if (!transformFunction.startsWith(Utils.FILE)) {
+                    functionPackageFile.delete();
+                }
+            }
+        }
+    }
+
     private class GetSinkStatus extends GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
 
         @Override
@@ -758,7 +739,8 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
                                                                  final String namespace,
                                                                  final String sinkName,
                                                                  final SinkConfig sinkConfig,
-                                                                 final File sinkPackageFile) throws IOException {
+                                                                 final File sinkPackageFile)
+            throws IOException, PulsarAdminException {
 
         // The rest end points take precedence over whatever is there in sinkConfig
         sinkConfig.setTenant(tenant);
@@ -796,8 +778,23 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
                 throw new IllegalArgumentException("Sink package is not provided");
             }
 
+            ClassLoader functionClassLoader = null;
+            if (isNotBlank(sinkConfig.getTransformFunction())) {
+                functionClassLoader =
+                        getBuiltinFunctionClassLoader(sinkConfig.getTransformFunction());
+                if (functionClassLoader == null) {
+                    File functionPackageFile = getPackageFile(sinkConfig.getTransformFunction());
+                    functionClassLoader = getClassLoaderFromPackage(sinkConfig.getTransformFunctionClassName(),
+                            functionPackageFile, worker().getWorkerConfig().getNarExtractionDirectory());
+                }
+                if (functionClassLoader == null) {
+                    throw new IllegalArgumentException("Transform Function package not found");
+                }
+            }
+
             SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validateAndExtractDetails(
-                    sinkConfig, classLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+                sinkConfig, classLoader, functionClassLoader, worker().getWorkerConfig().getValidateConnectorConfig());
+
             return SinkConfigUtils.convert(sinkConfig, sinkDetails);
         } finally {
             if (shouldCloseClassLoader) {
@@ -805,8 +802,4 @@ public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerServic
             }
         }
     }
-
-    private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        return FunctionsImpl.downloadPackageFile(worker(), packageName);
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 82d2043e9a9..44199dd9917 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -25,7 +25,6 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBui
 import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
 import com.google.protobuf.ByteString;
 import java.io.File;
-import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
@@ -54,7 +53,6 @@ import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.ComponentTypeUtils;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -152,21 +150,7 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             // validate parameters
             try {
                 if (isPkgUrlProvided) {
-                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
-                        componentPackageFile = downloadPackageFile(sourcePkgUrl);
-                    } else {
-                        if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
-                            throw new IllegalArgumentException(
-                                    "Function Package url is not valid. supported url (http/https/file)");
-                        }
-                        try {
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(
-                                    String.format("Encountered error \"%s\" when getting %s package from %s",
-                                            e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
-                        }
-                    }
+                    componentPackageFile = getPackageFile(sourcePkgUrl);
                     functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
                             sourceConfig, componentPackageFile);
                 } else {
@@ -329,66 +313,23 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
         }
 
-        Function.FunctionDetails functionDetails = null;
+        Function.FunctionDetails functionDetails;
         File componentPackageFile = null;
         try {
 
             // validate parameters
             try {
-                if (isNotBlank(sourcePkgUrl)) {
-                    if (Utils.hasPackageTypePrefix(sourcePkgUrl)) {
-                        componentPackageFile = downloadPackageFile(sourcePkgUrl);
-                    } else {
-                        try {
-                            componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
-                        } catch (Exception e) {
-                            throw new IllegalArgumentException(
-                                    String.format("Encountered error \"%s\" when getting %s package from %s",
-                                            e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
-                        }
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
-                        || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
-                    try {
-                        componentPackageFile = FunctionCommon
-                                .extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
-                    } catch (Exception e) {
-                        throw new IllegalArgumentException(
-                                String.format("Encountered error \"%s\" when getting %s package from %s",
-                                        e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
-                            mergedConfig, componentPackageFile);
-                } else if (uploadedInputStream != null) {
-
-                    componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
-                            mergedConfig, componentPackageFile);
-
-                } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
-                            mergedConfig, componentPackageFile);
-                    if (!isFunctionCodeBuiltin(functionDetails)
-                            && (componentPackageFile == null || fileDetail == null)) {
-                        throw new IllegalArgumentException(
-                                ComponentTypeUtils.toString(componentType) + " Package is not provided");
-                    }
-                } else {
-                    componentPackageFile = FunctionCommon.createPkgTempFile();
-                    componentPackageFile.deleteOnExit();
-                    if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
-                        worker().getBrokerAdmin().packages().download(
-                                existingComponent.getPackageLocation().getPackagePath(),
-                                componentPackageFile.getAbsolutePath());
-                    } else {
-                        WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile,
-                                existingComponent.getPackageLocation().getPackagePath());
-                    }
-                    functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
-                            mergedConfig, componentPackageFile);
+                componentPackageFile = getPackageFile(
+                        sourcePkgUrl,
+                        existingComponent.getPackageLocation().getPackagePath(),
+                        uploadedInputStream);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+                        mergedConfig, componentPackageFile);
+                if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)
+                        && !isFunctionCodeBuiltin(functionDetails)
+                        && (componentPackageFile == null || fileDetail == null)) {
+                    throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType)
+                            + " Package is not provided");
                 }
             } catch (Exception e) {
                 log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant,
@@ -805,8 +746,4 @@ public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerSe
             }
         }
     }
-
-    private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        return FunctionsImpl.downloadPackageFile(worker(), packageName);
-    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 3efad1da1cc..092844db489 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -339,9 +339,12 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
             @ApiParam(value = "The namespace of functions")
             final @PathParam("namespace") String namespace,
             @ApiParam(value = "The name of functions")
-            final @PathParam("functionName") String functionName) {
+            final @PathParam("functionName") String functionName,
+            @ApiParam(value = "Whether to download the transform function")
+            final @QueryParam("transform-function") boolean transformFunction) {
 
-        return functions().downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
+        return functions()
+                .downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData(), transformFunction);
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
index c305d64b9f3..db315f1181e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Component.java
@@ -172,7 +172,17 @@ public interface Component<W extends WorkerService> {
                                      String namespace,
                                      String componentName,
                                      String clientRole,
-                                     AuthenticationDataSource clientAuthenticationDataHttps);
+                                     AuthenticationDataSource clientAuthenticationDataHttps,
+                                     boolean transformFunction);
+
+    @Deprecated
+    default StreamingOutput downloadFunction(String tenant,
+                                     String namespace,
+                                     String componentName,
+                                     String clientRole,
+                                     AuthenticationDataSource clientAuthenticationDataHttps) {
+        return downloadFunction(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps, false);
+    }
 
     @Deprecated
     default StreamingOutput downloadFunction(String tenant,
@@ -185,7 +195,8 @@ public interface Component<W extends WorkerService> {
                 namespace,
                 componentName,
                 clientRole,
-                (AuthenticationDataSource) clientAuthenticationDataHttps);
+                (AuthenticationDataSource) clientAuthenticationDataHttps,
+                false);
     }
 
     List<ConnectorDefinition> getListOfConnectors();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 9ccdc09bd7d..2412aebc294 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -78,7 +78,6 @@ public class FunctionActionerTest {
         @SuppressWarnings("resource")
         FunctionActioner actioner = new FunctionActioner(workerConfig, factory, dlogNamespace,
                 new ConnectorsManager(workerConfig), new FunctionsManager(workerConfig), mock(PulsarAdmin.class));
-        Runtime runtime = mock(Runtime.class);
         Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
                 .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
                         .setNamespace("test-namespace").setName("func-1"))
@@ -115,7 +114,7 @@ public class FunctionActionerTest {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
@@ -128,39 +127,49 @@ public class FunctionActionerTest {
         // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
         // RuntimeSpawner
         String pkgPathLocation = FILE + ":/user/my-file.jar";
-        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
-                        .setNamespace("test-namespace").setName("func-1"))
-                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
-                .build();
-        Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
-                .build();
-        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
-        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
-        actioner.startFunction(functionRuntimeInfo);
+        startFunction(actioner, pkgPathLocation, pkgPathLocation);
         verify(runtime, times(1)).start();
 
         // (2) test with http-url, downloading file from http should fail with UnknownHostException due to invalid url
-        pkgPathLocation = "http://invalid/my-file.jar";
-        function1 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
-                        .setNamespace("test-namespace").setName("func-1"))
-                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
-                .build();
-        instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0).build();
-        functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
-        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-        doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
+        String invalidPkgPathLocation = "http://invalid/my-file.jar";
 
         try {
-            actioner.startFunction(functionRuntimeInfo);
+            startFunction(actioner, invalidPkgPathLocation, pkgPathLocation);
+            fail();
+        } catch (IllegalStateException ex) {
+            assertEquals(ex.getMessage(), "StartupException");
+        }
+
+        try {
+            startFunction(actioner, pkgPathLocation, invalidPkgPathLocation);
             fail();
         } catch (IllegalStateException ex) {
             assertEquals(ex.getMessage(), "StartupException");
         }
     }
 
+    private void startFunction(FunctionActioner actioner, String pkgPathLocation, String extraPkgPathLocation) {
+        PackageLocationMetaData packageLocation = PackageLocationMetaData.newBuilder()
+                .setPackagePath(pkgPathLocation)
+                .build();
+        PackageLocationMetaData extraPackageLocation = PackageLocationMetaData.newBuilder()
+                .setPackagePath(extraPkgPathLocation)
+                .build();
+        Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder()
+                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
+                        .setNamespace("test-namespace").setName("func-1"))
+                .setPackageLocation(packageLocation)
+                .setTransformFunctionPackageLocation(extraPackageLocation)
+                .build();
+        Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function).setInstanceId(0)
+                .build();
+        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
+        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
+        doThrow(new IllegalStateException("StartupException")).when(functionRuntimeInfo).setStartupException(any());
+
+        actioner.startFunction(functionRuntimeInfo);
+    }
+
     @Test
     public void testFunctionAuthDisabled() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
@@ -177,7 +186,7 @@ public class FunctionActionerTest {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
@@ -198,7 +207,7 @@ public class FunctionActionerTest {
         Function.Instance instance = Function.Instance.newBuilder()
                 .setFunctionMetaData(functionMeta).build();
 
-        RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo"));
+        RuntimeSpawner runtimeSpawner = spy(actioner.getRuntimeSpawner(instance, "foo", "bar"));
 
         assertNull(runtimeSpawner.getInstanceConfig().getFunctionAuthenticationSpec());
 
@@ -236,7 +245,7 @@ public class FunctionActionerTest {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
@@ -253,17 +262,7 @@ public class FunctionActionerTest {
         // (1) test with file url. functionActioner should be able to consider file-url and it should be able to call
         // RuntimeSpawner
         String pkgPathLocation = "function://public/default/test-function@latest";
-        Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
-                .setFunctionDetails(Function.FunctionDetails.newBuilder().setTenant("test-tenant")
-                        .setNamespace("test-namespace").setName("func-1"))
-                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath(pkgPathLocation).build())
-                .build();
-        Function.Instance instance = Function.Instance.newBuilder().setFunctionMetaData(function1).setInstanceId(0)
-                .build();
-        FunctionRuntimeInfo functionRuntimeInfo = mock(FunctionRuntimeInfo.class);
-        doReturn(instance).when(functionRuntimeInfo).getFunctionInstance();
-
-        actioner.startFunction(functionRuntimeInfo);
+        startFunction(actioner, pkgPathLocation, pkgPathLocation);
         verify(runtime, times(1)).start();
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index ceda6f89eb8..61237cdf481 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -716,7 +716,7 @@ public class FunctionRuntimeManagerTest {
         doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
 
         KubernetesRuntime kubernetesRuntime = mock(KubernetesRuntime.class);
-        doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
+        doReturn(kubernetesRuntime).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any(), any(), any());
 
         FunctionActioner functionActioner = spy(new FunctionActioner(
                 workerConfig,
@@ -742,7 +742,9 @@ public class FunctionRuntimeManagerTest {
             functionRuntimeManager.setFunctionActioner(functionActioner);
 
             Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder()
-                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path").build())
+                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("path"))
+                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+                            .setPackagePath("function-path"))
                     .setFunctionDetails(
                             Function.FunctionDetails.newBuilder()
                                     .setTenant("test-tenant").setNamespace("test-namespace").setName("func-1")).build();
@@ -771,7 +773,8 @@ public class FunctionRuntimeManagerTest {
             FunctionRuntimeInfo functionRuntimeInfo = new FunctionRuntimeInfo()
                     .setFunctionInstance(instance)
                     .setRuntimeSpawner(functionActioner
-                            .getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath()));
+                            .getRuntimeSpawner(instance, function1.getPackageLocation().getPackagePath(),
+                                    function1.getTransformFunctionPackageLocation().getPackagePath()));
             functionRuntimeManager.functionRuntimeInfos.put(
                     "test-tenant/test-namespace/func-1:0", functionRuntimeInfo);
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 4b7e9940230..7ffebcb3216 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -152,7 +152,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> metricsDataCompletableFuture =
                 new CompletableFuture<InstanceCommunication.MetricsData>();
         metricsDataCompletableFuture.complete(javaInstanceRunnable.getMetrics());
@@ -207,7 +207,7 @@ public class FunctionsImplTest {
         instanceConfig.setMaxBufferedTuples(1024);
 
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                instanceConfig, null, null, null, null, null, null, null, null);
+                instanceConfig, null, null, null, null, null, null, null, null, null);
         CompletableFuture<InstanceCommunication.MetricsData> completableFuture =
                 new CompletableFuture<InstanceCommunication.MetricsData>();
         completableFuture.complete(javaInstanceRunnable.getMetrics());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index a868d632506..8d19869b473 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -1688,6 +1688,66 @@ public class FunctionApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testDownloadFunctionByName() throws Exception {
+        URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        File file = Paths.get(fileUrl.toURI()).toFile();
+        String fileLocation = file.getAbsolutePath().replace('\\', '/');
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        doReturn(true).when(mockedWorkerService).isInitialized();
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.isAuthorizationEnabled()).thenReturn(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("file:///" + fileLocation))
+                .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+                .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+        StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, false);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        Assert.assertTrue(pkgFile.exists());
+        if (pkgFile.exists()) {
+            pkgFile.delete();
+        }
+    }
+
+    @Test
+    public void testDownloadTransformFunctionByName() throws Exception {
+        URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
+        File file = Paths.get(fileUrl.toURI()).toFile();
+        String fileLocation = file.getAbsolutePath().replace('\\', '/');
+        String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+        doReturn(true).when(mockedWorkerService).isInitialized();
+        WorkerConfig config = mock(WorkerConfig.class);
+        when(config.isAuthorizationEnabled()).thenReturn(false);
+        when(mockedWorkerService.getWorkerConfig()).thenReturn(config);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+        FunctionMetaData metaData = FunctionMetaData.newBuilder()
+                .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("http://invalid"))
+                .setTransformFunctionPackageLocation(PackageLocationMetaData.newBuilder()
+                        .setPackagePath("file:///" + fileLocation))
+                .build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
+
+        StreamingOutput streamOutput = resource.downloadFunction(tenant, namespace, function, null, null, true);
+        File pkgFile = new File(testDir, UUID.randomUUID().toString());
+        OutputStream output = new FileOutputStream(pkgFile);
+        streamOutput.write(output);
+        Assert.assertTrue(pkgFile.exists());
+        if (pkgFile.exists()) {
+            pkgFile.delete();
+        }
+    }
+
+
     @Test
     public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception {
         Configurator.setRootLevel(Level.DEBUG);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 791273cf537..7fd0fb1030f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -20,9 +20,11 @@ package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.ATLEAST_ONCE;
 import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
@@ -63,6 +65,8 @@ import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.examples.ExclamationFunction;
+import org.apache.pulsar.functions.api.examples.RecordFunction;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
@@ -71,11 +75,14 @@ import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.functions.FunctionArchive;
+import org.apache.pulsar.functions.utils.functions.FunctionUtils;
 import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 import org.apache.pulsar.functions.worker.ConnectorsManager;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.FunctionsManager;
 import org.apache.pulsar.functions.worker.LeaderService;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -635,13 +642,7 @@ public class SinkApiV3ResourceTest {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
         when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
 
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setTenant(tenant);
-        sinkConfig.setNamespace(namespace);
-        sinkConfig.setName(sink);
-        sinkConfig.setClassName(CASSANDRA_STRING_SINK);
-        sinkConfig.setParallelism(parallelism);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        SinkConfig sinkConfig = createDefaultSinkConfig();
         try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
             resource.registerSink(
                     actualTenant,
@@ -710,6 +711,107 @@ public class SinkApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testRegisterSinkSuccessWithTransformFunction() throws Exception {
+        mockInstanceUtils();
+        mockWorkerUtils();
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        doReturn(RecordFunction.class).when(mockedClassLoader).loadClass("RecordFunction");
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getRawFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionClassParent(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
+        });
+
+        mockStatic(FunctionUtils.class, ctx -> {
+            ctx.when(() -> FunctionUtils.getFunctionClass(any())).thenReturn("RecordFunction");
+        });
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("transform")).thenReturn(functionArchive);
+
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+        SinkConfig sinkConfig = createDefaultSinkConfig();
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionConfig("{\"dummy\": \"dummy\"}");
+
+        try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
+            resource.registerSink(
+                    tenant,
+                    namespace,
+                    sink,
+                    inputStream,
+                    mockedFormData,
+                    null,
+                    sinkConfig,
+                    null, null);
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class,
+            expectedExceptionsMessageRegExp = "Sink transform function output must be of type Record")
+    public void testRegisterSinkFailureWithInvalidTransformFunction() throws Exception {
+        mockInstanceUtils();
+        mockWorkerUtils();
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        doReturn(ExclamationFunction.class).when(mockedClassLoader).loadClass("ExclamationFunction");
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getRawFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionClassParent(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
+        });
+
+        mockStatic(FunctionUtils.class, ctx -> {
+            ctx.when(() -> FunctionUtils.getFunctionClass(any())).thenReturn("ExclamationFunction");
+        });
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("transform")).thenReturn(functionArchive);
+
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+        SinkConfig sinkConfig = createDefaultSinkConfig();
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionConfig("{\"dummy\": \"dummy\"}");
+
+        try {
+            try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
+                resource.registerSink(
+                        tenant,
+                        namespace,
+                        sink,
+                        inputStream,
+                        mockedFormData,
+                        null,
+                        sinkConfig,
+                        null, null);
+            }
+        } catch (RestException e) {
+            // expected exception
+            assertEquals(e.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw e;
+        }
+    }
+
     //
     // Update Functions
     //
@@ -882,7 +984,7 @@ public class SinkApiV3ResourceTest {
 
         this.mockedFunctionMetaData =
                 FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
-        when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
@@ -928,13 +1030,7 @@ public class SinkApiV3ResourceTest {
     }
 
     private void updateDefaultSinkWithPackageUrl(String packageUrl) throws Exception {
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setTenant(tenant);
-        sinkConfig.setNamespace(namespace);
-        sinkConfig.setName(sink);
-        sinkConfig.setClassName(CASSANDRA_STRING_SINK);
-        sinkConfig.setParallelism(parallelism);
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
+        SinkConfig sinkConfig = createDefaultSinkConfig();
 
         mockStatic(ConnectorUtils.class, ctx -> {
             ctx.when(() -> ConnectorUtils.getIOSinkClass(any(NarClassLoader.class)))
@@ -1019,13 +1115,7 @@ public class SinkApiV3ResourceTest {
 
         String filePackageUrl = getPulsarIOCassandraNar().toURI().toString();
 
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-        sinkConfig.setTenant(tenant);
-        sinkConfig.setNamespace(namespace);
-        sinkConfig.setName(sink);
-        sinkConfig.setClassName(CASSANDRA_STRING_SINK);
-        sinkConfig.setParallelism(parallelism);
+        SinkConfig sinkConfig = createDefaultSinkConfig();
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
@@ -1115,6 +1205,54 @@ public class SinkApiV3ResourceTest {
         }
     }
 
+    @Test
+    public void testUpdateSinkDifferentTransformFunction() throws Exception {
+        mockWorkerUtils();
+
+        SinkConfig sinkConfig = createDefaultSinkConfig();
+        sinkConfig.setTransformFunction("builtin://transform");
+        sinkConfig.setTransformFunctionClassName("DummyFunction");
+        sinkConfig.setTransformFunctionConfig("{\"dummy\": \"dummy\"}");
+
+        NarClassLoader mockedClassLoader = mock(NarClassLoader.class);
+        doReturn(RecordFunction.class).when(mockedClassLoader).loadClass("DummyFunction");
+        mockStatic(FunctionCommon.class, ctx -> {
+            ctx.when(FunctionCommon::createPkgTempFile).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getRawFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionTypes(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getFunctionClassParent(any(), anyBoolean())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.getClassLoaderFromPackage(any(), any(), any(), any())).thenCallRealMethod();
+            ctx.when(() -> FunctionCommon.extractNarClassLoader(any(), any())).thenReturn(mockedClassLoader);
+        });
+
+        this.mockedFunctionMetaData =
+                FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(mockedFunctionMetaData);
+
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+        FunctionsManager mockedFunctionsManager = mock(FunctionsManager.class);
+        FunctionArchive functionArchive = FunctionArchive.builder()
+                .classLoader(mockedClassLoader)
+                .build();
+        when(mockedFunctionsManager.getFunction("transform")).thenReturn(functionArchive);
+
+        when(mockedWorkerService.getFunctionsManager()).thenReturn(mockedFunctionsManager);
+
+
+        try (FileInputStream inputStream = new FileInputStream(getPulsarIOCassandraNar())) {
+            resource.updateSink(
+                    tenant,
+                    namespace,
+                    sink,
+                    inputStream,
+                    mockedFormData,
+                    null,
+                    sinkConfig,
+                    null, null, null);
+        }
+    }
+
     //
     // deregister sink
     //
@@ -1238,7 +1376,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test
-    public void testDeregisterSinkBKPackageCleanup() throws IOException {
+    public void testDeregisterSinkBKPackageCleanup() {
         mockInstanceUtils();
         try (final MockedStatic<WorkerUtils> ctx = Mockito.mockStatic(WorkerUtils.class)) {
 
@@ -1246,15 +1384,20 @@ public class SinkApiV3ResourceTest {
 
             String packagePath =
                     "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-pulsar-io-batch-data-generator-2.7.0.nar";
+            String transformFunctionPackagePath =
+                    "public/default/test/591541f0-c7c5-40c0-983b-610c722f90b0-test-function.nar";
+            FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+                            .setPackagePath(transformFunctionPackagePath))
+                    .build();
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
-                    .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
-                            Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+                    .thenReturn(functionMetaData);
 
             deregisterDefaultSink();
 
-            ctx.verify(() -> {
-                WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
-            }, times(1));
+            ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)), times(1));
+            ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(transformFunctionPackagePath)), times(1));
         }
     }
 
@@ -1266,16 +1409,19 @@ public class SinkApiV3ResourceTest {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
             String packagePath = String.format("%s://data-generator", Utils.BUILTIN);
+            String transformFunctionPackagePath = String.format("%s://exclamation", Utils.BUILTIN);
+            FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+                            .setPackagePath(transformFunctionPackagePath))
+                    .build();
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
-                    .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
-                            Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+                    .thenReturn(functionMetaData);
 
             deregisterDefaultSink();
 
             // if the sink is a builtin sink we shouldn't try to clean it up
-            ctx.verify(() -> {
-                WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
-            }, times(0));
+            ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), anyString()), times(0));
         }
     }
 
@@ -1288,22 +1434,25 @@ public class SinkApiV3ResourceTest {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
             String packagePath = "http://foo.com/connector.jar";
+            String transformFunctionPackagePath = "http://foo.com/function.jar";
+            FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+                            .setPackagePath(transformFunctionPackagePath))
+                    .build();
+
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
-                    .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
-                            Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+                    .thenReturn(functionMetaData);
 
             deregisterDefaultSink();
 
             // if the sink is a is download from a http url, we shouldn't try to clean it up
-            ctx.verify(() -> {
-                WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
-            }, times(0));
-
+            ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), anyString()), times(0));
         }
     }
 
     @Test
-    public void testDeregisterFileSinkBKPackageCleanup() throws IOException {
+    public void testDeregisterFileSinkBKPackageCleanup() {
         mockInstanceUtils();
 
         try (final MockedStatic<WorkerUtils> ctx = Mockito.mockStatic(WorkerUtils.class)) {
@@ -1311,16 +1460,20 @@ public class SinkApiV3ResourceTest {
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
             String packagePath = "file://foo/connector.jar";
+            String transformFunctionPackagePath = "file://foo/function.jar";
+            FunctionMetaData functionMetaData = FunctionMetaData.newBuilder()
+                    .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath))
+                    .setTransformFunctionPackageLocation(Function.PackageLocationMetaData.newBuilder()
+                            .setPackagePath(transformFunctionPackagePath))
+                    .build();
+
             when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink)))
-                    .thenReturn(FunctionMetaData.newBuilder().setPackageLocation(
-                            Function.PackageLocationMetaData.newBuilder().setPackagePath(packagePath).build()).build());
+                    .thenReturn(functionMetaData);
 
             deregisterDefaultSink();
 
             // if the sink package has a file url, we shouldn't try to clean it up
-            ctx.verify(() -> {
-                WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath));
-            }, times(0));
+            ctx.verify(() -> WorkerUtils.deleteFromBookkeeper(any(), eq(packagePath)), times(0));
         }
     }
 
@@ -1574,7 +1727,7 @@ public class SinkApiV3ResourceTest {
 
     private FunctionDetails createDefaultFunctionDetails() throws IOException {
         return SinkConfigUtils.convert(createDefaultSinkConfig(),
-                new SinkConfigUtils.ExtractedSinkDetails(null, null));
+                new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
     }
 
     /*
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java
new file mode 100644
index 00000000000..571eb6a9d1a
--- /dev/null
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestLoggingSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+
+public class TestLoggingSink implements Sink<GenericObject> {
+
+    private Logger logger;
+    private Producer<String> producer;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        logger = sinkContext.getLogger();
+        String topic = (String) sinkContext.getSinkConfig().getConfigs().get("log-topic");
+        producer = sinkContext.getPulsarClient().newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+    }
+
+    @Override
+    public void write(Record<GenericObject> record) throws Exception {
+        Object nativeObject = record.getValue().getNativeObject();
+        logger.info("Got message: " + nativeObject + " with schema" + record.getSchema());
+        String payload = nativeObject.toString();
+        if (nativeObject instanceof KeyValue) {
+            KeyValue kv = (KeyValue) nativeObject;
+            String key = kv.getKey().toString();
+            String value = kv.getValue().toString();
+
+            if (kv.getKey() instanceof GenericObject) {
+                key = ((GenericObject) kv.getKey()).getNativeObject().toString();
+            }
+            if (kv.getValue() instanceof GenericObject) {
+                value = ((GenericObject) kv.getValue()).getNativeObject().toString();
+            }
+            payload = "(key = " + key + ", value = " + value + ")";
+        }
+        producer.newMessage()
+            .properties(record.getProperties())
+            .value(record.getSchema().getSchemaInfo().getType().name() + " - " + payload)
+            .send();
+        record.ack();
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java
new file mode 100644
index 00000000000..b5faf59d25a
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkWithTransformFunctionTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.functions.api.examples.pojo.Users;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
+import org.testcontainers.containers.Network;
+import org.testng.annotations.Test;
+
+/**
+ * Test behaviour of sinks with a transform function
+ */
+@Slf4j
+public class SinkWithTransformFunctionTest extends PulsarStandaloneTestSuite {
+
+    //Use PIP-117 new defaults so that the package management service is enabled.
+    @Override
+    public void setUpCluster() throws Exception {
+        incrementSetupNumber();
+        network = Network.newNetwork();
+        String clusterName = PulsarClusterTestBase.randomName(8);
+        container = new StandaloneContainer(clusterName, PulsarContainer.DEFAULT_IMAGE_NAME)
+                .withNetwork(network)
+                .withNetworkAliases(StandaloneContainer.NAME + "-" + clusterName)
+                .withEnv("PF_stateStorageServiceUrl", "bk://localhost:4181");
+        container.start();
+        log.info("Pulsar cluster {} is up running:", clusterName);
+        log.info("\tBinary Service Url : {}", container.getPlainTextServiceUrl());
+        log.info("\tHttp Service Url : {}", container.getHttpServiceUrl());
+
+        // add cluster to public tenant
+        ContainerExecResult result = container.execCmd(
+                "/pulsar/bin/pulsar-admin", "namespaces", "policies", "public/default");
+        assertEquals(0, result.getExitCode());
+        log.info("public/default namespace policies are {}", result.getStdout());
+    }
+
+    @Test(groups = {"sink"})
+    public void testSinkWithTransformFunction() throws Exception {
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+                .serviceUrl(container.getPlainTextServiceUrl())
+                .build();
+
+        final int numRecords = 10;
+
+        String sinkName = "sink-with-function";
+        String topicName = "sink-with-function";
+        String logTopicName = "log-sink-with-function";
+        String packageName = "function://public/default/sink-with-function-function@1.0";
+
+        submitPackage(packageName, "package-function", JAVAJAR);
+
+        submitSinkConnector(
+                sinkName,
+                topicName,
+                "org.apache.pulsar.tests.integration.io.TestLoggingSink",
+                JAVAJAR,
+                "{\"log-topic\": \"" + logTopicName + "\"}",
+                packageName,
+                "org.apache.pulsar.functions.api.examples.RecordFunction");
+
+        getSinkInfoSuccess(sinkName);
+        getSinkStatus(sinkName);
+
+        @Cleanup Producer<String> producer = client.newProducer(Schema.STRING)
+                    .topic(topicName)
+                    .create();
+
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(logTopicName)
+                .subscriptionName("sub")
+                .subscribe();
+
+        for (int i = 0; i < numRecords; i++) {
+            producer.send(i + "-test");
+        }
+
+        try {
+            log.info("waiting for sink {}", sinkName);
+
+            for (int i = 0; i < numRecords; i++) {
+                Message<String> receive = consumer.receive(5, TimeUnit.SECONDS);
+                assertNotNull(receive);
+                assertEquals(receive.getValue(), "STRING - " + i + "-test!");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+    @Test(groups = {"sink"})
+    public void testGenericObjectSinkWithTransformFunction() throws Exception {
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(container.getPlainTextServiceUrl())
+            .build();
+
+        final int numRecords = 10;
+
+        String sinkName = "sink-with-genericobject-function";
+        String topicName = "sink-with-genericobject-function";
+        String logTopicName = "log-sink-with-genericobject-function";
+        String packageName = "function://public/default/sink-with-genericobject-function-function@1.0";
+
+        submitPackage(packageName, "package-function", JAVAJAR);
+
+        submitSinkConnector(
+            sinkName,
+            topicName,
+            "org.apache.pulsar.tests.integration.io.TestLoggingSink",
+            JAVAJAR,
+            "{\"log-topic\": \"" + logTopicName + "\"}",
+            packageName,
+            "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction");
+
+        getSinkInfoSuccess(sinkName);
+        getSinkStatus(sinkName);
+
+        try {
+            @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(logTopicName)
+                .subscriptionName("sub")
+                .subscribe();
+
+            @Cleanup Producer<Users.UserV1> producer1 = client.newProducer(Schema.AVRO(Users.UserV1.class))
+                .topic(topicName)
+                .create();
+
+            for (int i = 0; i < numRecords; i++) {
+                producer1.send(new Users.UserV1("foo" + i, i));
+            }
+
+            for (int i = 0; i < numRecords; i++) {
+                Message<String> receive = consumer.receive(5, TimeUnit.SECONDS);
+                assertNotNull(receive);
+                assertEquals(receive.getValue(), "AVRO - {\"name\": \"foo" + i + "\"}");
+            }
+
+            // Test with schema evolution
+            @Cleanup Producer<Users.UserV2> producer2 = client.newProducer(Schema.AVRO(Users.UserV2.class))
+                .topic(topicName)
+                .create();
+
+            for (int i = 0; i < numRecords; i++) {
+                producer2.send(new Users.UserV2("foo" + i, i, "bar" + i));
+            }
+
+            for (int i = 0; i < numRecords; i++) {
+                Message<String> receive = consumer.receive(5, TimeUnit.SECONDS);
+                assertNotNull(receive);
+                assertEquals(receive.getValue(), "AVRO - {\"name\": \"foo" + i + "\", \"phone\": \"bar"+ i + "\"}");
+            }
+
+            for (int i = 0; i < numRecords; i++) {
+                producer1.send(new Users.UserV1("foo" + i, i));
+            }
+
+            for (int i = 0; i < numRecords; i++) {
+                Message<String> receive = consumer.receive(5, TimeUnit.SECONDS);
+                assertNotNull(receive);
+                assertEquals(receive.getValue(), "AVRO - {\"name\": \"foo" + i + "\"}");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+    @Test(groups = {"sink"})
+    public void testKeyValueSinkWithTransformFunction() throws Exception {
+
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(container.getPlainTextServiceUrl())
+            .build();
+
+        final int numRecords = 10;
+
+        String sinkName = "sink-with-kv-function";
+        String topicName = "sink-with-kv-function";
+        String logTopicName = "log-sink-with-kv-function";
+        String packageName = "function://public/default/sink-with-kv-function-function@1.0";
+
+        submitPackage(packageName, "package-function", JAVAJAR);
+
+        submitSinkConnector(
+            sinkName,
+            topicName,
+            "org.apache.pulsar.tests.integration.io.TestLoggingSink",
+            JAVAJAR,
+            "{\"log-topic\": \"" + logTopicName + "\"}",
+            packageName,
+            "org.apache.pulsar.tests.integration.functions.RemoveAvroFieldRecordFunction");
+
+        getSinkInfoSuccess(sinkName);
+        getSinkStatus(sinkName);
+
+        try {
+            @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(logTopicName)
+                .subscriptionName("sub")
+                .subscribe();
+
+            @Cleanup Producer<KeyValue<Users.UserV1, Users.UserV1>> producer = client
+                .newProducer(Schema.KeyValue(
+                    Schema.AVRO(Users.UserV1.class),
+                    Schema.AVRO(Users.UserV1.class), KeyValueEncodingType.SEPARATED))
+                .topic(topicName)
+                .create();
+
+            for (int i = 0; i < numRecords; i++) {
+                producer.send(new KeyValue<>(new Users.UserV1("foo" + i, i),
+                    new Users.UserV1("bar" + i, i + 100)));
+            }
+
+            for (int i = 0; i < numRecords; i++) {
+                Message<String> receive = consumer.receive(5, TimeUnit.SECONDS);
+                assertNotNull(receive);
+                assertEquals(receive.getValue(), "KEY_VALUE - (key = {\"age\": " + i
+                    + ", \"name\": \"foo" + i + "\"}, value = "
+                    + "{\"name\": \"bar" + i + "\"})");
+            }
+        } finally {
+            dumpFunctionLogs(sinkName);
+        }
+
+        deleteSink(sinkName);
+        getSinkInfoNotFound(sinkName);
+    }
+
+
+    private void submitPackage(String packageName, String description, String packagePath) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "packages", "upload",
+                packageName,
+                "--description", description,
+                "--path", packagePath
+
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("successfully"),
+                result.getStdout());
+    }
+
+    private void submitSinkConnector(String sinkName,
+                                     String inputTopicName,
+                                     String className,
+                                     String archive,
+                                     String configs,
+                                     String transformFunction,
+                                     String transformFunctionClassName) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks", "create",
+                "--name", sinkName,
+                "-i", inputTopicName,
+                "--archive", archive,
+                "--classname", className,
+                "--sink-config", configs,
+                "--transform-function", transformFunction,
+                "--transform-function-classname", transformFunctionClassName
+        };
+        log.info("Run command : {}", StringUtils.join(commands, ' '));
+        ContainerExecResult result = container.execCmd(commands);
+        assertTrue(
+                result.getStdout().contains("Created successfully"),
+                result.getStdout());
+    }
+
+    private void getSinkInfoSuccess(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "get",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("\"name\": \"" + sinkName + "\""));
+    }
+
+    private void getSinkStatus(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        log.info(result.getStdout());
+        log.info(result.getStderr());
+        assertTrue(result.getStdout().contains("\"running\" : true"));
+    }
+
+    private void deleteSink(String sinkName) throws Exception {
+        ContainerExecResult result = container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "sinks",
+                "delete",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sinkName
+        );
+        assertTrue(result.getStdout().contains("successfully"));
+        result.assertNoStderr();
+    }
+
+    private void getSinkInfoNotFound(String sinkName) throws Exception {
+        try {
+            container.execCmd(
+                    PulsarCluster.ADMIN_SCRIPT,
+                    "sinks",
+                    "get",
+                    "--tenant", "public",
+                    "--namespace", "default",
+                    "--name", sinkName);
+            fail("Command should have exited with non-zero");
+        } catch (ContainerExecException e) {
+            assertTrue(e.getResult().getStderr().contains(sinkName + " doesn't exist"));
+        }
+    }
+}
+