You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/05 21:49:52 UTC

[incubator-pulsar] branch master updated: Support running function worker along with broker (#1329)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b701925  Support running function worker along with broker (#1329)
b701925 is described below

commit b7019252d832c42fb244c07fc92ce4f59801c952
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Mar 5 13:49:50 2018 -0800

    Support running function worker along with broker (#1329)
    
    * Move pulsar functions dependency version to root pom and remove duplicated license headers
    
    This addresses some comments in pulsar functions PR #1314
    
    * shade worker
    
    * Fix broken master
    
    * Upgrade the bookkeeper storage client dependency to the official bookkeeper version
    
    This removes the temp dependency in `pulsar-functions-instance`
    
    * set `protobuf2.version` in pulsar-common
    
    * provide a shaded worker
    
    * include worker dependency at broker
    
    * Embedded function worker at broker
    
    * rename 'function worker' to 'functions worker'
    
    * add "--no-functions-worker" for pulsar-client-cpp tests
---
 bin/pulsar                                         |  59 ++-
 buildtools/pom.xml                                 |  15 +
 conf/broker.conf                                   |   6 +
 conf/functions_worker.yml                          |  46 +++
 conf/log4j2.yaml                                   | 118 +++++-
 managed-ledger/pom.xml                             |  15 -
 pom.xml                                            |  38 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 +
 pulsar-broker/pom.xml                              |  90 +++++
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  55 ++-
 .../org/apache/pulsar/PulsarStandaloneStarter.java |  42 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  12 +
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 142 +++++++
 .../apache/pulsar/broker/admin/v1/Functions.java   |  27 +-
 .../apache/pulsar/broker/admin/v2/Functions.java   |  27 +-
 .../org/apache/pulsar/broker/web/WebService.java   |   2 +
 pulsar-client-cpp/run-unit-tests.sh                |   3 +-
 .../pom.xml                                        |  86 ++--
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   0
 .../apache/pulsar/admin/cli/utils/IOUtilsTest.java |   0
 .../cli/utils/NameValueParameterSplitterTest.java  |   0
 .../pulsar/client/cli/PulsarClientToolTest.java    |   0
 pulsar-client-tools/pom.xml                        | 133 +++----
 pulsar-common/pom.xml                              |   2 +-
 pulsar-functions/bin/pulsar                        |   6 +-
 pulsar-functions/cli/pom.xml                       |  14 -
 pulsar-functions/instance/pom.xml                  |   1 -
 pulsar-functions/pom.xml                           |   2 +
 pulsar-functions/runtime/pom.xml                   |   5 +
 pulsar-functions/utils/pom.xml                     |  14 -
 pulsar-functions/worker-runner/pom.xml             | 111 ++++++
 pulsar-functions/worker-shaded/pom.xml             | 113 ++++++
 pulsar-functions/worker/pom.xml                    |  14 -
 .../pulsar/functions/worker/MembershipManager.java |   2 +-
 .../org/apache/pulsar/functions/worker/Worker.java |  11 +-
 .../pulsar/functions/worker/WorkerService.java     |  12 +-
 .../functions/worker/rest/FunctionApiResource.java |  14 +-
 ...nctionApiV2Resource.java => FunctionsImpl.java} | 110 +++---
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 440 +--------------------
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |   7 +-
 pulsar-zookeeper-utils/pom.xml                     |  15 -
 41 files changed, 1040 insertions(+), 782 deletions(-)

diff --git a/bin/pulsar b/bin/pulsar
index ed7e988..cafc359 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -31,6 +31,14 @@ DEFAULT_STANDALONE_CONF=$PULSAR_HOME/conf/standalone.conf
 DEFAULT_WEBSOCKET_CONF=$PULSAR_HOME/conf/websocket.conf
 DEFAULT_LOG_CONF=$PULSAR_HOME/conf/log4j2.yaml
 
+# functions related variables
+FUNCTIONS_HOME=$PULSAR_HOME/pulsar-functions
+DEFAULT_WORKER_CONF=$PULSAR_HOME/conf/functions_worker.yml
+DEFAULT_JAVA_INSTANCE_JAR=$PULSAR_HOME/instances/java-instance.jar
+JAVA_INSTANCE_JAR=${PULSAR_JAVA_INSTANCE_JAR:-"${DEFAULT_JAVA_INSTANCE_JAR}"}
+DEFAULT_PY_INSTANCE_FILE=$PULSAR_HOME/instances/python-instance/python_instance_main.py
+PY_INSTANCE_FILE=${PULSAR_PY_INSTANCE_FILE:-"${DEFAULT_PY_INSTANCE_FILE}"}
+
 if [ -f "$PULSAR_HOME/conf/pulsar_env.sh" ]
 then
     . "$PULSAR_HOME/conf/pulsar_env.sh"
@@ -63,6 +71,34 @@ elif [ -e "$BUILT_JAR" ]; then
     PULSAR_JAR=$BUILT_JAR
 fi
 
+#
+# find the instance locations for pulsar-functions
+#
+
+# find the java instance location
+if [ ! -f "${JAVA_INSTANCE_JAR}" ]; then
+    # released jar not found: fall back to the built-jar path. NOTE(review): the -z test below is always false (the path was just assigned), so the error branch is unreachable; '! -f' may be intended -- confirm
+    BUILT_JAVA_INSTANCE_JAR="${FUNCTIONS_HOME}/runtime/target/java-instance.jar"
+    if [ -z "${BUILT_JAVA_INSTANCE_JAR}" ]; then
+        echo "\nCouldn't find pulsar-functions java instance jar.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+    JAVA_INSTANCE_JAR=${BUILT_JAVA_INSTANCE_JAR}
+fi
+
+# find the python instance location
+if [ ! -f "${PY_INSTANCE_FILE}" ]; then
+    # released python instance not found: fall back to the built path. NOTE(review): the -z test below is always false (the path was just assigned), so the error branch is unreachable; '! -f' may be intended -- confirm
+    BUILT_PY_INSTANCE_FILE="${FUNCTIONS_HOME}/runtime/target/python-instance/python_instance_main.py"
+    if [ -z "${BUILT_PY_INSTANCE_FILE}" ]; then
+        echo "\nCouldn't find pulsar-functions python instance.";
+        echo "Make sure you've run 'mvn package'\n";
+        exit 1;
+    fi
+    PY_INSTANCE_FILE=${BUILT_PY_INSTANCE_FILE}
+fi
+
 pulsar_help() {
     cat <<EOF
 Usage: pulsar <command>
@@ -74,6 +110,7 @@ where command is one of:
     discovery           Run a discovery server
     proxy               Run a pulsar proxy
     websocket           Run a web socket proxy server
+    functions-worker    Run a functions worker server
     standalone          Run a broker server with local bookies and local zookeeper
     compact-topic       Run compaction against a topic
 
@@ -93,6 +130,7 @@ Environment variables:
    PULSAR_DISCOVERY_CONF         Configuration file for discovery service (default: $DEFAULT_DISCOVERY_CONF)
    PULSAR_WEBSOCKET_CONF         Configuration file for websocket proxy (default: $DEFAULT_WEBSOCKET_CONF)
    PULSAR_PROXY_CONF             Configuration file for Pulsar proxy (default: $DEFAULT_PROXY_CONF)
+   PULSAR_WORKER_CONF            Configuration file for functions worker (default: $DEFAULT_WORKER_CONF)
    PULSAR_STANDALONE_CONF        Configuration file for standalone (default: $DEFAULT_STANDALONE_CONF)
    PULSAR_EXTRA_OPTS             Extra options to be passed to the jvm
    PULSAR_EXTRA_CLASSPATH        Add extra paths to the pulsar classpath
@@ -115,7 +153,7 @@ add_maven_deps_to_classpath() {
     f="${PULSAR_HOME}/all/target/classpath.txt"
     if [ ! -f "${f}" ]
     then
-	    ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -Dmdep.outputFile="${f}" &> /dev/null
+	    ${MVN} -f "${PULSAR_HOME}/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
     fi
     PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
 }
@@ -136,6 +174,10 @@ fi
 COMMAND=$1
 shift
 
+if [ -z "$PULSAR_WORKER_CONF" ]; then
+    PULSAR_WORKER_CONF=$DEFAULT_WORKER_CONF
+fi
+
 if [ -z "$PULSAR_BROKER_CONF" ]; then
     PULSAR_BROKER_CONF=$DEFAULT_BROKER_CONF
 fi
@@ -186,12 +228,22 @@ OPTS="-cp $PULSAR_CLASSPATH $OPTS"
 OPTS="$OPTS $PULSAR_EXTRA_OPTS"
 
 # log directory & file
-PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"Console"}
 PULSAR_LOG_DIR=${PULSAR_LOG_DIR:-"$PULSAR_HOME/logs"}
+PULSAR_LOG_APPENDER=${PULSAR_LOG_APPENDER:-"RoutingAppender"}
+PULSAR_LOG_LEVEL=${PULSAR_LOG_LEVEL:-"info"}
+PULSAR_ROUTING_APPENDER_DEFAULT=${PULSAR_ROUTING_APPENDER_DEFAULT:-"Console"}
 
 #Configure log configuration system properties
 OPTS="$OPTS -Dpulsar.log.appender=$PULSAR_LOG_APPENDER"
 OPTS="$OPTS -Dpulsar.log.dir=$PULSAR_LOG_DIR"
+OPTS="$OPTS -Dpulsar.log.level=$PULSAR_LOG_LEVEL"
+OPTS="$OPTS -Dpulsar.routing.appender.default=$PULSAR_ROUTING_APPENDER_DEFAULT"
+
+# Functions related logging
+OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
+# instance
+OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
+OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
 
 #Change to PULSAR_HOME to support relative paths
 cd "$PULSAR_HOME"
@@ -218,6 +270,9 @@ elif [ $COMMAND == "proxy" ]; then
 elif [ $COMMAND == "websocket" ]; then
     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-websocket.log"}
     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
+elif [ $COMMAND == "functions-worker" ]; then
+    PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-functions-worker.log"}
+    exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.functions.worker.FunctionWorkerStarter -c $PULSAR_WORKER_CONF $@
 elif [ $COMMAND == "standalone" ]; then
     PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
     exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
diff --git a/buildtools/pom.xml b/buildtools/pom.xml
index 388f043..d83bb2c 100644
--- a/buildtools/pom.xml
+++ b/buildtools/pom.xml
@@ -40,6 +40,21 @@
       <artifactId>testng</artifactId>
       <version>6.13.1</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <version>2.10.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <version>2.10.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <version>2.10.0</version>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/conf/broker.conf b/conf/broker.conf
index 93489e5..55e6f19 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -440,3 +440,9 @@ webSocketConnectionsPerBroker=8
 
 # Enable topic level metrics
 exposeTopicLevelMetricsInPrometheus=true
+
+### --- Functions --- ###
+
+# Enable Functions Worker Service in Broker
+functionsWorkerEnabled=false
+
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
new file mode 100644
index 0000000..c2c4091
--- /dev/null
+++ b/conf/functions_worker.yml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+workerId: standalone
+workerHostname: localhost
+workerPort: 6750
+functionMetadataTopicName: metadata
+functionMetadataSnapshotsTopicPath: snapshots
+clusterCoordinationTopicName: coordinate
+pulsarFunctionsNamespace: sample/standalone/functions
+pulsarServiceUrl: pulsar://localhost:6650
+pulsarWebServiceUrl: http://localhost:8080
+numFunctionPackageReplicas: 1
+downloadDirectory: /tmp/pulsar_functions
+metricsConfig:
+  metricsSinkClassName: org.apache.pulsar.functions.metrics.sink.PrometheusSink
+  metricsCollectionInterval: 30
+  metricsSinkConfig:
+    path: /metrics
+    port: 9099
+#threadContainerFactory:
+#  threadGroupName: "Thread Function Container Group"
+processContainerFactory:
+  logDirectory:
+  
+schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
+functionAssignmentTopicName: "assignments"
+failureCheckFreqMs: 30000
+rescheduleTimeoutMs: 60000
+initialBrokerReconnectMaxRetries: 60
diff --git a/conf/log4j2.yaml b/conf/log4j2.yaml
index 13b07ad..cd0ec88 100644
--- a/conf/log4j2.yaml
+++ b/conf/log4j2.yaml
@@ -23,6 +23,21 @@ Configuration:
   monitorInterval: 30
   name: pulsar
 
+  Properties:
+    Property:
+      - name: "pulsar.log.dir"
+        value: "logs"
+      - name: "pulsar.log.file"
+        value: "pulsar.log"
+      - name: "pulsar.log.appender"
+        value: "RoutingAppender"
+      - name: "pulsar.log.level"
+        value: "info"
+      - name: "pulsar.routing.appender.default"
+        value: "Console"
+      - name: "bk.log.level"
+        value: "info"
+
   # Example: logger-filter script
   Scripts:
     ScriptFile:
@@ -32,19 +47,19 @@ Configuration:
       charset: UTF-8
   
   Appenders:
-  
-    # Console 
+
+    # Console
     Console:
       name: Console
       target: SYSTEM_OUT
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-        
-   # Rolling file appender configuration
+
+    # Rolling file appender configuration
     RollingFile:
       name: RollingFile
       fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}"
