You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/08/26 13:52:16 UTC
[pulsar] branch master updated: Add CLI command to reload built-in functions (#17102)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1d65a886b3a Add CLI command to reload built-in functions (#17102)
1d65a886b3a is described below
commit 1d65a886b3aaf7a96d680499f35abbc013bf401f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 26 15:52:08 2022 +0200
Add CLI command to reload built-in functions (#17102)
---
.../org/apache/pulsar/broker/admin/impl/FunctionsBase.java | 14 ++++++++++++++
.../java/org/apache/pulsar/client/admin/Functions.java | 13 +++++++++++++
.../apache/pulsar/client/admin/internal/FunctionsImpl.java | 11 +++++++++++
.../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 10 ++++++++++
.../pulsar/functions/worker/rest/api/FunctionsImpl.java | 13 +++++++++++++
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 14 ++++++++++++++
.../pulsar/functions/worker/service/api/Functions.java | 4 ++++
7 files changed, 79 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f1c4c105de6..f43e8227cfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -734,6 +734,20 @@ public class FunctionsBase extends AdminResource {
return functions().getListOfConnectors();
}
+ @POST
+ @ApiOperation(
+ value = "Reload the built-in Functions"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "This operation requires super-user access"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/builtins/reload")
+ public void reloadBuiltinFunctions() throws IOException {
+ functions().reloadBuiltinFunctions(clientAppId(), clientAuthData()); // pass the caller's app id and auth data through
+ }
+
@PUT
@ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
@ApiResponses(value = {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 82c47e5d97a..bbbfcb85268 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -893,4 +893,17 @@ public interface Functions {
*/
CompletableFuture<Void> putFunctionStateAsync(
String tenant, String namespace, String function, FunctionState state);
+
+ /**
+ * Reload the available built-in functions (synchronous variant of {@link #reloadBuiltInFunctionsAsync()}).
+ *
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void reloadBuiltInFunctions() throws PulsarAdminException;
+
+ /**
+ * Reload the available built-in functions asynchronously.
+ */
+ CompletableFuture<Void> reloadBuiltInFunctionsAsync();
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 0686917a381..0e96da58432 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -841,4 +841,15 @@ public class FunctionsImpl extends ComponentResource implements Functions {
}
return future;
}
+
+ @Override
+ public void reloadBuiltInFunctions() throws PulsarAdminException {
+ sync(this::reloadBuiltInFunctionsAsync); // NOTE(review): sync() presumably waits on the async call - confirm
+ }
+
+ @Override
+ public CompletableFuture<Void> reloadBuiltInFunctionsAsync() {
+ WebTarget path = functions.path("builtins/reload"); // POSTed below with an empty, JSON-typed entity
+ return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+ }
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c107f939813..b007106b2a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1200,6 +1200,15 @@ public class CmdFunctions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Reload the available built-in functions")
+ public class ReloadBuiltInFunctions extends CmdFunctions.BaseCommand {
+ // Declares no options of its own; the command only issues the admin reload call.
+ @Override
+ void runCmd() throws Exception {
+ getAdmin().functions().reloadBuiltInFunctions();
+ }
+ }
+
public CmdFunctions(Supplier<PulsarAdmin> admin) throws PulsarClientException {
super("functions", admin);
localRunner = new LocalRunner();
@@ -1235,6 +1244,7 @@ public class CmdFunctions extends CmdBase {
jcommander.addCommand("trigger", getTriggerer());
jcommander.addCommand("upload", getUploader());
jcommander.addCommand("download", getDownloader());
+ jcommander.addCommand("reload", new ReloadBuiltInFunctions());
}
@VisibleForTesting
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 466071c490e..8847454973e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -774,6 +774,19 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
}
}
+ @Override
+ public void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource authenticationData)
+ throws IOException {
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+ // With authorization enabled, a non-super-user is rejected with UNAUTHORIZED before any reload happens.
+ if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) {
+ throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
+ }
+ worker().getFunctionsManager().reloadFunctions(worker().getWorkerConfig());
+ }
+
private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
final String namespace,
final String componentName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index b472b768b80..9eb02e4f165 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -353,6 +353,20 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
return functions().getListOfConnectors();
}
+ @POST
+ @ApiOperation(
+ value = "Reload the built-in Functions"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 401, message = "This operation requires super-user access"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/builtins/reload")
+ public void reloadBuiltinFunctions() throws IOException {
+ functions().reloadBuiltinFunctions(clientAppId(), clientAuthData()); // pass the caller's app id and auth data through
+ }
+
@GET
@Path("/{tenant}/{namespace}/{functionName}/state/{key}")
public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
index ac77b76ec2e..90d4c1f7a10 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.service.api;
+import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -160,4 +161,7 @@ public interface Functions<W extends WorkerService> extends Component<W> {
String clientRole,
AuthenticationDataSource clientAuthenticationDataHttps);
+ // Reloads the built-in functions on behalf of the given client role and authentication data.
+ void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps)
+ throws IOException;
}