You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:32 UTC
[pulsar] 17/26: Optimizing performance for Pulsar function archive
download (#4082)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e4c62f5999861190def5ce5e69d743169a2ae823
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 20 10:14:24 2019 -0700
Optimizing performance for Pulsar function archive download (#4082)
* Optimizing performance for download
* fixing update and create
* fix upload
* remove commented out code
* remove blank lines
* fix error messages
* fix for sources and sinks
* fix getting exception
* fix auth headers
* cleaning up
* fix print messages
---
.../apache/pulsar/client/admin/PulsarAdmin.java | 23 +++-
.../pulsar/client/admin/PulsarAdminException.java | 20 +++-
.../pulsar/client/admin/internal/BaseResource.java | 4 +-
.../client/admin/internal/ComponentResource.java | 47 ++++++++
.../client/admin/internal/FunctionsImpl.java | 126 +++++++++++++++------
.../pulsar/client/admin/internal/SinkImpl.java | 44 ++++---
.../pulsar/client/admin/internal/SourceImpl.java | 44 ++++---
.../admin/internal/http/AsyncHttpConnector.java | 14 ++-
.../internal/http/AsyncHttpConnectorProvider.java | 5 +
9 files changed, 242 insertions(+), 85 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
index 4731879..cf4d4ce 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.asynchttpclient.AsyncHttpClient;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
@@ -81,6 +82,7 @@ public class PulsarAdmin implements Closeable {
private final ResourceQuotas resourceQuotas;
private final ClientConfigurationData clientConfigData;
private final Client client;
+ private final AsyncHttpClient httpAsyncClient;
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
@@ -146,11 +148,13 @@ public class PulsarAdmin implements Closeable {
auth.start();
}
+ AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData);
+
ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
httpConfig.register(MultiPartFeature.class);
- httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData));
+ httpConfig.connectorProvider(asyncConnectorProvider);
ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.withConfig(httpConfig)
@@ -165,6 +169,10 @@ public class PulsarAdmin implements Closeable {
this.serviceUrl = serviceUrl;
root = client.target(serviceUrl);
+ this.httpAsyncClient = asyncConnectorProvider.getConnector(
+ Math.toIntExact(TimeUnit.SECONDS.toMillis(this.connectTimeout)),
+ Math.toIntExact(TimeUnit.SECONDS.toMillis(this.readTimeout))).getHttpClient();
+
this.clusters = new ClustersImpl(root, auth);
this.brokers = new BrokersImpl(root, auth);
this.brokerStats = new BrokerStatsImpl(root, auth);
@@ -175,9 +183,9 @@ public class PulsarAdmin implements Closeable {
this.nonPersistentTopics = new NonPersistentTopicsImpl(root, auth);
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
- this.functions = new FunctionsImpl(root, auth);
- this.source = new SourceImpl(root, auth);
- this.sink = new SinkImpl(root, auth);
+ this.functions = new FunctionsImpl(root, auth, httpAsyncClient);
+ this.source = new SourceImpl(root, auth, httpAsyncClient);
+ this.sink = new SinkImpl(root, auth, httpAsyncClient);
this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
@@ -396,6 +404,11 @@ public class PulsarAdmin implements Closeable {
LOG.error("Failed to close the authentication service", e);
}
client.close();
- }
+ try {
+ httpAsyncClient.close();
+ } catch (IOException e) {
+ LOG.error("Failed to close http async client", e);
+ }
+ }
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
index 9961454..68f7c8d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdminException.java
@@ -23,10 +23,13 @@ import javax.ws.rs.ServerErrorException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
@SuppressWarnings("serial")
+@Slf4j
public class PulsarAdminException extends Exception {
private static final int DEFAULT_STATUS_CODE = 500;
@@ -34,15 +37,20 @@ public class PulsarAdminException extends Exception {
private final int statusCode;
private static String getReasonFromServer(WebApplicationException e) {
- if (MediaType.APPLICATION_JSON.equals(e.getResponse().getHeaderString("Content-Type"))) {
+ try {
+ return e.getResponse().readEntity(ErrorData.class).reason;
+ } catch (Exception ex) {
try {
- return e.getResponse().readEntity(ErrorData.class).reason;
- } catch (Exception ex) {
- // could not parse output to ErrorData class
- return e.getMessage();
+ return ObjectMapperFactory.getThreadLocal().readValue(e.getResponse().getEntity().toString(), ErrorData.class).reason;
+ } catch (Exception ex1) {
+ try {
+ return ObjectMapperFactory.getThreadLocal().readValue(e.getMessage(), ErrorData.class).reason;
+ } catch (Exception ex2) {
+ // could not parse output to ErrorData class
+ return e.getMessage();
+ }
}
}
- return e.getMessage();
}
public PulsarAdminException(ClientErrorException e) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index 8753ee1..bde02d2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -37,7 +37,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConnectException;
import org.apache.pulsar.client.admin.PulsarAdminException.GettingAuthenticationDataException;
-import org.apache.pulsar.client.admin.PulsarAdminException.HttpErrorException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -52,7 +51,7 @@ import org.slf4j.LoggerFactory;
public abstract class BaseResource {
private static final Logger log = LoggerFactory.getLogger(BaseResource.class);
- private final Authentication auth;
+ protected final Authentication auth;
protected BaseResource(Authentication auth) {
this.auth = auth;
@@ -200,5 +199,4 @@ public abstract class BaseResource {
throw new WebApplicationException(response);
}
}
-
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
new file mode 100644
index 0000000..36b0b2d
--- /dev/null
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ComponentResource.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin.internal;
+
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.asynchttpclient.RequestBuilder;
+
+import java.util.Map;
+
+public class ComponentResource extends BaseResource {
+
+ protected ComponentResource(Authentication auth) {
+ super(auth);
+ }
+
+ public RequestBuilder addAuthHeaders(RequestBuilder requestBuilder) throws PulsarAdminException {
+
+ try {
+ if (auth != null && auth.getAuthData().hasDataForHttp()) {
+ for (Map.Entry<String, String> header : auth.getAuthData().getHttpHeaders()) {
+ requestBuilder.addHeader(header.getKey(), header.getValue());
+ }
+ }
+
+ return requestBuilder;
+ } catch (Throwable t) {
+ throw new PulsarAdminException.GettingAuthenticationDataException(t);
+ }
+ }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index ac8d60d..2d5d6e5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import io.netty.handler.codec.http.HttpHeaders;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Functions;
@@ -32,6 +33,14 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseStatus;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
@@ -42,21 +51,27 @@ import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.StandardCopyOption;
+import java.io.FileOutputStream;
+import java.nio.channels.FileChannel;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import static org.asynchttpclient.Dsl.get;
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
@Slf4j
-public class FunctionsImpl extends BaseResource implements Functions {
+public class FunctionsImpl extends ComponentResource implements Functions {
private final WebTarget functions;
+ private final AsyncHttpClient asyncHttpClient;
- public FunctionsImpl(WebTarget web, Authentication auth) {
+ public FunctionsImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
this.functions = web.path("/admin/v3/functions");
+ this.asyncHttpClient = asyncHttpClient;
}
@Override
@@ -145,18 +160,19 @@ public class FunctionsImpl extends BaseResource implements Functions {
@Override
public void createFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = post(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+ }
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
}
- mp.bodyPart(new FormDataBodyPart("functionConfig",
- new Gson().toJson(functionConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
@@ -192,18 +208,18 @@ public class FunctionsImpl extends BaseResource implements Functions {
@Override
public void updateFunction(FunctionConfig functionConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = put(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("functionConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
- mp.bodyPart(new FormDataBodyPart("functionConfig",
- new Gson().toJson(functionConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(functions.path(functionConfig.getTenant()).path(functionConfig.getNamespace()).path(functionConfig.getName()))
- .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+ }
} catch (Exception e) {
throw getApiException(e);
}
@@ -314,13 +330,14 @@ public class FunctionsImpl extends BaseResource implements Functions {
@Override
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
-
- mp.bodyPart(new FileDataBodyPart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ RequestBuilder builder = post(functions.path("upload").getUri().toASCIIString())
+ .addBodyPart(new FilePart("data", new File(sourceFile), MediaType.APPLICATION_OCTET_STREAM))
+ .addBodyPart(new StringPart("path", path, MediaType.TEXT_PLAIN));
- mp.bodyPart(new FormDataBodyPart("path", path, MediaType.TEXT_PLAIN_TYPE));
- request(functions.path("upload"))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+ }
} catch (Exception e) {
throw getApiException(e);
}
@@ -328,15 +345,58 @@ public class FunctionsImpl extends BaseResource implements Functions {
@Override
public void downloadFunction(String destinationPath, String path) throws PulsarAdminException {
+
+ HttpResponseStatus status;
try {
- InputStream response = request(functions.path("download")
- .queryParam("path", path)).get(InputStream.class);
- if (response != null) {
- File targetFile = new File(destinationPath);
- java.nio.file.Files.copy(
- response,
- targetFile.toPath(),
- StandardCopyOption.REPLACE_EXISTING);
+ File file = new File(destinationPath);
+ if (!file.exists()) {
+ file.createNewFile();
+ }
+ FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
+ WebTarget target = functions.path("download").queryParam("path", path);
+
+ RequestBuilder builder = get(target.getUri().toASCIIString());
+
+ Future<HttpResponseStatus> whenStatusCode
+ = asyncHttpClient.executeRequest(addAuthHeaders(builder).build(), new AsyncHandler<HttpResponseStatus>() {
+ private HttpResponseStatus status;
+
+ @Override
+ public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+ status = responseStatus;
+ if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
+ return State.ABORT;
+ }
+ return State.CONTINUE;
+ }
+
+ @Override
+ public State onHeadersReceived(HttpHeaders headers) throws Exception {
+ return State.CONTINUE;
+ }
+
+ @Override
+ public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+
+ os.write(bodyPart.getBodyByteBuffer());
+ return State.CONTINUE;
+ }
+
+ @Override
+ public HttpResponseStatus onCompleted() throws Exception {
+ return status;
+ }
+
+ @Override
+ public void onThrowable(Throwable t) {
+ }
+ });
+
+ status = whenStatusCode.get();
+ os.close();
+
+ if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+ throw getApiException(Response.status(status.getStatusCode()).entity(status.getStatusText()).build());
}
} catch (Exception e) {
throw getApiException(e);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 48a75e4..592d879 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -27,9 +27,13 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
-import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -39,14 +43,19 @@ import javax.ws.rs.core.Response;
import java.io.File;
import java.util.List;
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
@Slf4j
-public class SinkImpl extends BaseResource implements Sink {
+public class SinkImpl extends ComponentResource implements Sink {
private final WebTarget sink;
+ private final AsyncHttpClient asyncHttpClient;
- public SinkImpl(WebTarget web, Authentication auth) {
+ public SinkImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
this.sink = web.path("/admin/v3/sink");
+ this.asyncHttpClient = asyncHttpClient;
}
@Override
@@ -109,18 +118,19 @@ public class SinkImpl extends BaseResource implements Sink {
@Override
public void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = post(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("sinkConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sinkConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+ }
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
}
- mp.bodyPart(new FormDataBodyPart("sinkConfig",
- new Gson().toJson(sinkConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
@@ -156,18 +166,18 @@ public class SinkImpl extends BaseResource implements Sink {
@Override
public void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = put(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("sinkConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sinkConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
- mp.bodyPart(new FormDataBodyPart("sinkConfig",
- new Gson().toJson(sinkConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(sink.path(sinkConfig.getTenant()).path(sinkConfig.getNamespace()).path(sinkConfig.getName()))
- .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+ }
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 1a56dc4..5103375 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -27,9 +27,13 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.RequestBuilder;
+import org.asynchttpclient.request.body.multipart.FilePart;
+import org.asynchttpclient.request.body.multipart.StringPart;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
-import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
@@ -39,14 +43,19 @@ import javax.ws.rs.core.Response;
import java.io.File;
import java.util.List;
+import static org.asynchttpclient.Dsl.post;
+import static org.asynchttpclient.Dsl.put;
+
@Slf4j
-public class SourceImpl extends BaseResource implements Source {
+public class SourceImpl extends ComponentResource implements Source {
private final WebTarget source;
+ private final AsyncHttpClient asyncHttpClient;
- public SourceImpl(WebTarget web, Authentication auth) {
+ public SourceImpl(WebTarget web, Authentication auth, AsyncHttpClient asyncHttpClient) {
super(auth);
this.source = web.path("/admin/v3/source");
+ this.asyncHttpClient = asyncHttpClient;
}
@Override
@@ -109,18 +118,17 @@ public class SourceImpl extends BaseResource implements Source {
@Override
public void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = post(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("sourceConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sourceConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
+ }
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
}
-
- mp.bodyPart(new FormDataBodyPart("sourceConfig",
- new Gson().toJson(sourceConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
- .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
@@ -156,18 +164,18 @@ public class SourceImpl extends BaseResource implements Source {
@Override
public void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminException {
try {
- final FormDataMultiPart mp = new FormDataMultiPart();
+ RequestBuilder builder = put(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()).getUri().toASCIIString())
+ .addBodyPart(new StringPart("sourceConfig", ObjectMapperFactory.getThreadLocal().writeValueAsString(sourceConfig), MediaType.APPLICATION_JSON));
if (fileName != null && !fileName.startsWith("builtin://")) {
// If the function code is built in, we don't need to submit here
- mp.bodyPart(new FileDataBodyPart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM_TYPE));
+ builder.addBodyPart(new FilePart("data", new File(fileName), MediaType.APPLICATION_OCTET_STREAM));
}
+ org.asynchttpclient.Response response = asyncHttpClient.executeRequest(addAuthHeaders(builder).build()).get();
- mp.bodyPart(new FormDataBodyPart("sourceConfig",
- new Gson().toJson(sourceConfig),
- MediaType.APPLICATION_JSON_TYPE));
- request(source.path(sourceConfig.getTenant()).path(sourceConfig.getNamespace()).path(sourceConfig.getName()))
- .put(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), ErrorData.class);
+ if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
+ throw getApiException(Response.status(response.getStatusCode()).entity(response.getResponseBody()).build());
+ }
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index fd89108..dfb2ecc 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -34,6 +34,7 @@ import javax.ws.rs.client.Client;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
+import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -59,14 +60,21 @@ import org.glassfish.jersey.client.spi.Connector;
@Slf4j
public class AsyncHttpConnector implements Connector {
+ @Getter
private final AsyncHttpClient httpClient;
- @SneakyThrows
public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
+ this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
+ (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT),
+ conf);
+ }
+
+ @SneakyThrows
+ public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, ClientConfigurationData conf) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
- confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT));
- confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT));
+ confBuilder.setConnectTimeout(connectTimeoutMs);
+ confBuilder.setReadTimeout(readTimeoutMs);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
index 2f24089..1bddfe1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java
@@ -37,4 +37,9 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider {
public Connector getConnector(Client client, Configuration runtimeConfig) {
return new AsyncHttpConnector(client, conf);
}
+
+
+ public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs) {
+ return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, conf);
+ }
}