-      filePattern: "/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
+      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
       immediateFlush: false
       PatternLayout:
         Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
@@ -54,7 +69,7 @@ Configuration:
           modulate: true
         SizeBasedTriggeringPolicy:
           size: 1 GB
-        # Trigger every day at midnight that also scan 
+        # Trigger every day at midnight that also scan
         # roll-over strategy that deletes older file
         CronTriggeringPolicy:
           schedule: "0 0 0 * * ?"
@@ -67,16 +82,101 @@ Configuration:
               glob: "*/${sys:pulsar.log.file}*log.gz"
             IfLastModified:
               age: 30d
-              
+
+    # Rolling file appender configuration for bk
+    RollingRandomAccessFile:
+      name: BkRollingFile
+      fileName: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk"
+      filePattern: "${sys:pulsar.log.dir}/${sys:pulsar.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
+      immediateFlush: true
+      PatternLayout:
+        Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
+      Policies:
+        TimeBasedTriggeringPolicy:
+          interval: 1
+          modulate: true
+        SizeBasedTriggeringPolicy:
+          size: 1 GB
+        # Trigger every day at midnight that also scan
+        # roll-over strategy that deletes older file
+        CronTriggeringPolicy:
+          schedule: "0 0 0 * * ?"
+      # Delete file older than 30days
+      DefaultRolloverStrategy:
+          Delete:
+            basePath: ${sys:pulsar.log.dir}
+            maxDepth: 2
+            IfFileName:
+              glob: "*/${sys:pulsar.log.file}.bk*log.gz"
+            IfLastModified:
+              age: 30d
+
+    # Routing
+    Routing:
+      name: RoutingAppender
+      Routes:
+        pattern: "$${ctx:function}"
+        Route:
+          -
+            Routing:
+              name: InstanceRoutingAppender
+              Routes:
+                pattern: "$${ctx:instance}"
+                Route:
+                  -
+                    RollingFile:
+                      name: "Rolling-${ctx:function}"
+                      fileName : "${sys:pulsar.log.dir}/functions/${ctx:function}/function.log"
+                      filePattern : "${sys:pulsar.log.dir}/functions/${ctx:function}-%d{MM-dd-yyyy}-%i.log.gz"
+                      PatternLayout:
+                        Pattern: "%d{ABSOLUTE} %level{length=5} [%thread] [instance: %X{instance}] %logger{1} - %msg%n"
+                      Policies:
+                        TimeBasedTriggeringPolicy:
+                          interval: 1
+                          modulate: true
+                        SizeBasedTriggeringPolicy:
+                          size: "20MB"
+                        # Trigger every day at midnight that also scan
+                        # roll-over strategy that deletes older file
+                        CronTriggeringPolicy:
+                          schedule: "0 0 0 * * ?"
+                      # Delete file older than 30days
+                      DefaultRolloverStrategy:
+                          Delete:
+                            basePath: ${sys:pulsar.log.dir}
+                            maxDepth: 2
+                            IfFileName:
+                              glob: "*/${sys:pulsar.log.file}*log.gz"
+                            IfLastModified:
+                              age: 30d
+                  - ref: "${sys:pulsar.routing.appender.default}"
+                    key: "${ctx:function}"
+          - ref: "${sys:pulsar.routing.appender.default}"
+            key: "${ctx:function}"
 
   Loggers:
