You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:32 UTC

[pulsar] 17/26: Optimizing performance for Pulsar function archive download (#4082)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e4c62f5999861190def5ce5e69d743169a2ae823
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 20 10:14:24 2019 -0700

    Optimizing performance for Pulsar function archive download (#4082)
    
    * Optimizing performance for download
    
    * fixing update and create
    
    * fix upload
    
    * remove commented out code
    
    * remove blank lines
    
    * fix error messages
    
    * fix for sources and sinks
    
    * fix getting exception
    
    * fix auth headers
    
    * cleaning up
    
    * fix print messages
---
 .../apache/pulsar/client/admin/PulsarAdmin.java    |  23 +++-
 .../pulsar/client/admin/PulsarAdminException.java  |  20 +++-
 .../pulsar/client/admin/internal/BaseResource.java |   4 +-
 .../client/admin/internal/ComponentResource.java   |  47 ++++++++
 .../client/admin/internal/FunctionsImpl.java       | 126 +++++++++++++++------
 .../pulsar/client/admin/internal/SinkImpl.java     |  44 ++++---
 .../pulsar/client/admin/internal/SourceImpl.java   |  44 ++++---
 .../admin/internal/http/AsyncHttpConnector.java    |  14 ++-
 .../internal/http/AsyncHttpConnectorProvider.java  |   5 +
 9 files changed, 242 insertions(+), 85 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 4731879..cf4d4ce 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.AsyncHttpClient;
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
 import org.glassfish.jersey.jackson.JacksonFeature;
@@ -81,6 +82,7 @@ public class PulsarAdmin implements Closeable {
     private final ResourceQuotas resourceQuotas;
     private final ClientConfigurationData clientConfigData;
     private final Client client;
+    private final AsyncHttpClient httpAsyncClient;
     private final String serviceUrl;
     private final Lookup lookups;
     private final Functions functions;
@@ -146,11 +148,13 @@ public class PulsarAdmin implements Closeable {
             auth.start();
         }
 
+        AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData);
+
         ClientConfig httpConfig = new ClientConfig();
         httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
         httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
         httpConfig.register(MultiPartFeature.class);
-        httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData));
+        httpConfig.connectorProvider(asyncConnectorProvider);
 
         ClientBuilder clientBuilder = ClientBuilder.newBuilder()
                 .withConfig(httpConfig)
@@ -165,6 +169,10 @@ public class PulsarAdmin implements Closeable {
         this.serviceUrl = serviceUrl;
         root = client.target(serviceUrl);
 
+        this.httpAsyncClient = asyncConnectorProvider.getConnector(
+                Math.toIntExact(TimeUnit.SECONDS.toMillis(this.connectTimeout)),
+                Math.toIntExact(TimeUnit.SECONDS.toMillis(this.readTimeout))).getHttpClient();
+
         this.clusters = new ClustersImpl(root, auth);
         this.brokers = new BrokersImpl(root, auth);
         this.brokerStats = new BrokerStatsImpl(root, auth);
@@ -175,9 +183,9 @@ public class PulsarAdmin implements Closeable {
         this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
         this.resourceQuotas = new ResourceQuotasImpl(root, auth);
         this.lookups = new LookupImpl(root, auth, useTls);
-        this.functions = new FunctionsImpl(root, auth);
-        this.source = new SourceImpl(root, auth);
-        this.sink = new SinkImpl(root, auth);
+        this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
+        this.source = new SourceImpl(root, auth, httpAsyncClient);
+        this.sink = new SinkImpl(root, auth, httpAsyncClient);
         this.worker = new WorkerImpl(root, auth);
         this.schemas = new SchemasImpl(root, auth);
         this.bookies = new BookiesImpl(root, auth);
@@ -396,6 +404,11 @@ public class PulsarAdmin implements Closeable {
             LOG.error("Failed to close the authentication service", e);
         }
         client.close();
-    }
 
+        try {
+            httpAsyncClient.close();
+        } catch (IOException e) {
+           LOG.error("Failed to close http async client", e);
+        }
+    }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 9961454..68f7c8d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -23,10 +23,13 @@ import javax.ws.rs.ServerErrorException;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 
 @SuppressWarnings("serial")
