You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/23 05:12:31 UTC

[pulsar] branch master updated: [PulsarAdmin] Functions to async (#6580)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 75d43eb  [PulsarAdmin] Functions to async (#6580)
75d43eb is described below

commit 75d43ebb8368920a3a7b3c51d26f1bb59c66da28
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Mon Mar 23 13:12:16 2020 +0800

    [PulsarAdmin] Functions to async (#6580)
---
 .../org/apache/pulsar/client/admin/Functions.java  | 375 ++++++++++-
 .../client/admin/internal/FunctionsImpl.java       | 737 ++++++++++++++++-----
 2 files changed, 942 insertions(+), 170 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index d6a7ff3..50b7bb5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -54,6 +55,20 @@ public interface Functions {
     List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException;
 
     /**
+     * Get the list of functions asynchronously.
+     * <p>
+     * Get the list of all the Pulsar functions.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>["f1", "f2", "f3"]</code>
+     * </pre>
+     *
+     */
+    CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace);
+
+    /**
      * Get the configuration for the specified function.
      * <p>
      * Response Example:
@@ -81,6 +96,27 @@ public interface Functions {
     FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
+     * Get the configuration for the specified function asynchronously.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @return the function configuration
+     *
+     */
+    CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function);
+
+    /**
      * Create a new function.
      *
      * @param functionConfig
@@ -92,12 +128,20 @@ public interface Functions {
     void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
 
     /**
-     * <pre>
+     * Create a new function asynchronously.
+     *
+     * @param functionConfig
+     *            the function configuration object
+     */
+    CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName);
+
+    /**
+     * Create a new function with package url.
+     * <p>
      * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
      * eg:
      * File: file:/dir/fileName.jar
      * Http: http://www.repo.com/fileName.jar
-     * </pre>
      *
      * @param functionConfig
      *            the function configuration object
@@ -108,6 +152,21 @@ public interface Functions {
     void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
 
     /**
+     * Create a new function with package url asynchronously.
+     * <p>
+     * Create a new function by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     *
+     * @param functionConfig
+     *            the function configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     */
+    CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl);
+
+    /**
      * Update the configuration for a function.
      * <p>
      *
@@ -124,6 +183,15 @@ public interface Functions {
     void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException;
 
     /**
+     * Update the configuration for a function asynchronously.
+     * <p>
+     *
+     * @param functionConfig
+     *            the function configuration object
+     */
+    CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName);
+
+    /**
      * Update the configuration for a function.
      * <p>
      *
@@ -141,13 +209,23 @@ public interface Functions {
     void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException;
 
     /**
+     * Update the configuration for a function asynchronously.
+     * <p>
+     *
+     * @param functionConfig
+     *            the function configuration object
+     * @param updateOptions
+     *            options for the update operations
+     */
+    CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions);
+
+    /**
      * Update the configuration for a function.
-     * <pre>
+     * <p>
      * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
      * eg:
      * File: file:/dir/fileName.jar
      * Http: http://www.repo.com/fileName.jar
-     * </pre>
      *
      * @param functionConfig
      *            the function configuration object
@@ -163,13 +241,27 @@ public interface Functions {
     void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException;
 
     /**
+     * Update the configuration for a function asynchronously.
+     * <p>
+     * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     *
+     * @param functionConfig
+     *            the function configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     */
+    CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl);
+
+    /**
      * Update the configuration for a function.
-     * <pre>
+     * <p>
      * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
      * eg:
      * File: file:/dir/fileName.jar
      * Http: http://www.repo.com/fileName.jar
-     * </pre>
      *
      * @param functionConfig
      *            the function configuration object
@@ -186,9 +278,25 @@ public interface Functions {
      */
     void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException;
 
