You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/29 03:38:52 UTC
[pulsar] 01/02: [package management service] check service status before run commands (#12847)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7527005672fa0d7e3306edafaf7ac717811edc22
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Nov 19 22:58:31 2021 +0800
[package management service] check service status before run commands (#12847)
### Motivation
When `pulsar-admin` runs package management service commands against a broker that does not have the service enabled, the broker throws a NullPointerException (NPE).
The root cause is in https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java#L45-L47: the `PackagesManagement` instance may be null if the service is not enabled.
### Modifications
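Check whether the service is enabled (`isEnablePackagesManagement`) before each internal request; when it is not, `getPackagesManagement()` throws `UnsupportedOperationException`, which the REST layer maps to a 503 Service Unavailable response.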
(cherry picked from commit 747bf2175f7dabe71f90426c2771b505c5983010)
---
.../org/apache/pulsar/broker/PulsarService.java | 12 ++-
.../pulsar/broker/admin/impl/PackagesBase.java | 4 +
.../apache/pulsar/broker/admin/v3/Packages.java | 21 +++--
.../broker/admin/v3/PackagesApiNotEnabledTest.java | 91 ++++++++++++++++++++++
.../pulsar/broker/admin/v3/PackagesApiTest.java | 3 +
5 files changed, 121 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9211a4efa45..e47bd1bc783 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -247,7 +247,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private AdditionalServlets brokerAdditionalServlets;
// packages management service
- private PackagesManagement packagesManagement;
+ private Optional<PackagesManagement> packagesManagement = Optional.empty();
private PrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;
@@ -1593,16 +1593,22 @@ public class PulsarService implements AutoCloseable, ShutdownService {
}
}
+ public PackagesManagement getPackagesManagement() throws UnsupportedOperationException {
+ return packagesManagement.orElseThrow(() -> new UnsupportedOperationException("Package Management Service "
+ + "is not enabled in the broker."));
+ }
+
private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
- this.packagesManagement = new PackagesManagementImpl();
+ PackagesManagement packagesManagementService = new PackagesManagementImpl();
+ this.packagesManagement = Optional.of(packagesManagementService);
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration();
storageConfiguration.setProperty(config.getProperties());
PackagesStorage storage = storageProvider.getStorage(storageConfiguration);
storage.initialize();
- packagesManagement.initialize(storage);
+ packagesManagementService.initialize(storage);
}
public Optional<Integer> getListenPortHTTP() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
index ccb611d066d..8a8dcd9f03b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PackagesBase.java
@@ -65,6 +65,8 @@ public class PackagesBase extends AdminResource {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, throwable.getMessage()));
} else if (throwable instanceof WebApplicationException) {
asyncResponse.resume(throwable);
+ } else if (throwable instanceof UnsupportedOperationException) {
+ asyncResponse.resume(new RestException(Response.Status.SERVICE_UNAVAILABLE, throwable.getMessage()));
} else {
log.error("Encountered unexpected error", throwable);
asyncResponse.resume(new RestException(Response.Status.INTERNAL_SERVER_ERROR, throwable.getMessage()));
@@ -116,6 +118,8 @@ public class PackagesBase extends AdminResource {
} else {
throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
}
+ } catch (UnsupportedOperationException e) {
+ throw new RestException(Response.Status.SERVICE_UNAVAILABLE, e.getMessage());
}
};
} catch (IllegalArgumentException illegalArgumentException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Packages.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Packages.java
index 9d6e5eedd24..03a83bf3145 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Packages.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Packages.java
@@ -58,7 +58,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message = "Return the metadata of the specified package."),
@ApiResponse(code = 404, message = "The specified package is not existent."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
public void getMeta(
@@ -82,7 +83,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message = "Update the metadata of the specified package successfully."),
@ApiResponse(code = 404, message = "The specified package is not existent."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
@Consumes(MediaType.APPLICATION_JSON)
@@ -113,7 +115,8 @@ public class Packages extends PackagesBase {
value = {
@ApiResponse(code = 200, message = "Upload the specified package successfully."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@@ -148,7 +151,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message = "Download the specified package successfully."),
@ApiResponse(code = 404, message = "The specified package is not existent."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
public StreamingOutput download(
@@ -168,7 +172,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message = "Delete the specified package successfully."),
@ApiResponse(code = 404, message = "The specified package is not existent."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
@ApiOperation(value = "Delete a package with the package name.")
@@ -195,7 +200,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message = "Return the package versions of the specified package."),
@ApiResponse(code = 404, message = "The specified package is not existent."),
@ApiResponse(code = 412, message = "The package name is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
public void listPackageVersion(
@@ -219,7 +225,8 @@ public class Packages extends PackagesBase {
@ApiResponse(code = 200, message =
"Return all the specified type package names in the specified namespace."),
@ApiResponse(code = 412, message = "The package type is illegal."),
- @ApiResponse(code = 500, message = "Internal server error.")
+ @ApiResponse(code = 500, message = "Internal server error."),
+ @ApiResponse(code = 503, message = "Package Management Service is not enabled in the broker.")
}
)
public void listPackages(
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
new file mode 100644
index 00000000000..4eb1e528d0d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiNotEnabledTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v3;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.packages.management.core.common.PackageMetadata;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class PackagesApiNotEnabledTest extends MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ // not enable Package Management Service
+ conf.setEnablePackagesManagement(false);
+ super.internalSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(timeOut = 60000)
+ public void testPackagesOperationsWithoutPackagesServiceEnabled() {
+ // download package api should return 503 Service Unavailable exception
+ String unknownPackageName = "function://public/default/unknown@v1";
+ try {
+ admin.packages().download(unknownPackageName, "/test/unknown");
+ fail("should throw 503 error");
+ } catch (PulsarAdminException e) {
+ assertEquals(503, e.getStatusCode());
+ }
+
+ // get metadata api should return 503 Service Unavailable exception
+ try {
+ admin.packages().getMetadata(unknownPackageName);
+ fail("should throw 503 error");
+ } catch (PulsarAdminException e) {
+ assertEquals(503, e.getStatusCode());
+ }
+
+ // update metadata api should return 503 Service Unavailable exception
+ try {
+ admin.packages().updateMetadata(unknownPackageName,
+ PackageMetadata.builder().description("unknown").build());
+ fail("should throw 503 error");
+ } catch (PulsarAdminException e) {
+ assertEquals(503, e.getStatusCode());
+ }
+
+ // list all the packages api should return 503 Service Unavailable exception
+ try {
+ admin.packages().listPackages("function", "unknown/unknown");
+ fail("should throw 503 error");
+ } catch (PulsarAdminException e) {
+ assertEquals(503, e.getStatusCode());
+ }
+
+ // list all the versions api should return 503 Service Unavailable exception
+ try {
+ admin.packages().listPackageVersions(unknownPackageName);
+ fail("should throw 503 error");
+ } catch (PulsarAdminException e) {
+ assertEquals(503, e.getStatusCode());
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
index 05111c50b9b..c38594478aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/PackagesApiTest.java
@@ -106,6 +106,7 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
String unknownPackageName = "function://public/default/unknown@v1";
try {
admin.packages().download(unknownPackageName, "/test/unknown");
+ fail("should throw 404 error");
} catch (PulsarAdminException e) {
assertEquals(404, e.getStatusCode());
}
@@ -113,6 +114,7 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
// get the metadata of a non-existent package should return not found exception
try {
admin.packages().getMetadata(unknownPackageName);
+ fail("should throw 404 error");
} catch (PulsarAdminException e) {
assertEquals(404, e.getStatusCode());
}
@@ -121,6 +123,7 @@ public class PackagesApiTest extends MockedPulsarServiceBaseTest {
try {
admin.packages().updateMetadata(unknownPackageName,
PackageMetadata.builder().description("unknown").build());
+ fail("should throw 404 error");
} catch (PulsarAdminException e) {
assertEquals(404, e.getStatusCode());
}