+
+    Logger:
+      # A YAML mapping cannot repeat the Logger key, so both loggers are entries of one list
+      - name: org.apache.bookkeeper
+        level: "${sys:bk.log.level}"
+        additivity: false
+        AppenderRef:
+          - ref: BkRollingFile
+
+      - name: org.apache.distributedlog
+        level: "${sys:bk.log.level}"
+        additivity: false
+        AppenderRef:
+          - ref: BkRollingFile
   
     # Default root logger configuration
     Root:
       level: info
       additivity: false
       AppenderRef:
-        ref: "${sys:pulsar.log.appender}"
+        - ref: "${sys:pulsar.log.appender}"
+          level: "${sys:pulsar.log.level}"
     
     # Logger to inject filter script
 #    Logger:
@@ -90,4 +190,4 @@ Configuration:
 #          onMisMatch: DENY
 #          ScriptRef:
 #            ref: filter.js
-        
\ No newline at end of file
+        
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index 3cd0726..b4f0017 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -36,21 +36,6 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server-shaded</artifactId>
-      <version>${bookkeeper.version}</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>
diff --git a/pom.xml b/pom.xml
index 8d6dfb2..b6ac3b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@ flexible messaging model and an intuitive client API.</description>
     <module>pulsar-client-admin-shaded</module>
     <module>pulsar-client-tools</module>
     <module>pulsar-client-tools-shaded</module>
+    <module>pulsar-client-tools-test</module>
     <module>pulsar-websocket</module>
     <module>pulsar-proxy</module>
     <module>pulsar-discovery-service</module>
@@ -130,6 +131,7 @@ flexible messaging model and an intuitive client API.</description>
     <puppycrawl.checkstyle.version>6.19</puppycrawl.checkstyle.version>
     <dockerfile-maven.version>1.3.7</dockerfile-maven.version>
     <typetools.version>0.5.0</typetools.version>
+    <protobuf2.version>2.4.1</protobuf2.version>
     <protobuf3.version>3.5.1</protobuf3.version>
     <grpc.version>1.5.0</grpc.version>
     <protoc-gen-grpc-java.version>1.0.0</protoc-gen-grpc-java.version>
@@ -232,6 +234,26 @@ flexible messaging model and an intuitive client API.</description>
 
       <dependency>
         <groupId>org.apache.bookkeeper</groupId>
+        <artifactId>bookkeeper-server-shaded</artifactId>
+        <version>${bookkeeper.version}</version>
+        <exclusions>
+          <exclusion>
+            <artifactId>slf4j-log4j12</artifactId>
+            <groupId>org.slf4j</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>log4j</artifactId>
+            <groupId>log4j</groupId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.jboss.netty</groupId>
+            <artifactId>netty</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.bookkeeper</groupId>
         <artifactId>stream-storage-java-client</artifactId>
         <version>${bookkeeper.version}</version>
       </dependency>
@@ -682,22 +704,6 @@ flexible messaging model and an intuitive client API.</description>
     </dependency>
 
     <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-      <scope>test</scope>
-    </dependency>
-
-    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 8aa2f82..bd47970 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -425,6 +425,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
     // If true, export topic level metrics otherwise namespace level
     private boolean exposeTopicLevelMetricsInPrometheus = true;
 
+    /**** --- Functions --- ****/
+    private boolean functionsWorkerEnabled = false;
+
     public String getZookeeperServers() {
         return zookeeperServers;
     }
@@ -1470,4 +1473,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public void setTlsCiphers(Set<String> tlsCiphers) {
         this.tlsCiphers = tlsCiphers;
     }
+
+    /**** --- Functions --- ****/
+
+    public void setFunctionsWorkerEnabled(boolean enabled) {
+        this.functionsWorkerEnabled = enabled;
+    }
+
+    public boolean isFunctionsWorkerEnabled() {
+        return functionsWorkerEnabled;
+    }
 }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 14dcca7..abbc831 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -119,6 +119,96 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- functions related dependencies (begin) -->
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-worker-shaded</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-all</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-lite</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf.nano</groupId>
+          <artifactId>protobuf-javanano</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf-lite</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf-nano</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-utils</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-metrics</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-instance</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-worker</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>net.jodah</groupId>
+      <artifactId>typetools</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-api</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.jersey.media</groupId>
+      <artifactId>jersey-media-multipart</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core-shaded</artifactId>
+    </dependency>
+
+    <!-- functions related dependencies (end) -->
+
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 40f8d59..42d90e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -34,6 +34,7 @@ import java.io.FileInputStream;
 import java.net.MalformedURLException;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Optional;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
@@ -42,6 +43,9 @@ import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.aspectj.weaver.loadtime.Agent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +76,12 @@ public class PulsarBrokerStarter {
         @Parameter(names = {"-bc", "--bookie-conf"}, description = "Configuration file for Bookie")
         private String bookieConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/bookkeeper.conf";
 
+        @Parameter(names = {"-rfw", "--run-functions-worker"}, description = "Run functions worker with Broker")
+        private boolean runFunctionsWorker = false;
+
+        @Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
+        private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+
         @Parameter(names = {"-h", "--help"}, description = "Show this help message")
         private boolean help = false;
     }