+    /**
+     * Update the configuration for a function asynchronously.
+     * <p>
+     * Update a function by providing url from which fun-pkg can be downloaded. supported url: http/file
+     * eg:
+     * File: file:/dir/fileName.jar
+     * Http: http://www.repo.com/fileName.jar
+     *
+     * @param functionConfig
+     *            the function configuration object
+     * @param pkgUrl
+     *            url from which pkg can be downloaded
+     * @param updateOptions
+     *            options for the update operations
+     */
+    CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions);
 
     /**
-     * Delete an existing function
+     * Delete an existing function.
      * <p>
      * Delete a function
      *
@@ -211,6 +319,20 @@ public interface Functions {
     void deleteFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
+     * Delete an existing function asynchronously.
+     * <p>
+     * Delete a function
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function);
+
+    /**
      * Gets the current status of a function.
      *
      * @param tenant
@@ -226,6 +348,18 @@ public interface Functions {
     FunctionStatus getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
+     * Gets the current status of a function asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function);
+
+    /**
      * Gets the current status of a function instance.
      *
      * @param tenant
@@ -243,6 +377,21 @@ public interface Functions {
             throws PulsarAdminException;
 
     /**
+     * Gets the current status of a function instance asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param id
+     *            Function instance-id
+     * @return
+     */
+    CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id);
+
+    /**
      * Gets the current stats of a function instance.
      *
      * @param tenant
@@ -260,6 +409,21 @@ public interface Functions {
             throws PulsarAdminException;
 
     /**
+     * Gets the current stats of a function instance asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param id
+     *            Function instance-id
+     * @return
+     */
+    CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id);
+
+    /**
      * Gets the current stats of a function.
      *
      * @param tenant
@@ -276,7 +440,21 @@ public interface Functions {
             throws PulsarAdminException;
 
     /**
-     * Restart function instance
+     * Gets the current stats of a function asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @return
+     */
+
+    CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function);
+
+    /**
+     * Restart function instance.
      *
      * @param tenant
      *            Tenant name
@@ -294,7 +472,22 @@ public interface Functions {
     void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
 
     /**
-     * Restart all function instances
+     * Restart function instance asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     */
+    CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+    /**
+     * Restart all function instances.
      *
      * @param tenant
      *            Tenant name
@@ -308,9 +501,20 @@ public interface Functions {
      */
     void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
+    /**
+     * Restart all function instances asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function);
 
     /**
-     * Stop function instance
+     * Stop function instance.
      *
      * @param tenant
      *            Tenant name
@@ -328,7 +532,22 @@ public interface Functions {
     void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
 
     /**
-     * Start all function instances
+     * Stop function instance asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     */
+    CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+    /**
+     * Start all function instances.
      *
      * @param tenant
      *            Tenant name
@@ -343,7 +562,19 @@ public interface Functions {
     void startFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
-     * Start function instance
+     * Start all function instances asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function);
+
+    /**
+     * Start function instance.
      *
      * @param tenant
      *            Tenant name
@@ -361,7 +592,22 @@ public interface Functions {
     void startFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
 
     /**
-     * Stop all function instances
+     * Start function instance asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     *
+     * @param instanceId
+     *            Function instanceId
+     */
+    CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId);
+
+    /**
+     * Stop all function instances.
      *
      * @param tenant
      *            Tenant name
@@ -375,6 +621,17 @@ public interface Functions {
      */
     void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
