You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2022/08/11 03:05:50 UTC

[pulsar] branch master updated: [improve][functions][admin] Improve the package download process (#16365)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fe8c06ce37 [improve][functions][admin] Improve the package download process (#16365)
1fe8c06ce37 is described below

commit 1fe8c06ce37092e37de3562651c56d3d6bb1b91a
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 11:05:44 2022 +0800

    [improve][functions][admin] Improve the package download process (#16365)
    
    * Improve the package download process
    ---
    
    *Motivation*
    
    Improve the package download process to handle the download
    body more efficiently.
---
 .../broker/admin/v3/PackagesApiNotEnabledTest.java | 17 +++-
 .../pulsar/broker/admin/v3/PackagesApiTest.java    | 18 +++-
 .../pulsar/client/admin/internal/PackagesImpl.java | 98 ++++++++++++++++------
 3 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
index 3fd39d30d2d..2c39fbbaf8d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
@@ -20,6 +20,10 @@ package org.apache.pulsar.broker.admin.v3;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
@@ -45,14 +49,23 @@ public class PackagesApiNotEnabledTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 60000)
-    public void testPackagesOperationsWithoutPackagesServiceEnabled() {
+    public void testPackagesOperationsWithoutPackagesServiceEnabled() throws Exception {
         // download package api should return 503 Service Unavailable exception
         String unknownPackageName = "function://public/default/unknown@v1";
+        Path tmp = Files.createTempDirectory("package-test-tmp");
         try {
-            admin.packages().download(unknownPackageName, "/test/unknown");
+            admin.packages().download(unknownPackageName, tmp.toAbsolutePath().toString() + "/unknown");
             fail("should throw 503 error");
         } catch (PulsarAdminException e) {
             assertEquals(503, e.getStatusCode());
+        } finally {
+            Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
         }
 
         // get metadata api should return 503 Service Unavailable exception
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
index 69331c02c7d..dd082681b23 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
@@ -32,7 +32,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 
 @Test(groups = "broker-admin")
@@ -101,14 +105,24 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 60000)
-    public void testPackagesOperationsFailed() {
+    public void testPackagesOperationsFailed() throws IOException {
         // download a non-existent package should return not found exception
         String unknownPackageName = "function://public/default/unknown@v1";
+
+        Path tmp = Files.createTempDirectory("package-test-tmp");
         try {
-            admin.packages().download(unknownPackageName, "/test/unknown");
+            admin.packages().download(unknownPackageName, tmp.toAbsolutePath() + "/unknown");
             fail("should throw 404 error");
         } catch (PulsarAdminException e) {
             assertEquals(404, e.getStatusCode());
+        } finally {
+            Files.walk(tmp).sorted(Comparator.reverseOrder()).forEach(p -> {
+                try {
+                    Files.delete(p);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
         }
 
         // get the metadata of a non-existent package should return not found exception
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index 1ed3e5da367..885e39c1ce6 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -18,18 +18,19 @@
  */
 package org.apache.pulsar.client.admin.internal;
 
+import static org.asynchttpclient.Dsl.get;
 import com.google.gson.Gson;
+import io.netty.handler.codec.http.HttpHeaders;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -39,8 +40,11 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.packages.management.core.common.PackageMetadata;
 import org.apache.pulsar.packages.management.core.common.PackageName;
+import org.asynchttpclient.AsyncHandler;
 import org.asynchttpclient.AsyncHttpClient;
 import org.asynchttpclient.Dsl;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseStatus;
 import org.asynchttpclient.RequestBuilder;
 import org.asynchttpclient.request.body.multipart.FilePart;
 import org.asynchttpclient.request.body.multipart.StringPart;
@@ -125,30 +129,76 @@ public class PackagesImpl extends ComponentResource implements Packages {
     public CompletableFuture<Void> downloadAsync(String packageName, String path) {
         WebTarget webTarget = packages.path(PackageName.get(packageName).toRestPath());
         final CompletableFuture<Void> future = new CompletableFuture<>();
-        asyncGetRequest(webTarget, new InvocationCallback<Response>(){
-            @Override
-            public void completed(Response response) {
-                if (response.getStatus() == Response.Status.OK.getStatusCode()) {
-                    try (InputStream inputStream = response.readEntity(InputStream.class)) {
-                        Path destinyPath = Paths.get(path);
-                        if (destinyPath.getParent() != null) {
-                            Files.createDirectories(destinyPath.getParent());
+        try {
+            Path destinyPath = Paths.get(path);
+            if (destinyPath.getParent() != null) {
+                Files.createDirectories(destinyPath.getParent());
+            }
+
+            FileChannel os = new FileOutputStream(destinyPath.toFile()).getChannel();
+            RequestBuilder builder = get(webTarget.getUri().toASCIIString());
+
+            CompletableFuture<HttpResponseStatus> statusFuture =
+                httpClient.executeRequest(addAuthHeaders(webTarget, builder).build(),
+                    new AsyncHandler<HttpResponseStatus>() {
+                        private HttpResponseStatus status;
+
+                        @Override
+                        public State onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
+                            status = httpResponseStatus;
+                            if (status.getStatusCode() != Response.Status.OK.getStatusCode()) {
+                                return State.ABORT;
+                            }
+                            return State.CONTINUE;
                         }
-                        Files.copy(inputStream, destinyPath, StandardCopyOption.REPLACE_EXISTING);
-                        future.complete(null);
+
+                        @Override
+                        public State onHeadersReceived(HttpHeaders httpHeaders) throws Exception {
+                            return State.CONTINUE;
+                        }
+
+                        @Override
+                        public State onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
+                            os.write(httpResponseBodyPart.getBodyByteBuffer());
+                            return State.CONTINUE;
+                        }
+
+                        @Override
+                        public void onThrowable(Throwable throwable) {
+                            // we don't need to handle that throwable and use the returned future to handle it.
+                        }
+
+                        @Override
+                        public HttpResponseStatus onCompleted() throws Exception {
+                            return status;
+                        }
+                    }).toCompletableFuture();
+            statusFuture
+                .whenComplete((status, throwable) -> {
+                    try {
+                        os.close();
                     } catch (IOException e) {
-                        future.completeExceptionally(e);
+                        future.completeExceptionally(getApiException(throwable));
                     }
-                } else {
-                    future.completeExceptionally(getApiException(response));
-                }
-            }
-
-            @Override
-            public void failed(Throwable throwable) {
-                future.completeExceptionally(throwable);
-            }
-        });
+                })
+                .thenAccept(status -> {
+                    if (status.getStatusCode() < 200 || status.getStatusCode() >= 300) {
+                        future.completeExceptionally(
+                            getApiException(Response
+                                .status(status.getStatusCode())
+                                .entity(status.getStatusText())
+                                .build()));
+                    } else {
+                        future.complete(null);
+                    }
+                })
+                .exceptionally(throwable -> {
+                    future.completeExceptionally(getApiException(throwable));
+                    return null;
+                });
+        } catch (Exception e) {
+            future.completeExceptionally(getApiException(e));
+        }
         return future;
     }