@@ -103,6 +113,7 @@ public class PulsarBrokerStarter {
         private final AutoRecoveryMain autoRecoveryMain;
         private final StatsProvider bookieStatsProvider;
         private final ServerConfiguration bookieConfig;
+        private final WorkerService functionsWorkerService;
 
         BrokerStarter(String[] args) throws Exception{
             StarterArguments starterArguments = new StarterArguments();
@@ -116,15 +127,40 @@ public class PulsarBrokerStarter {
                 System.exit(-1);
             }
 
-            // init broker config and pulsar service
+            // init broker config
             if (isBlank(starterArguments.brokerConfigFile)) {
                 jcommander.usage();
                 throw new IllegalArgumentException("Need to specify a configuration file for broker");
             } else {
                 brokerConfig = loadConfig(starterArguments.brokerConfigFile);
-                pulsarService = new PulsarService(brokerConfig);
             }
 
+            // init functions worker
+            if (starterArguments.runFunctionsWorker || brokerConfig.isFunctionsWorkerEnabled()) {
+                WorkerConfig workerConfig;
+                if (isBlank(starterArguments.fnWorkerConfigFile)) {
+                    workerConfig = new WorkerConfig();
+                } else {
+                    workerConfig = WorkerConfig.load(starterArguments.fnWorkerConfigFile);
+                }
+                // worker talks to local broker
+                workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + brokerConfig.getBrokerServicePort());
+                workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + brokerConfig.getWebServicePort());
+                String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+                    brokerConfig.getAdvertisedAddress());
+                workerConfig.setWorkerHostname(hostname);
+                workerConfig.setWorkerId(
+                    "c-" + brokerConfig.getClusterName()
+                        + "-fw-" + hostname
+                        + "-" + workerConfig.getWorkerPort());
+                functionsWorkerService = new WorkerService(workerConfig);
+            } else {
+                functionsWorkerService = null;
+            }
+
+            // init pulsar service
+            pulsarService = new PulsarService(brokerConfig, Optional.ofNullable(functionsWorkerService));
+
             // if no argument to run bookie in cmd line, read from pulsar config
             if (!argsContains(args, "-rb") && !argsContains(args, "--run-bookie")) {
                 checkState(starterArguments.runBookie == false,
@@ -189,6 +225,16 @@ public class PulsarBrokerStarter {
 
             pulsarService.start();
             log.info("PulsarService started.");
+
+            // after broker is started, start the functions worker
+            if (null != functionsWorkerService) {
+                try {
+                    functionsWorkerService.start();
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw ie;
+                }
+            }
         }
 
         public void join() throws InterruptedException {
@@ -203,6 +249,11 @@ public class PulsarBrokerStarter {
         }
 
         public void shutdown() {
+            if (null != functionsWorkerService) {
+                functionsWorkerService.stop();
+                log.info("Shut down functions worker service successfully.");
+            }
+
             pulsarService.getShutdownService().run();
             log.info("Shut down broker service successfully.");
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 314eacf..6c8dc14 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -23,6 +23,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import java.io.FileInputStream;
 import java.net.URL;
 
+import java.nio.file.Paths;
+import java.util.Optional;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -31,6 +33,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.aspectj.weaver.loadtime.Agent;
 import org.slf4j.Logger;
@@ -48,6 +52,7 @@ public class PulsarStandaloneStarter {
     PulsarAdmin admin;
     LocalBookkeeperEnsemble bkEnsemble;
     ServiceConfiguration config;
+    WorkerService fnWorkerService;
 
     @Parameter(names = { "-c", "--config" }, description = "Configuration file path", required = true)
     private String configFile;
@@ -76,6 +81,12 @@ public class PulsarStandaloneStarter {
     @Parameter(names = { "--only-broker" }, description = "Only start Pulsar broker service (no ZK, BK)")
     private boolean onlyBroker = false;
 
+    @Parameter(names = {"-nfw", "--no-functions-worker"}, description = "Do not run functions worker with Broker")
+    private boolean noFunctionsWorker = false;
+
+    @Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
+    private String fnWorkerConfigFile = Paths.get("").toAbsolutePath().normalize().toString() + "/conf/functions_worker.yml";
+
     @Parameter(names = { "-a", "--advertised-address" }, description = "Standalone broker advertised address")
     private String advertisedAddress = null;
 
@@ -127,6 +138,10 @@ public class PulsarStandaloneStarter {
         Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
                 try {
+                    if (fnWorkerService != null) {
+                        fnWorkerService.stop();
+                    }
+
                     if (broker != null) {
                         broker.close();
                     }
@@ -162,8 +177,29 @@ public class PulsarStandaloneStarter {
         // load aspectj-weaver agent for instrumentation
         AgentLoader.loadAgentClass(Agent.class.getName(), null);
 
+        // initialize the functions worker
+        if (!noFunctionsWorker) {
+            WorkerConfig workerConfig;
+            if (isBlank(fnWorkerConfigFile)) {
+                workerConfig = new WorkerConfig();
+            } else {
+                workerConfig = WorkerConfig.load(fnWorkerConfigFile);
+            }
+            // worker talks to local broker
+            workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort());
+            workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort());
+            String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
+                config.getAdvertisedAddress());
+            workerConfig.setWorkerHostname(hostname);
+            workerConfig.setWorkerId(
+                "c-" + config.getClusterName()
+                    + "-fw-" + hostname
+                    + "-" + workerConfig.getWorkerPort());
+            fnWorkerService = new WorkerService(workerConfig);
+        }
+
         // Start Broker
-        broker = new PulsarService(config);
+        broker = new PulsarService(config, Optional.ofNullable(fnWorkerService));
         broker.start();
 
         // Create a sample namespace
@@ -203,6 +239,10 @@ public class PulsarStandaloneStarter {
             log.info(e.getMessage());
         }
 
+        if (null != fnWorkerService) {
+            fnWorkerService.start();
+        }
+
         log.debug("--- setup completed ---");
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9b23433..6bbc1e6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,6 +63,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
 import org.apache.pulsar.websocket.WebSocketProducerServlet;
@@ -121,6 +123,7 @@ public class PulsarService implements AutoCloseable {
     private final String brokerServiceUrl;
     private final String brokerServiceUrlTls;
     private final String brokerVersion;
+    private final Optional<WorkerService> functionWorkerService;
 
     private final MessagingServiceShutdownHook shutdownService;
 
@@ -136,6 +139,10 @@ public class PulsarService implements AutoCloseable {
     private final Condition isClosedCondition = mutex.newCondition();
 
     public PulsarService(ServiceConfiguration config) {
+        this(config, Optional.empty());
+    }
+
+    public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService) {
         // Validate correctness of configuration
         PulsarConfigurationLoader.isComplete(config);
 
@@ -151,6 +158,7 @@ public class PulsarService implements AutoCloseable {
         this.shutdownService = new MessagingServiceShutdownHook(this);
         this.loadManagerExecutor = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
+        this.functionWorkerService = functionWorkerService;
     }
 
     /**
@@ -543,6 +551,10 @@ public class PulsarService implements AutoCloseable {
         return this.nsservice;
     }
 
+    public WorkerService getWorkerService() {
+        return functionWorkerService.orElse(null); // null when this broker was constructed without a functions worker
+    }
+
     /**
      * Get a reference of the current <code>BrokerService</code> instance associated with the current
      * <code>PulsarService</code> instance.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
new file mode 100644
index 0000000..7080474
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.function.Supplier;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+public class FunctionsBase extends AdminResource implements Supplier<WorkerService> {
+
+    private final FunctionsImpl functions;
+
+    public FunctionsBase() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    @Override
+    public WorkerService get() {
+        return pulsar().getWorkerService(); // may be null: PulsarService#getWorkerService yields null if no worker is embedded
+    }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response registerFunction(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @FormDataParam("data") InputStream uploadedInputStream,
+                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                     final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.registerFunction(
+            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
+
+    }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{functionName}")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response updateFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        return functions.updateFunction(
+            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
+
+    }
+
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response deregisterFunction(final @PathParam("tenant") String tenant,
+                                       final @PathParam("namespace") String namespace,
+                                       final @PathParam("functionName") String functionName) {
+        return functions.deregisterFunction(
+            tenant, namespace, functionName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}")
+    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName)
+            throws IOException {
+        return functions.getFunctionInfo(
+            tenant, namespace, functionName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+    public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+                                              final @PathParam("namespace") String namespace,
+                                              final @PathParam("functionName") String functionName,
+                                              final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, functionName, instanceId);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/status")
+    public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName) throws IOException {
+        return functions.getFunctionStatus(
+            tenant, namespace, functionName);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}")
+    public Response listFunctions(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(
+            tenant, namespace);
+
+    }
+
+    @GET
+    @Path("/cluster")
+    public Response getCluster() {
+        return functions.getCluster();
+    }
+
+    @GET
+    @Path("/assignments")
+    public Response getAssignments() {
+        return functions.getAssignments();
+    }
+
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
similarity index 55%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
index 7d8b054..1740732 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
@@ -16,26 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest;
+package org.apache.pulsar.broker.admin.v1;
 
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-public class FunctionApiResource {
-
-    public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
-
-    private WorkerService workerService;
-
-    @Context
-    protected ServletContext servletContext;
-
-    public synchronized WorkerService worker() {
-        if (this.workerService == null) {
-            this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
-        }
-        return this.workerService;
-    }
+import io.swagger.annotations.Api;
+import javax.ws.rs.Path;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
 
+@Path("/functions")
+@Api(value = "/functions", description = "Functions admin apis", tags = "functions")
+public class Functions extends FunctionsBase {
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
similarity index 55%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 7d8b054..cc68677 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -16,26 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest;
+package org.apache.pulsar.broker.admin.v2;
 
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-import org.apache.pulsar.functions.worker.WorkerService;
-
-public class FunctionApiResource {
-
-    public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
-
-    private WorkerService workerService;
-
-    @Context
-    protected ServletContext servletContext;
-
-    public synchronized WorkerService worker() {
-        if (this.workerService == null) {
-            this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
-        }
-        return this.workerService;
-    }
+import com.wordnik.swagger.annotations.Api;
+import javax.ws.rs.Path;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
 
+@Path("/functions")
+@Api(value = "/functions", description = "Functions admin apis", tags = "functions")
+public class Functions extends FunctionsBase {
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 20ff44b..02c8b1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -49,6 +49,7 @@ import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.resource.Resource;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 import org.eclipse.jetty.util.thread.ExecutorThreadPool;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
@@ -120,6 +121,7 @@ public class WebService implements AutoCloseable {
         ResourceConfig config = new ResourceConfig();
         config.packages("jersey.config.server.provider.packages", javaPackages);
         config.register(provider);
+        config.register(MultiPartFeature.class);
         ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
         servletHolder.setAsyncSupported(true);
         addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
diff --git a/pulsar-client-cpp/run-unit-tests.sh b/pulsar-client-cpp/run-unit-tests.sh
index f91e6ff..9f8eb5f 100755
--- a/pulsar-client-cpp/run-unit-tests.sh
+++ b/pulsar-client-cpp/run-unit-tests.sh
@@ -25,10 +25,11 @@ rm -rf ./pulsar-dist
 mkdir pulsar-dist
 tar xfz ../all/target/apache-pulsar*bin.tar.gz  -C pulsar-dist --strip-components 1
 
-PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone > broker.log &
+PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone.conf pulsar-dist/bin/pulsar standalone --no-functions-worker > broker.log &
 standalone_pid=$!;
 
 PULSAR_STANDALONE_CONF=$PWD/test-conf/standalone-ssl.conf pulsar-dist/bin/pulsar standalone \
+              --no-functions-worker \
               --zookeeper-port 2191 --bookkeeper-port 3191 \
               --zookeeper-dir data2/standalone/zookeeper --bookkeeper-dir \
               data2/standalone/bookkeeper > broker-tls.log &
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-client-tools-test/pom.xml
similarity index 54%
copy from pulsar-zookeeper-utils/pom.xml
copy to pulsar-client-tools-test/pom.xml
index 0a621f0..0aefc77 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-client-tools-test/pom.xml
@@ -21,7 +21,6 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
-
   <parent>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar</artifactId>
@@ -29,90 +28,57 @@
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pulsar-zookeeper-utils</artifactId>
-  <name>Pulsar ZooKeeper Utils</name>
+  <artifactId>pulsar-client-tools-test</artifactId>
+  <name>Pulsar Client Tools Test</name>
+  <description>Pulsar Client Tools Test</description>
 
   <dependencies>
-
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.bookkeeper</groupId>
-      <artifactId>bookkeeper-server-shaded</artifactId>
-      <version>${bookkeeper.version}</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-
     <dependency>
-      <groupId>org.apache.bookkeeper.stats</groupId>
-      <artifactId>prometheus-metrics-provider</artifactId>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-tools</artifactId>
+      <version>${project.version}</version>
     </dependency>
-
     <dependency>
-      <groupId>com.github.ben-manes.caffeine</groupId>
-      <artifactId>caffeine</artifactId>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
-
     <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper</artifactId>
-      <classifier>tests</classifier>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-common</artifactId>
-      <version>${project.parent.version}</version>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>managed-ledger</artifactId>
-      <version>${project.parent.version}</version>
+      <artifactId>pulsar-zookeeper-utils</artifactId>
+      <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-
     <dependency>
-      <groupId>${project.groupId}</groupId>
-      <artifactId>managed-ledger</artifactId>
-      <version>${project.parent.version}</version>
-      <type>test-jar</type>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
-
   </dependencies>
-
   <build>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
       </plugin>
     </plugins>
   </build>
-
 </project>
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/NameValueParameterSplitterTest.java
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
similarity index 100%
rename from pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
rename to pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
diff --git a/pulsar-client-tools/pom.xml b/pulsar-client-tools/pom.xml
index 1d2454d..6ec1e80 100644
--- a/pulsar-client-tools/pom.xml
+++ b/pulsar-client-tools/pom.xml
@@ -19,89 +19,58 @@
 
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-	<parent>
-		<groupId>org.apache.pulsar</groupId>
-		<artifactId>pulsar</artifactId>
-		<version>2.0.0-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
 
-	<artifactId>pulsar-client-tools</artifactId>
-	<name>Pulsar Client Tools</name>
-	<description>Pulsar Client Tools</description>
+  <artifactId>pulsar-client-tools</artifactId>
+  <name>Pulsar Client Tools</name>
+  <description>Pulsar Client Tools</description>
 
-	<dependencies>
-		<dependency>
-			<groupId>com.beust</groupId>
-			<artifactId>jcommander</artifactId>
-			<scope>compile</scope>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-client-admin-original</artifactId>
-			<version>${project.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>junit</groupId>
-					<artifactId>junit</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		<dependency>
-			<groupId>commons-io</groupId>
-			<artifactId>commons-io</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-common</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-client-original</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.asynchttpclient</groupId>
-			<artifactId>async-http-client</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.commons</groupId>
-			<artifactId>commons-lang3</artifactId>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>managed-ledger</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-broker</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-broker</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>${project.groupId}</groupId>
-			<artifactId>pulsar-zookeeper-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.mockito</groupId>
-			<artifactId>mockito-core</artifactId>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
+  <dependencies>
+    <dependency>
+      <groupId>com.beust</groupId>
+      <artifactId>jcommander</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-admin-original</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+  </dependencies>
 
 </project>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index e227d6d..ee246f3 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -48,7 +48,7 @@
     <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
-      <version>2.4.1</version>
+      <version>${protobuf2.version}</version>
     </dependency>
 
     <dependency>
diff --git a/pulsar-functions/bin/pulsar b/pulsar-functions/bin/pulsar
index 4c35fdc..ba219a1 100755
--- a/pulsar-functions/bin/pulsar
+++ b/pulsar-functions/bin/pulsar
@@ -52,7 +52,7 @@ fi
 
 # exclude tests jar
 if [ -z "$PULSAR_JAR" ]; then
-    BUILT_JAR=`ls $PULSAR_HOME/worker/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
+    BUILT_JAR=`ls $PULSAR_HOME/worker-runner/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1`
     if [ -z "${BUILT_JAR}" ]; then
         echo "\nCouldn't find pulsar jar.";
         echo "Make sure you've run 'mvn package'\n";
@@ -118,10 +118,10 @@ add_maven_deps_to_classpath() {
     # Need to generate classpath from maven pom. This is costly so generate it
     # and cache it. Save the file into our target dir so a mvn clean will get
     # clean it up and force us create a new one.
-    f="${PULSAR_HOME}/worker/target/classpath_shaded.txt"
+    f="${PULSAR_HOME}/worker-runner/target/classpath_shaded.txt"
     if [ ! -f "${f}" ]
     then
-	    ${MVN} -f "${PULSAR_HOME}/worker/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
+	    ${MVN} -f "${PULSAR_HOME}/worker-runner/pom.xml" dependency:build-classpath -DincludeScope=compile -Dmdep.outputFile="${f}" &> /dev/null
     fi
     PULSAR_CLASSPATH=${CLASSPATH}:`cat "${f}"`
 }
diff --git a/pulsar-functions/cli/pom.xml b/pulsar-functions/cli/pom.xml
index b22a502..2345e25 100644
--- a/pulsar-functions/cli/pom.xml
+++ b/pulsar-functions/cli/pom.xml
@@ -33,20 +33,6 @@
 
   <dependencies>
 
-    <!-- logging -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-runtime</artifactId>
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 39dd271..aeb53ae 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -57,7 +57,6 @@
       <version>${project.version}</version>
     </dependency>
 
-    <!-- update this when bookkeeper produces a snapshot version -->
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>stream-storage-java-client</artifactId>
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 47450a8..2a6daac 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -39,6 +39,8 @@
     <module>instance</module>
     <module>runtime</module>
     <module>worker</module>
+    <module>worker-shaded</module>
+    <module>worker-runner</module>
     <module>cli</module>
     <module>java-examples</module>
     <module>dist</module>
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index dc236b6..88828dc 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -45,6 +45,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index b71931d..b5cb83e 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -33,20 +33,6 @@
 
   <dependencies>
 
-    <!-- logging -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-tools-shaded</artifactId>
diff --git a/pulsar-functions/worker-runner/pom.xml b/pulsar-functions/worker-runner/pom.xml
new file mode 100644
index 0000000..1e621d5
--- /dev/null
+++ b/pulsar-functions/worker-runner/pom.xml
@@ -0,0 +1,111 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-functions</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-functions-worker-runner</artifactId>
+  <name>Pulsar Functions :: Worker Runner</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-worker-shaded</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-lite</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf.nano</groupId>
+          <artifactId>protobuf-javanano</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf-lite</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf-nano</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-proto</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-utils</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-metrics</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-instance</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-functions-worker</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-admin-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+  </dependencies>
+</project>
diff --git a/pulsar-functions/worker-shaded/pom.xml b/pulsar-functions/worker-shaded/pom.xml
new file mode 100644
index 0000000..670c8b7
--- /dev/null
+++ b/pulsar-functions/worker-shaded/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-functions</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-functions-worker-shaded</artifactId>
+  <name>Pulsar Functions :: Worker Shaded</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-worker</artifactId>
+      <version>${project.parent.version}</version>
+      <exclusions>
+        <!-- exclude the dependencies that already exist in bookkeeper-server-shaded -->
+        <exclusion>
+          <groupId>org.apache.bookkeeper</groupId>
+          <artifactId>bookkeeper-common</artifactId>
+        </exclusion>
+        <!-- exclude `pulsar-client-tools-shaded` here, this allows worker-runner and broker to use unshaded clients -->
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client-tools-shaded</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <!-- bookkeeper-server-shaded includes circe-checksum, bookkeeper-common and bookkeeper-stats-api -->
+    <dependency>
+      <groupId>org.apache.bookkeeper</groupId>
+      <artifactId>bookkeeper-server-shaded</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <minimizeJar>false</minimizeJar>
+
+              <artifactSet>
+                <!-- package the dependencies that use protobuf & grpc and relocate protobuf -->
+                <includes>
+                  <include>com.google.protobuf:protobuf-lite</include>
+                  <include>com.google.protobuf:protobuf-java</include>
+                  <include>com.google.protobuf.nano:protobuf-javanano</include>
+                  <include>com.google.protobuf:protobuf-java-util</include>
+                  <include>com.google.instrumentation:instrumentation-api</include>
+                  <include>com.google.api.grpc:proto-google-common-protos</include>
+                  <!-- dependencies that use protobuf -->
+                  <include>org.apache.pulsar:pulsar-functions-proto</include>
+                  <include>org.apache.pulsar:pulsar-functions-utils</include>
+                  <include>org.apache.pulsar:pulsar-functions-metrics</include>
+                  <include>org.apache.pulsar:pulsar-functions-instance</include>
+                  <include>org.apache.pulsar:pulsar-functions-runtime</include>
+                  <include>org.apache.pulsar:pulsar-functions-worker</include>
+                  <!-- protobuf dependencies in grpc -->
+                  <include>io.grpc:*</include>
+                  <!-- bookkeeper key/value service -->
+                  <include>org.apache.bookkeeper:stream-storage-java-client</include>
+                </includes>
+              </artifactSet>
+              <relocations>
+                <!-- bookkeeper shading rule -->
+                <relocation>
+                  <pattern>com.google.protobuf</pattern>
+                  <shadedPattern>org.apache.pulsar.functions.shaded.com.google.protobuf</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml
index 24f690f..6f408e7 100644
--- a/pulsar-functions/worker/pom.xml
+++ b/pulsar-functions/worker/pom.xml
@@ -34,20 +34,6 @@
 
   <dependencies>
 
-    <!-- logging -->
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-slf4j-impl</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.logging.log4j</groupId>
-      <artifactId>log4j-core</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-functions-runtime</artifactId>
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index 754c8af..4942b34 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -160,7 +160,7 @@ public class MembershipManager implements AutoCloseable, ConsumerEventListener {
         public static WorkerInfo parseFrom(String str) {
             String[] tokens = str.split(":");
             if (tokens.length != 3) {
-                throw new IllegalArgumentException("Invalid string to parse WorkerInfo");
+                throw new IllegalArgumentException("Invalid string to parse WorkerInfo : " + str);
             }
 
             String workerId = tokens[0];
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 55450e6..09f9fc0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -20,10 +20,7 @@ package org.apache.pulsar.functions.worker;
 
 import com.google.common.util.concurrent.AbstractService;
 
-import java.io.IOException;
-import java.net.URI;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
 
 @Slf4j
@@ -51,13 +48,7 @@ public class Worker extends AbstractService {
     }
 
     protected void doStartImpl() throws InterruptedException {
-        URI dlogUri;
-        try {
-            dlogUri = FunctionMetadataSetup.setupFunctionMetadata(workerConfig);
-        } catch (PulsarAdminException | IOException e) {
-            throw new RuntimeException(e);
-        }
-        workerService.start(dlogUri);
+        workerService.start();
         WorkerServer server = new WorkerServer(workerService);
         this.serverThread = new Thread(server, server.getThreadName());
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 5793e97..732ab07 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,12 +20,14 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.net.URI;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
@@ -71,7 +73,15 @@ public class WorkerService {
         return contextHandler;
     }
 
-    public void start(URI dlogUri) throws InterruptedException {
+    public void start() throws InterruptedException {
+        try {
+            start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
+        } catch (PulsarAdminException | IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
         try {
             log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 7d8b054..1c5c739 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -18,24 +18,30 @@
  */
 package org.apache.pulsar.functions.worker.rest;
 
+import java.util.function.Supplier;
 import javax.servlet.ServletContext;
 import javax.ws.rs.core.Context;
 import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 
-public class FunctionApiResource {
+public class FunctionApiResource implements Supplier<WorkerService> {
 
     public static final String ATTRIBUTE_FUNCTION_WORKER = "function-worker";
 
+    protected final FunctionsImpl functions;
     private WorkerService workerService;
-
     @Context
     protected ServletContext servletContext;
 
-    public synchronized WorkerService worker() {
+    public FunctionApiResource() {
+        this.functions = new FunctionsImpl(this);
+    }
+
+    @Override
+    public synchronized WorkerService get() {
         if (this.workerService == null) {
             this.workerService = (WorkerService) servletContext.getAttribute(ATTRIBUTE_FUNCTION_WORKER);
         }
         return this.workerService;
     }
-
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
similarity index 91%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 7227196..4eda822 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -16,13 +16,33 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api;
 
-import com.google.gson.Gson;
-import javax.ws.rs.core.Response.Status;
+import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.gson.Gson;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.policies.data.ErrorData;
@@ -36,33 +56,28 @@ import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
 import org.apache.pulsar.functions.worker.MembershipManager;
 import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
 @Slf4j
-@Path("/functions")
-public class FunctionApiV2Resource extends FunctionApiResource {
+public class FunctionsImpl {
+
+    private final Supplier<WorkerService> workerServiceSupplier;
+
+    public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+        this.workerServiceSupplier = workerServiceSupplier;
+    }
+
+    private WorkerService worker() {
+        try {
+            return checkNotNull(workerServiceSupplier.get());
+        } catch (Throwable t) {
+            log.info("Failed to get worker service", t);
+            throw t;
+        }
+    }
 
     @POST
     @Path("/{tenant}/{namespace}/{functionName}")
@@ -73,7 +88,6 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                      final @FormDataParam("data") InputStream uploadedInputStream,
                                      final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
-
         FunctionConfig functionConfig;
         // validate parameters
         try {
@@ -82,7 +96,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid register function request @ /{}/{}/{}",
                 tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -91,7 +105,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
 
         if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
             log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
         }
@@ -132,7 +146,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid update function request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -140,7 +154,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
@@ -176,7 +190,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid deregister function request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -197,7 +211,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         try {
             requestResult = completableFuture.get();
             if (!requestResult.isSuccess()) {
-                return Response.status(Response.Status.BAD_REQUEST)
+                return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(requestResult.getMessage()))
                     .build();
@@ -217,7 +231,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                     .build();
         }
 
-        return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
+        return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
     @GET
@@ -233,7 +247,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunction request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -249,7 +263,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
         String functionConfigJson = JsonFormat.printer().print(functionMetaData.getFunctionConfig());
-        return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+        return Response.status(Status.OK).entity(functionConfigJson).build();
     }
 
     @GET
@@ -265,7 +279,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -280,7 +294,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        InstanceCommunication.FunctionStatus functionStatus = null;
+        FunctionStatus functionStatus = null;
         try {
             functionStatus = functionRuntimeManager.getFunctionInstanceStatus(
                     tenant, namespace, functionName, Integer.parseInt(instanceId));
@@ -289,11 +303,11 @@ public class FunctionApiV2Resource extends FunctionApiResource {
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
             functionStatusBuilder.setRunning(false);
             String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
-            return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+            return Response.status(Status.OK).entity(functionConfigJson).build();
         }
 
         String jsonResponse = JsonFormat.printer().print(functionStatus);
-        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+        return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
     @GET
@@ -308,7 +322,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
                     tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -331,11 +345,11 @@ public class FunctionApiV2Resource extends FunctionApiResource {
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
             functionStatusBuilder.setRunning(false);
             String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
-            return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+            return Response.status(Status.OK).entity(functionConfigJson).build();
         }
 
         String jsonResponse = JsonFormat.printer().print(functionStatusList);
-        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+        return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
     @GET
@@ -349,7 +363,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         } catch (IllegalArgumentException e) {
             log.error("Invalid listFunctions request @ /{}/{}",
                     tenant, namespace, e);
-            return Response.status(Response.Status.BAD_REQUEST)
+            return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -358,7 +372,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
 
         Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
 
-        return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+        return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
     }
 
     private Response updateRequest(FunctionMetaData functionMetaData,
@@ -389,7 +403,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         try {
             requestResult = completableFuture.get();
             if (!requestResult.isSuccess()) {
-                return Response.status(Response.Status.BAD_REQUEST)
+                return Response.status(Status.BAD_REQUEST)
                     .type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(requestResult.getMessage()))
                     .build();
@@ -406,7 +420,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                 .build();
         }
 
-        return Response.status(Response.Status.OK).build();
+        return Response.status(Status.OK).build();
     }
 
     @GET
@@ -414,7 +428,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response getCluster() {
         MembershipManager membershipManager = worker().getMembershipManager();
         List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
-        return Response.status(Response.Status.OK).entity(new Gson().toJson(members)).build();
+        return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
     }
 
     @GET
@@ -426,7 +440,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
         for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
             ret.put(entry.getKey(), entry.getValue().keySet());
         }
-        return Response.status(Response.Status.OK).entity(
+        return Response.status(Status.OK).entity(
                 new Gson().toJson(ret)).build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 7227196..88cd1a4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -18,25 +18,8 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
-import com.google.gson.Gson;
-import javax.ws.rs.core.Response.Status;
-
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
-import org.apache.pulsar.functions.worker.MembershipManager;
-import org.apache.pulsar.functions.worker.Utils;
-import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -52,13 +35,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 @Slf4j
 @Path("/functions")
@@ -74,44 +50,9 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                      final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        FunctionConfig functionConfig;
-        // validate parameters
-        try {
-            functionConfig = validateUpdateRequestParams(tenant, namespace, functionName,
-                    uploadedInputStream, fileDetail, functionConfigJson);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid register function request @ /{}/{}/{}",
-                tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s already exist", functionName))).build();
-        }
-
-        // function state
-        FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
-                .setFunctionConfig(functionConfig)
-                .setCreateTime(System.currentTimeMillis())
-                .setVersion(0);
+        return functions.registerFunction(
+            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
 
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
-                .setPackagePath(String.format(
-            "%s/%s/%s/%s",
-            tenant,
-            namespace,
-            functionName,
-            Utils.getUniquePackageName(fileDetail.getFileName())));
-        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
-        return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
     }
 
     @PUT
@@ -124,43 +65,9 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                    final @FormDataParam("data") FormDataContentDisposition fileDetail,
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        FunctionConfig functionConfig;
-        // validate parameters
-        try {
-            functionConfig = validateUpdateRequestParams(tenant, namespace, functionName,
-                    uploadedInputStream, fileDetail, functionConfigJson);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid update function request @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
+        return functions.updateFunction(
+            tenant, namespace, functionName, uploadedInputStream, fileDetail, functionConfigJson);
 
-        // function state
-        FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
-                .setFunctionConfig(functionConfig)
-                .setCreateTime(System.currentTimeMillis())
-                .setVersion(0);
-
-        PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder()
-                .setPackagePath(String.format(
-                        "%s/%s/%s/%s",
-                        tenant,
-                        namespace,
-                        functionName,
-                        Utils.getUniquePackageName(fileDetail.getFileName())));
-        functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
-        return updateRequest(functionMetaDataBuilder.build(), uploadedInputStream);
     }
 
 
@@ -169,55 +76,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName) {
-
-        // validate parameters
-        try {
-            validateDeregisterRequestParams(tenant, namespace, functionName);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid deregister function request @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function to deregister does not exist @ /{}/{}/{}",
-                    tenant, namespace, functionName);
-            return Response.status(Status.NOT_FOUND)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
-
-        CompletableFuture<RequestResult> completableFuture
-                = functionMetaDataManager.deregisterFunction(tenant, namespace, functionName);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(requestResult.getMessage()))
-                    .build();
-            }
-        } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering function @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.serverError()
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getCause().getMessage()))
-                    .build();
-        } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering function @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Status.REQUEST_TIMEOUT)
-                    .type(MediaType.APPLICATION_JSON)
-                    .build();
-        }
-
-        return Response.status(Response.Status.OK).entity(requestResult.toJson()).build();
+        return functions.deregisterFunction(
+            tenant, namespace, functionName);
     }
 
     @GET
@@ -226,30 +86,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName)
             throws InvalidProtocolBufferException {
-
-        // validate parameters
-        try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunction request @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunction does not exist @ /{}/{}/{}",
-                    tenant, namespace, functionName);
-            return Response.status(Status.NOT_FOUND)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
-
-        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
-        String functionConfigJson = JsonFormat.printer().print(functionMetaData.getFunctionConfig());
-        return Response.status(Response.Status.OK).entity(functionConfigJson).build();
+        return functions.getFunctionInfo(
+            tenant, namespace, functionName);
     }
 
     @GET