+    /**
+     * Stop all function instances asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function);
 
     /**
      * Triggers the function by writing to the input topic.
@@ -396,6 +653,22 @@ public interface Functions {
     String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;
 
     /**
+     * Triggers the function by writing to the input topic asynchronously.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param triggerValue
+     *            The input that will be written to input topic
+     * @param triggerFile
+     *            The file which contains the input that will be written to input topic
+     */
+    CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile);
+
+    /**
      * Upload Data.
      *
      * @param sourceFile
@@ -409,6 +682,16 @@ public interface Functions {
     void uploadFunction(String sourceFile, String path) throws PulsarAdminException;
 
     /**
+     * Upload Data asynchronously.
+     *
+     * @param sourceFile
+     *            dataFile that needs to be uploaded
+     * @param path
+     *            Path where data should be stored
+     */
+    CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path);
+
+    /**
      * Download Function Code.
      *
      * @param destinationFile
@@ -425,6 +708,16 @@ public interface Functions {
      * Download Function Code.
      *
      * @param destinationFile
+     *            file where data should be downloaded to
+     * @param path
+     *            Path where data is located
+     */
+    CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path);
+
+    /**
+     * Download Function Code.
+     *
+     * @param destinationFile
      *           file where data should be downloaded to
      * @param tenant
      *            Tenant name
@@ -437,6 +730,20 @@ public interface Functions {
     void downloadFunction(String destinationFile, String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
+     * Download Function Code asynchronously.
+     *
+     * @param destinationFile
+     *           file where data should be downloaded to
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     */
+    CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String tenant, String namespace, String function);
+
+    /**
      * Deprecated in favor of getting sources and sinks for their own APIs
      *
      * Fetches a list of supported Pulsar IO connectors currently running in cluster mode
@@ -502,6 +809,28 @@ public interface Functions {
     FunctionState getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
 
     /**
+     * Fetch the current state associated with a Pulsar Function asynchronously.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ "value : 12, version : 2"}</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param key
+     *            Key name of State
+     *
+     * @return the function configuration
+     */
+    CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key);
+
+    /**
      * Puts the given state associated with a Pulsar Function.
      * <p>
      * Response Example:
@@ -527,4 +856,24 @@ public interface Functions {
      *             Unexpected error
      */
     void putFunctionState(String tenant, String namespace, String function, FunctionState state) throws PulsarAdminException;
+
+    /**
+     * Puts the given state associated with a Pulsar Function asynchronously.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>{ "value : 12, version : 2"}</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param state
+     *            FunctionState
+     */
+    CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index bc2982d..a5b323c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.admin.internal;
 
 import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
 import io.netty.handler.codec.http.HttpHeaders;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -47,6 +46,7 @@ import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
@@ -56,7 +56,10 @@ import java.io.FileOutputStream;
 import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.asynchttpclient.Dsl.get;