+@Slf4j
 public class PulsarAdminException extends Exception {
     private static final int DEFAULT_STATUS_CODE = 500;
 
@@ -34,15 +37,20 @@ public class PulsarAdminException extends Exception {
     private final int statusCode;
 
     private static String getReasonFromServer(WebApplicationException e) {
-        if (MediaType.APPLICATION_JSON.equals(e.getResponse().getHeaderString("Content-Type"))) {
+        try {
+            return e.getResponse().readEntity(ErrorData.class).reason;
+        } catch (Exception ex) {
             try {
-                return e.getResponse().readEntity(ErrorData.class).reason;
-            } catch (Exception ex) {
-                // could not parse output to ErrorData class
-                return e.getMessage();
+                return ObjectMapperFactory.getThreadLocal().readValue(e.getResponse().getEntity().toString(), ErrorData.class).reason;
+            } catch (Exception ex1) {
+                try {
+                    return ObjectMapperFactory.getThreadLocal().readValue(e.getMessage(), ErrorData.class).reason;
+                } catch (Exception ex2) {
+                    // could not parse output to ErrorData class
+                    return e.getMessage();
+                }
             }
         }
-        return e.getMessage();
     }
 
     public PulsarAdminException(ClientErrorException e) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8753ee1..bde02d2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
 import org.apache.pulsar.client.admin.PulsarAdminException.GettingAuthenticationDataException;
-import org.apache.pulsar.client.admin.PulsarAdminException.HttpErrorException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseResource {
     private static final Logger log = LoggerFactory.getLogger(BaseResource.class);
 
-    private final Authentication auth;
+    protected final Authentication auth;
 
     protected BaseResource(Authentication auth) {
         this.auth = auth;
@@ -200,5 +199,4 @@ public abstract class BaseResource {
             throw new WebApplicationException(response);
         }
     }
-
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
new file mode 100644
index 0000000..36b0b2d
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.asynchttpclient.RequestBuilder;
+
+import java.util.Map;
+
+public class ComponentResource extends BaseResource {
+
+    protected ComponentResource(Authentication auth) {
+        super(auth);
+    }
+
+    public RequestBuilder addAuthHeaders(RequestBuilder requestBuilder) throws PulsarAdminException {
+
+        try {
+            if (auth != null && auth.getAuthData().hasDataForHttp()) {
+                for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
+                    requestBuilder.addHeader(header.getKey(), header.getValue());
+                }
+            }
+
+            return requestBuilder;
+        } catch (Throwable t) {
+            throw new PulsarAdminException.GettingAuthenticationDataException(t);
+        }
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index ac8d60d..2d5d6e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import io.netty.handler.codec.http.HttpHeaders;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Functions;
@@ -32,6 +33,14 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseStatus;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
 import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
@@ -42,21 +51,27 @@ import javax.ws.rs.core.GenericType;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.StandardCopyOption;
+import java.io.FileOutputStream;
+import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
+import static org.asynchttpclient.Dsl.get;
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
 @Slf4j
-public class FunctionsImpl extends BaseResource implements Functions {
+public class FunctionsImpl extends ComponentResource implements Functions {
 
     private final WebTarget functions;
+    private final AsyncHttpClient asyncHttpClient;
 
-    public FunctionsImpl(WebTarget web, Authentication auth) {
+    public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
         super(auth);
         this.functions = web.path("/admin/v3/functions");
+        this.asyncHttpClient = asyncHttpClient;
     }
 
     @Override
@@ -145,18 +160,19 @@ public class FunctionsImpl extends BaseResource implements Functions {
     @Override
     public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = post(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+               builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+            }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
             }
 
-            mp.bodyPart(new FormDataBodyPart("functionConfig",
-                new Gson().toJson(functionConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -192,18 +208,18 @@ public class FunctionsImpl extends BaseResource implements Functions {
     @Override
     public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
 
-            mp.bodyPart(new FormDataBodyPart("functionConfig",
-                new Gson().toJson(functionConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
-                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+            }
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -314,13 +330,14 @@ public class FunctionsImpl extends BaseResource implements Functions {
     @Override
     public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
-
-            mp.bodyPart(new FileDataBodyPart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            RequestBuilder builder = post(functions.path("upload").getUri().toASCIIString())
+                    .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
+                    .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
 
-            mp.bodyPart(new FormDataBodyPart("path", path, MediaType.TEXT_PLAIN_TYPE));
-            request(functions.path("upload"))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+            }
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -328,15 +345,58 @@ public class FunctionsImpl extends BaseResource implements Functions {
 
     @Override
     public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
+
+        HttpResponseStatus status;
         try {
-            InputStream response = request(functions.path("download")
-                    .queryParam("path", path)).get(InputStream.class);
-            if (response != null) {
-                File targetFile = new File(destinationPath);
-                java.nio.file.Files.copy(
-                        response,
-                        targetFile.toPath(),
-                        StandardCopyOption.REPLACE_EXISTING);
+            File file = new File(destinationPath);
+            if (!file.exists()) {
+                file.createNewFile();
+            }
+            FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
+            WebTarget target = functions.path("download").queryParam("path", path);
+
+            RequestBuilder builder = get(target.getUri().toASCIIString());
+
+            Future<HttpResponseStatus> whenStatusCode
+                    = asyncHttpClient.executeRequest(addAuthHeaders(builder).build(), new AsyncHandler<HttpResponseStatus>() {
+                private HttpResponseStatus status;
+
+                @Override
+                public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+                    status = responseStatus;
+                    if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
+                        return State.ABORT;
+                    }
+                    return State.CONTINUE;
+                }
+
+                @Override
+                public State onHeadersReceived(HttpHeaders headers) throws Exception {
+                    return State.CONTINUE;
+                }
+
+                @Override
+                public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+
+                    os.write(bodyPart.getBodyByteBuffer());
+                    return State.CONTINUE;
+                }
+
+                @Override
+                public HttpResponseStatus onCompleted() throws Exception {
+                    return status;
+                }
+
+                @Override
+                public void onThrowable(Throwable t) {
+                }
+            });
+
+            status = whenStatusCode.get();
+            os.close();
+
+            if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+                throw getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build());
             }
         } catch (Exception e) {
             throw getApiException(e);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 48a75e4..592d879 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -27,9 +27,13 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
-import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -39,14 +43,19 @@ import javax.ws.rs.core.Response;
 import java.io.File;
 import java.util.List;
 
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
 @Slf4j
-public class SinkImpl extends BaseResource implements Sink {
+public class SinkImpl extends ComponentResource implements Sink {
 
     private final WebTarget sink;
+    private final AsyncHttpClient asyncHttpClient;
 
-    public SinkImpl(WebTarget web, Authentication auth) {
+    public SinkImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
         super(auth);
         this.sink = web.path("/admin/v3/sink");
+        this.asyncHttpClient = asyncHttpClient;
     }
 
     @Override
@@ -109,18 +118,19 @@ public class SinkImpl extends BaseResource implements Sink {
     @Override
     public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = post(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("sinkConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sinkConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+            }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
             }
 
-            mp.bodyPart(new FormDataBodyPart("sinkConfig",
-                new Gson().toJson(sinkConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -156,18 +166,18 @@ public class SinkImpl extends BaseResource implements Sink {
     @Override
     public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = put(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("sinkConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sinkConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
 
-            mp.bodyPart(new FormDataBodyPart("sinkConfig",
-                    new Gson().toJson(sinkConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
-                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+            }
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 1a56dc4..5103375 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -27,9 +27,13 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
-import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -39,14 +43,19 @@ import javax.ws.rs.core.Response;
 import java.io.File;
 import java.util.List;
 
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
 @Slf4j
-public class SourceImpl extends BaseResource implements Source {
+public class SourceImpl extends ComponentResource implements Source {
 
     private final WebTarget source;
+    private final AsyncHttpClient asyncHttpClient;
 
-    public SourceImpl(WebTarget web, Authentication auth) {
+    public SourceImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
         super(auth);
         this.source = web.path("/admin/v3/source");
+        this.asyncHttpClient = asyncHttpClient;
     }
 
     @Override
@@ -109,18 +118,17 @@ public class SourceImpl extends BaseResource implements Source {
     @Override
     public void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = post(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("sourceConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sourceConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+            }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
             }
-
-            mp.bodyPart(new FormDataBodyPart("sourceConfig",
-                new Gson().toJson(sourceConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
-                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
@@ -156,18 +164,18 @@ public class SourceImpl extends BaseResource implements Source {
     @Override
     public void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
         try {
-            final FormDataMultiPart mp = new FormDataMultiPart();
+            RequestBuilder builder = put(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()).getUri().toASCIIString())
+                    .addBodyPart(new StringPart("sourceConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sourceConfig), MediaType.APPLICATION_JSON));
 
             if (fileName != null && !fileName.startsWith("builtin://")) {
                 // If the function code is built in, we don't need to submit here
-                mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+                builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
             }
+            org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
 
-            mp.bodyPart(new FormDataBodyPart("sourceConfig",
-                    new Gson().toJson(sourceConfig),
-                MediaType.APPLICATION_JSON_TYPE));
-            request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
-                    .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+            if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+                throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+            }
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index fd89108..dfb2ecc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -34,6 +34,7 @@ import javax.ws.rs.client.Client;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response.Status;
 
+import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
@@ -59,14 +60,21 @@ import org.glassfish.jersey.client.spi.Connector;
 @Slf4j
 public class AsyncHttpConnector implements Connector {
 
+    @Getter
     private final AsyncHttpClient httpClient;
 
-    @SneakyThrows
     public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+        this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
+                (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
+                conf);
+    }
+
+    @SneakyThrows
+    public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, ClientConfigurationData conf) {
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
-        confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT));
-        confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT));
+        confBuilder.setConnectTimeout(connectTimeoutMs);
+        confBuilder.setReadTimeout(readTimeoutMs);
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
         confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
             @Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 2f24089..1bddfe1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -37,4 +37,9 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
     public Connector getConnector(Client client, Configuration runtimeConfig) {
         return new AsyncHttpConnector(client, conf);
     }
+
+
+    public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs) {
+        return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, conf);
+    }
 }