@@ -258,42 +96,8 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                               final @PathParam("namespace") String namespace,
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
-
-        // validate parameters
-        try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}",
-                    tenant, namespace, functionName);
-            return Response.status(Status.NOT_FOUND)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        InstanceCommunication.FunctionStatus functionStatus = null;
-        try {
-            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(
-                    tenant, namespace, functionName, Integer.parseInt(instanceId));
-        } catch (Exception e) {
-            log.error("Got Exception Getting Status", e);
-            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
-            functionStatusBuilder.setRunning(false);
-            String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
-            return Response.status(Response.Status.OK).entity(functionConfigJson).build();
-        }
-
-        String jsonResponse = JsonFormat.printer().print(functionStatus);
-        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+        return functions.getFunctionInstanceStatus(
+            tenant, namespace, functionName, instanceId);
     }
 
     @GET
@@ -301,241 +105,29 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
-
-        // validate parameters
-        try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}",
-                    tenant, namespace, functionName, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}",
-                    tenant, namespace, functionName);
-            return Response.status(Status.NOT_FOUND)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
-        }
-
-        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        InstanceCommunication.FunctionStatusList functionStatusList = null;
-        try {
-            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName);
-        } catch (Exception e) {
-            log.error("Got Exception Getting Status", e);
-            FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
-            functionStatusBuilder.setRunning(false);
-            String functionConfigJson = JsonFormat.printer().print(functionStatusBuilder.build());
-            return Response.status(Response.Status.OK).entity(functionConfigJson).build();
-        }
-
-        String jsonResponse = JsonFormat.printer().print(functionStatusList);
-        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+        return functions.getFunctionStatus(
+            tenant, namespace, functionName);
     }
 
     @GET
     @Path("/{tenant}/{namespace}")
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(
+            tenant, namespace);
 