@@ -78,188 +81,408 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     @Override
     public List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException {
         try {
-            Response response = request(functions.path(tenant).path(namespace)).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(new GenericType<List<String>>() {
-            });
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionsAsync(tenant, namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<List<String>> getFunctionsAsync(String tenant, String namespace) {
+        WebTarget path = functions.path(tenant).path(namespace);
+        final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            List<String> functions
+                                    = response.readEntity(new GenericType<List<String>>() {});
+                            future.complete(functions);
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
         try {
-             Response response = request(functions.path(tenant).path(namespace).path(function)).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionConfig.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<FunctionConfig> getFunctionAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function);
+        final CompletableFuture<FunctionConfig> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(FunctionConfig.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public FunctionStatus getFunctionStatus(
             String tenant, String namespace, String function) throws PulsarAdminException {
         try {
-            Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionStatus.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionStatusAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
+    @Override
+    public CompletableFuture<FunctionStatus> getFunctionStatusAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("status");
+        final CompletableFuture<FunctionStatus> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(FunctionStatus.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
     public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionStatus(
             String tenant, String namespace, String function, int id) throws PulsarAdminException {
         try {
-            Response response = request(
-                    functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status"))
-                            .get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionStatusAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> getFunctionStatusAsync(String tenant, String namespace, String function, int id) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status");
+        final CompletableFuture<FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(
+                                    FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionStats(String tenant, String namespace, String function, int id) throws PulsarAdminException {
         try {
-            Response response = request(
-                    functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionStatsAsync(tenant, namespace, function, id).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> getFunctionStatsAsync(String tenant, String namespace, String function, int id) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats");
+        final CompletableFuture<FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(
+                                    FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public FunctionStats getFunctionStats(String tenant, String namespace, String function) throws PulsarAdminException {
         try {
-            Response response = request(
-                    functions.path(tenant).path(namespace).path(function).path("stats")).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionStats.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }    }
+            return getFunctionStatsAsync(tenant, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<FunctionStats> getFunctionStatsAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("stats");
+        final CompletableFuture<FunctionStats> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(FunctionStats.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
 
     @Override
     public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
+            createFunctionAsync(functionConfig, fileName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> createFunctionAsync(FunctionConfig functionConfig, String fileName) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
             RequestBuilder builder = post(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
                     .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-               builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
-            }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
-
-            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
-                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
+            asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+                if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                    future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+                } else {
+                    future.complete(null);
+                }
+            });
 
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(e);
         }
+        return future;
     }
 
     @Override
     public void createFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            createFunctionWithUrlAsync(functionConfig, pkgUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
 
-            mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+    @Override
+    public CompletableFuture<Void> createFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) {
+        WebTarget path = functions.path(functionConfig.getTenant())
+                .path(functionConfig.getNamespace()).path(functionConfig.getName());
 
-            mp.bodyPart(new FormDataBodyPart("functionConfig",
-                new Gson().toJson(functionConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
-        }
+        final FormDataMultiPart mp = new FormDataMultiPart();
+        mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
+        mp.bodyPart(new FormDataBodyPart(
+                "functionConfig", new Gson().toJson(functionConfig), MediaType.APPLICATION_JSON_TYPE));
+
+        return asyncPostRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA));
     }
 
     @Override
     public void deleteFunction(String cluster, String namespace, String function) throws PulsarAdminException {
         try {
-            request(functions.path(cluster).path(namespace).path(function))
-                    .delete(ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            deleteFunctionAsync(cluster, namespace, function).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> deleteFunctionAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function);
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         updateFunction(functionConfig, fileName, null);
     }
 
-        @Override
+    @Override
+    public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName) {
+        return updateFunctionAsync(functionConfig, fileName, null);
+    }
+
+    @Override
     public void updateFunction(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) throws PulsarAdminException {
         try {
+            updateFunctionAsync(functionConfig, fileName, updateOptions)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> updateFunctionAsync(FunctionConfig functionConfig, String fileName, UpdateOptions updateOptions) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
             RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
                     .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
 
             if (updateOptions != null) {
-                   builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
+                builder.addBodyPart(new StringPart("updateOptions", ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions), MediaType.APPLICATION_JSON));
             }
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
                 builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
 
-            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
-                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
-            }
+            asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+                if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                    future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+                } else {
+                    future.complete(null);
+                }
+            });
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(getApiException(e));
         }
+        return future;
     }
 
     @Override
     public void updateFunctionWithUrl(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            updateFunctionWithUrlAsync(functionConfig, pkgUrl, updateOptions)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
 
+    @Override
+    public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl, UpdateOptions updateOptions) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
             mp.bodyPart(new FormDataBodyPart("url", pkgUrl, MediaType.TEXT_PLAIN_TYPE));
-
             mp.bodyPart(new FormDataBodyPart(
                     "functionConfig",
                     ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig),
                     MediaType.APPLICATION_JSON_TYPE));
-
             if (updateOptions != null) {
                 mp.bodyPart(new FormDataBodyPart(
                         "updateOptions",
                         ObjectMapperFactory.getThreadLocal().writeValueAsString(updateOptions),
                         MediaType.APPLICATION_JSON_TYPE));
             }
-
-            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
-                    .path(functionConfig.getName())).put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA),
-                    ErrorData.class);
+            WebTarget path = functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace())
+                    .path(functionConfig.getName());
+            return asyncPutRequest(path, Entity.entity(mp, MediaType.MULTIPART_FORM_DATA));
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(getApiException(e));
         }
+        return future;
     }
 
     @Override
@@ -268,104 +491,229 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     }
 
     @Override
+    public CompletableFuture<Void> updateFunctionWithUrlAsync(FunctionConfig functionConfig, String pkgUrl) {
+        return updateFunctionWithUrlAsync(functionConfig, pkgUrl, null);
+    }
+
+    @Override
     public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
-            if (triggerFile != null) {
-                mp.bodyPart(new FileDataBodyPart("dataStream",
-                        new File(triggerFile),
-                        MediaType.APPLICATION_OCTET_STREAM_TYPE));
-            }
-            if (triggerValue != null) {
-                mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
-            }
-            if (topic != null && !topic.isEmpty()) {
-                mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
-            }
-            return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return triggerFunctionAsync(tenant, namespace, functionName, topic, triggerValue, triggerFile)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<String> triggerFunctionAsync(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) {
+        final FormDataMultiPart mp = new FormDataMultiPart();
+        if (triggerFile != null) {
+            mp.bodyPart(new FileDataBodyPart("dataStream",
+                    new File(triggerFile),
+                    MediaType.APPLICATION_OCTET_STREAM_TYPE));
+        }
+        if (triggerValue != null) {
+            mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
+        }
+        if (topic != null && !topic.isEmpty()) {
+            mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+        }
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("trigger");
+
+        final CompletableFuture<String> future = new CompletableFuture<>();
+        try {
+            request(path).async().post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), new InvocationCallback<String>() {
+
+                @Override
+                public void completed(String response) {
+                    future.complete(response);
+                }
+
+                @Override
+                public void failed(Throwable throwable) {
+                    log.warn("[{}] Failed to perform http post request: {}", path.getUri(), throwable.getMessage());
+                    future.completeExceptionally(getApiException(throwable.getCause()));
+                }
+
+            });
+        } catch (PulsarAdminException cae) {
+            future.completeExceptionally(cae);
         }
