You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/29 03:38:53 UTC
[pulsar] 02/02: [improve][functions][admin] Improve the package download process (#16365)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6e8b04149ac3c5e86dc7c1d56f65a31db6c69dc1
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 11:05:44 2022 +0800
[improve][functions][admin] Improve the package download process (#16365)
* Improve the package download process
---
*Motivation*
Improve the package download process to handle the download
body more efficient.
(cherry picked from commit 1fe8c06ce37092e37de3562651c56d3d6bb1b91a)
---
.../broker/admin/v3/PackagesApiNotEnabledTest.java | 17 +++-
.../pulsar/broker/admin/v3/PackagesApiTest.java | 18 +++-
.../pulsar/client/admin/internal/PackagesImpl.java | 98 ++++++++++++++++------
3 files changed, 105 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
index 4eb1e528d0d..becddb173b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin.v3;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
@@ -45,14 +49,23 @@ public class PackagesApiNotEnabledTest extends MockedPulsarServiceBaseTest {
}
@Test(timeOut = 60000)
- public void testPackagesOperationsWithoutPackagesServiceEnabled() {
+ public void testPackagesOperationsWithoutPackagesServiceEnabled() throws Exception {
// download package api should return 503 Service Unavailable exception
String unknownPackageName = "function://public/default/unknown@v1";
+ Path tmp = Files.createTempDirectory("package-test-tmp");
try {
- admin.packages().download(unknownPackageName, "/test/unknown");
+ admin.packages().download(unknownPackageName, tmp.toAbsolutePath().toString() + "/unknown");
fail("should throw 503 error");
} catch (PulsarAdminException e) {
assertEquals(503, e.getStatusCode());
+ } finally {
+ Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+ try {
+ Files.delete(p);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
// get metadata api should return 503 Service Unavailable exception
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
index c38594478aa..79efb42f63c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
@@ -32,7 +32,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
@Test(groups = "broker")
@@ -101,14 +105,24 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
}
@Test(timeOut = 60000)
- public void testPackagesOperationsFailed() {
+ public void testPackagesOperationsFailed() throws IOException {
// download a non-existent package should return not found exception
String unknownPackageName = "function://public/default/unknown@v1";
+
+ Path tmp = Files.createTempDirectory("package-test-tmp");
try {
- admin.packages().download(unknownPackageName, "/test/unknown");
+ admin.packages().download(unknownPackageName, tmp.toAbsolutePath() + "/unknown");
fail("should throw 404 error");
} catch (PulsarAdminException e) {
assertEquals(404, e.getStatusCode());
+ } finally {
+ Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+ try {
+ Files.delete(p);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
// get the metadata of a non-existent package should return not found exception
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index 77749e6421b..d7afb959720 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -18,21 +18,22 @@
*/
package org.apache.pulsar.client.admin.internal;
+import static org.asynchttpclient.Dsl.get;
import com.google.gson.Gson;
+import io.netty.handler.codec.http.HttpHeaders;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -42,8 +43,11 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
+import org.asynchttpclient.AsyncHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.FilePart;
import org.asynchttpclient.request.body.multipart.StringPart;
@@ -174,30 +178,76 @@ public class PackagesImpl extends ComponentResource implements Packages {
public CompletableFuture<Void> downloadAsync(String packageName, String path) {
WebTarget webTarget = packages.path(PackageName.get(packageName).toRestPath());
final CompletableFuture<Void> future = new CompletableFuture<>();
- asyncGetRequest(webTarget, new InvocationCallback<Response>(){
- @Override
- public void completed(Response response) {
- if (response.getStatus() == Response.Status.OK.getStatusCode()) {
- try (InputStream inputStream = response.readEntity(InputStream.class)) {
- Path destinyPath = Paths.get(path);
- if (destinyPath.getParent() != null) {
- Files.createDirectories(destinyPath.getParent());
+ try {
+ Path destinyPath = Paths.get(path);
+ if (destinyPath.getParent() != null) {
+ Files.createDirectories(destinyPath.getParent());
+ }
+
+ FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel();
+ RequestBuilder builder = get(webTarget.getUri().toASCIIString());
+
+ CompletableFuture<HttpResponseStatus> statusFuture =
+ httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(),
+ new AsyncHandler<HttpResponseStatus>() {
+ private HttpResponseStatus status;
+
+ @Override
+ public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
+ status = httpResponseStatus;
+ if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
+ return State.ABORT;
+ }
+ return State.CONTINUE;
}
- Files.copy(inputStream, destinyPath, StandardCopyOption.REPLACE_EXISTING);
- future.complete(null);
+
+ @Override
+ public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
+ return State.CONTINUE;
+ }
+
+ @Override
+ public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
+ os.write(httpResponseBodyPart.getBodyByteBuffer());
+ return State.CONTINUE;
+ }
+
+ @Override
+ public void onThrowable(Throwable throwable) {
+ // we don't need to handle that throwable and use the returned future to handle it.
+ }
+
+ @Override
+ public HttpResponseStatus onCompleted() throws Exception {
+ return status;
+ }
+ }).toCompletableFuture();
+ statusFuture
+ .whenComplete((status, throwable) -> {
+ try {
+ os.close();
} catch (IOException e) {
- future.completeExceptionally(e);
+ future.completeExceptionally(getApiException(throwable));
}
- } else {
- future.completeExceptionally(getApiException(response));
- }
- }
-
- @Override
- public void failed(Throwable throwable) {
- future.completeExceptionally(throwable);
- }
- });
+ })
+ .thenAccept(status -> {
+ if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+ future.completeExceptionally(
+ getApiException(Response
+ .status(status.getStatusCode())
+ .entity(status.getStatusText())
+ .build()));
+ } else {
+ future.complete(null);
+ }
+ })
+ .exceptionally(throwable -> {
+ future.completeExceptionally(getApiException(throwable));
+ return null;
+ });
+ } catch (Exception e) {
+ future.completeExceptionally(getApiException(e));
+ }
return future;
}