-        // validate parameters
-        try {
-            validateListFunctionRequestParams(tenant, namespace);
-        } catch (IllegalArgumentException e) {
-            log.error("Invalid listFunctions request @ /{}/{}",
-                    tenant, namespace, e);
-            return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
-        }
-
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
-
-        return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
-    }
-
-    private Response updateRequest(FunctionMetaData functionMetaData,
-                                   InputStream uploadedInputStream) {
-        // Upload to bookkeeper
-        try {
-            log.info("Uploading function package to {}", functionMetaData.getPackageLocation());
-
-            Utils.uploadToBookeeper(
-                worker().getDlogNamespace(),
-                uploadedInputStream,
-                functionMetaData.getPackageLocation().getPackagePath());
-        } catch (IOException e) {
-            log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e);
-            return Response.serverError()
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage()))
-                    .build();
-        }
-
-        // Submit to FMT
-        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
-        CompletableFuture<RequestResult> completableFuture
-                = functionMetaDataManager.updateFunction(functionMetaData);
-
-        RequestResult requestResult = null;
-        try {
-            requestResult = completableFuture.get();
-            if (!requestResult.isSuccess()) {
-                return Response.status(Response.Status.BAD_REQUEST)
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(requestResult.getMessage()))
-                    .build();
-            }
-        } catch (ExecutionException e) {
-            return Response.serverError()
-                    .type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getCause().getMessage()))
-                    .build();
-        } catch (InterruptedException e) {
-            return Response.status(Status.REQUEST_TIMEOUT)
-                .type(MediaType.APPLICATION_JSON)
-                .entity(new ErrorData(e.getCause().getMessage()))
-                .build();
-        }
-
-        return Response.status(Response.Status.OK).build();
     }
 
     @GET
     @Path("/cluster")
     public Response getCluster() {
-        MembershipManager membershipManager = worker().getMembershipManager();
-        List<MembershipManager.WorkerInfo> members = membershipManager.getCurrentMembership();
-        return Response.status(Response.Status.OK).entity(new Gson().toJson(members)).build();
+        return functions.getCluster();
     }
 
     @GET
     @Path("/assignments")
     public Response getAssignments() {
-        FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
-        Map<String, Map<String, Function.Assignment>> assignments = functionRuntimeManager.getCurrentAssignments();
-        Map<String, Collection<String>> ret = new HashMap<>();
-        for (Map.Entry<String, Map<String, Function.Assignment>> entry : assignments.entrySet()) {
-            ret.put(entry.getKey(), entry.getValue().keySet());
-        }
-        return Response.status(Response.Status.OK).entity(
-                new Gson().toJson(ret)).build();
-    }
-
-    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
-
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-    }
-
-    private void validateGetFunctionInstanceRequestParams(String tenant,
-                                                          String namespace,
-                                                          String functionName,
-                                                          String instanceId) throws IllegalArgumentException {
-        validateGetFunctionRequestParams(tenant, namespace, functionName);
-        if (instanceId == null) {
-            throw new IllegalArgumentException("Function Instance Id is not provided");
-
-        }
-    }
-
-    private void validateGetFunctionRequestParams(String tenant,
-                                                  String namespace,
-                                                  String functionName) throws IllegalArgumentException {
-
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
-        }
-    }
-
-    private void validateDeregisterRequestParams(String tenant,
-                                                 String namespace,
-                                                 String functionName) throws IllegalArgumentException {
-
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
-        }
-    }
-
-    private FunctionConfig validateUpdateRequestParams(String tenant,
-                                             String namespace,
-                                             String functionName,
-                                             InputStream uploadedInputStream,
-                                             FormDataContentDisposition fileDetail,
-                                             String functionConfigJson) throws IllegalArgumentException {
-        if (tenant == null) {
-            throw new IllegalArgumentException("Tenant is not provided");
-        }
-        if (namespace == null) {
-            throw new IllegalArgumentException("Namespace is not provided");
-        }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
-        }
-        if (uploadedInputStream == null || fileDetail == null) {
-            throw new IllegalArgumentException("Function Package is not provided");
-        }
-        if (functionConfigJson == null) {
-            throw new IllegalArgumentException("FunctionConfig is not provided");
-        }
-        try {
-            FunctionConfig.Builder functionConfigBuilder = FunctionConfig.newBuilder();
-            JsonFormat.parser().merge(functionConfigJson, functionConfigBuilder);
-            FunctionConfig functionConfig = functionConfigBuilder.build();
-
-            List<String> missingFields = new LinkedList<>();
-            if (functionConfig.getTenant() == null || functionConfig.getTenant().isEmpty()) {
-                missingFields.add("Tenant");
-            }
-            if (functionConfig.getNamespace() == null || functionConfig.getNamespace().isEmpty()) {
-                missingFields.add("Namespace");
-            }
-            if (functionConfig.getName() == null || functionConfig.getName().isEmpty()) {
-                missingFields.add("Name");
-            }
-            if (functionConfig.getClassName() == null || functionConfig.getClassName().isEmpty()) {
-                missingFields.add("ClassName");
-            }
-            if (functionConfig.getInputsCount() == 0 && functionConfig.getCustomSerdeInputsCount() == 0) {
-                missingFields.add("Input");
-            }
-            if (!missingFields.isEmpty()) {
-                String errorMessage = StringUtils.join(missingFields, ",");
-                throw new IllegalArgumentException(errorMessage + " is not provided");
-            }
-            if (functionConfig.getParallelism() <= 0) {
-                throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
-            }
-            return functionConfig;
-        } catch (IllegalArgumentException ex) {
-            throw ex;
-        } catch (Exception ex) {
-            throw new IllegalArgumentException("Invalid FunctionConfig");
-        }
+        return functions.getAssignments();
     }
 
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index fe75940..8bce7ff 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker.rest.api.v2;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -57,6 +56,7 @@ import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -98,13 +98,12 @@ public class FunctionApiV2ResourceTest {
     private WorkerService mockedWorkerService;
     private FunctionMetaDataManager mockedManager;
     private Namespace mockedNamespace;
-    private FunctionApiV2Resource resource;
+    private FunctionsImpl resource;
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
 
     @BeforeMethod
     public void setup() {
-        this.resource = spy(new FunctionApiV2Resource());
         this.mockedManager = mock(FunctionMetaDataManager.class);
         this.mockedInputStream = mock(InputStream.class);
         this.mockedFormData = mock(FormDataContentDisposition.class);
@@ -124,7 +123,7 @@ public class FunctionApiV2ResourceTest {
             .setPulsarServiceUrl("pulsar://localhost:6650/");
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
-        doReturn(mockedWorkerService).when(resource).worker();
+        this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
     }
 
     //
diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml
index 0a621f0..b8019a5 100644
--- a/pulsar-zookeeper-utils/pom.xml
+++ b/pulsar-zookeeper-utils/pom.xml
@@ -42,21 +42,6 @@
     <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>bookkeeper-server-shaded</artifactId>
-      <version>${bookkeeper.version}</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>slf4j-log4j12</artifactId>
-          <groupId>org.slf4j</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>log4j</artifactId>
-          <groupId>log4j</groupId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.jboss.netty</groupId>
-          <artifactId>netty</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
 
     <dependency>

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.