+        return future;
     }
 
     @Override
     public void restartFunction(String tenant, String namespace, String functionName, int instanceId)
             throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
-                    .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            restartFunctionAsync(tenant, namespace, functionName, instanceId)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId))
+                .path("restart");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
-                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            restartFunctionAsync(tenant, namespace, functionName)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> restartFunctionAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("restart");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void stopFunction(String tenant, String namespace, String functionName, int instanceId)
             throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
-                    .path("stop")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            stopFunctionAsync(tenant, namespace, functionName, instanceId)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId))
+                .path("stop");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void stopFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path("stop"))
-                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            stopFunctionAsync(tenant, namespace, functionName)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> stopFunctionAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("stop");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void startFunction(String tenant, String namespace, String functionName, int instanceId)
             throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
-                    .path("start")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            startFunctionAsync(tenant, namespace, functionName, instanceId)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function, int instanceId) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path(Integer.toString(instanceId))
+                .path("start");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void startFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
         try {
-            request(functions.path(tenant).path(namespace).path(functionName).path("start"))
-                    .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            startFunctionAsync(tenant, namespace, functionName)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> startFunctionAsync(String tenant, String namespace, String function) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("start");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
         try {
+            uploadFunctionAsync(sourceFile, path).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> uploadFunctionAsync(String sourceFile, String path) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
             RequestBuilder builder = post(functions.path("upload").getUri().toASCIIString())
                     .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
                     .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
 
-            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
-            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
-                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
-            }
+            asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture()
+                    .thenAccept(response -> {
+                        if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                            future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+                        } else {
+                            future.complete(null);
+                        }
+                    });
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(getApiException(e));
         }
+        return future;
     }
 
     @Override
@@ -374,12 +722,35 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     }
 
     @Override
+    public CompletableFuture<Void> downloadFunctionAsync(String destinationPath, String tenant, String namespace, String functionName) {
+        return downloadFileAsync(destinationPath, functions.path(tenant).path(namespace).path(functionName).path("download"));
+    }
+
+    @Override
     public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
         downloadFile(destinationPath, functions.path("download").queryParam("path", path));
     }
 
+    @Override
+    public CompletableFuture<Void> downloadFunctionAsync(String destinationFile, String path) {
+        return downloadFileAsync(destinationFile, functions.path("download").queryParam("path", path));
+    }
+
     private void downloadFile(String destinationPath, WebTarget target) throws PulsarAdminException {
-        HttpResponseStatus status;
+        try {
+            downloadFileAsync(destinationPath, target).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    private CompletableFuture<Void> downloadFileAsync(String destinationPath, WebTarget target) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
         try {
             File file = new File(destinationPath);
             if (!file.exists()) {
@@ -389,7 +760,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
 
             RequestBuilder builder = get(target.getUri().toASCIIString());
 
-            Future<HttpResponseStatus> whenStatusCode
+            CompletableFuture<HttpResponseStatus> statusFuture
                     = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build(), new AsyncHandler<HttpResponseStatus>() {
                 private HttpResponseStatus status;
 
@@ -409,7 +780,6 @@ public class FunctionsImpl extends ComponentResource implements Functions {
 
                 @Override
                 public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
-
                     os.write(bodyPart.getBodyByteBuffer());
                     return State.CONTINUE;
                 }
@@ -422,17 +792,27 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 @Override
                 public void onThrowable(Throwable t) {
                 }
-            });
-
-            status = whenStatusCode.get();
-            os.close();
+            }).toCompletableFuture();
+
+            statusFuture.thenAccept(status -> {
+                try {
+                    os.close();
+                } catch (Exception e) {
+                    future.completeExceptionally(getApiException(e));
+                    return;
+                }
 
-            if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
-                throw getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build());
-            }
+                if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+                    future.completeExceptionally(
+                            getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build()));
+                } else {
+                    future.complete(null);
+                }
+            });
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(getApiException(e));
         }
+        return future;
     }
 
     @Override
@@ -474,30 +854,73 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     public FunctionState getFunctionState(String tenant, String namespace, String function, String key)
         throws PulsarAdminException {
         try {
-            Response response = request(functions.path(tenant)
-                .path(namespace).path(function).path("state").path(key)).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
-                throw getApiException(response);
-            }
-            return response.readEntity(FunctionState.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            return getFunctionStateAsync(tenant, namespace, function, key)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<FunctionState> getFunctionStateAsync(String tenant, String namespace, String function, String key) {
+        WebTarget path = functions.path(tenant).path(namespace).path(function).path("state").path(key);
+        final CompletableFuture<FunctionState> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<Response>() {
+                    @Override
+                    public void completed(Response response) {
+                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                            future.completeExceptionally(getApiException(response));
+                        } else {
+                            future.complete(response.readEntity(FunctionState.class));
+                        }
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+    @Override
     public void putFunctionState(String tenant, String namespace, String function, FunctionState state)
             throws PulsarAdminException {
         try {
-             RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString());
-             builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON));
-             org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).get();
+            putFunctionStateAsync(tenant, namespace, function, state).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> putFunctionStateAsync(String tenant, String namespace, String function, FunctionState state) {
+        final CompletableFuture<Void> future = new CompletableFuture<>();
+        try {
+            RequestBuilder builder = post(functions.path(tenant).path(namespace).path(function).path("state").path(state.getKey()).getUri().toASCIIString());
+            builder.addBodyPart(new StringPart("state", ObjectMapperFactory.getThreadLocal().writeValueAsString(state), MediaType.APPLICATION_JSON));
+            asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build()).toCompletableFuture().thenAccept(response -> {
+                if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                    future.completeExceptionally(getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build()));
+                } else {
+                    future.complete(null);
+                }
+            });
 
-             if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
-                 throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
-             }
         } catch (Exception e) {
-            throw getApiException(e);
+            future.completeExceptionally(e);
         }
+        return future;
     }
 }