You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/08/26 13:52:16 UTC

[pulsar] branch master updated: Add CLI command to reload built-in functions (#17102)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d65a886b3a Add CLI command to reload built-in functions (#17102)
1d65a886b3a is described below

commit 1d65a886b3aaf7a96d680499f35abbc013bf401f
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Fri Aug 26 15:52:08 2022 +0200

    Add CLI command to reload built-in functions (#17102)
---
 .../org/apache/pulsar/broker/admin/impl/FunctionsBase.java | 14 ++++++++++++++
 .../java/org/apache/pulsar/client/admin/Functions.java     | 13 +++++++++++++
 .../apache/pulsar/client/admin/internal/FunctionsImpl.java | 11 +++++++++++
 .../java/org/apache/pulsar/admin/cli/CmdFunctions.java     | 10 ++++++++++
 .../pulsar/functions/worker/rest/api/FunctionsImpl.java    | 13 +++++++++++++
 .../worker/rest/api/v3/FunctionsApiV3Resource.java         | 14 ++++++++++++++
 .../pulsar/functions/worker/service/api/Functions.java     |  4 ++++
 7 files changed, 79 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index f1c4c105de6..f43e8227cfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -734,6 +734,20 @@ public class FunctionsBase extends AdminResource {
         return functions().getListOfConnectors();
     }
 
+    @POST
+    @ApiOperation(
+            value = "Reload the built-in Functions"
+    )
+    @ApiResponses(value = {
+            @ApiResponse(code = 401, message = "This operation requires super-user access"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/builtins/reload")
+    public void reloadBuiltinFunctions() throws IOException {
+        functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
+    }
+
     @PUT
     @ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
     @ApiResponses(value = {
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 82c47e5d97a..bbbfcb85268 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -893,4 +893,17 @@ public interface Functions {
      */
     CompletableFuture<Void> putFunctionStateAsync(
             String tenant, String namespace, String function, FunctionState state);
+
+    /**
+     * Reload the available built-in functions.
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void reloadBuiltInFunctions() throws PulsarAdminException;
+
+    /**
+     * Reload the available built-in functions.
+     */
+    CompletableFuture<Void> reloadBuiltInFunctionsAsync();
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 0686917a381..0e96da58432 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -841,4 +841,15 @@ public class FunctionsImpl extends ComponentResource implements Functions {
         }
         return future;
     }
+
+    @Override
+    public void reloadBuiltInFunctions() throws PulsarAdminException {
+        sync(this::reloadBuiltInFunctionsAsync);
+    }
+
+    @Override
+    public CompletableFuture<Void> reloadBuiltInFunctionsAsync() {
+        WebTarget path = functions.path("builtins/reload");
+        return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c107f939813..b007106b2a8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1200,6 +1200,15 @@ public class CmdFunctions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Reload the available built-in functions")
+    public class ReloadBuiltInFunctions extends CmdFunctions.BaseCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            getAdmin().functions().reloadBuiltInFunctions();
+        }
+    }
+
     public CmdFunctions(Supplier<PulsarAdmin> admin) throws PulsarClientException {
         super("functions", admin);
         localRunner = new LocalRunner();
@@ -1235,6 +1244,7 @@ public class CmdFunctions extends CmdBase {
         jcommander.addCommand("trigger", getTriggerer());
         jcommander.addCommand("upload", getUploader());
         jcommander.addCommand("download", getDownloader());
+        jcommander.addCommand("reload", new ReloadBuiltInFunctions());
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 466071c490e..8847454973e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -774,6 +774,19 @@ public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWork
         }
     }
 
+    @Override
+    public void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource authenticationData)
+        throws IOException {
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(clientRole, authenticationData)) {
+            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
+        }
+        worker().getFunctionsManager().reloadFunctions(worker().getWorkerConfig());
+    }
+
     private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
                                                                  final String namespace,
                                                                  final String componentName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index b472b768b80..9eb02e4f165 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -353,6 +353,20 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
         return functions().getListOfConnectors();
     }
 
+    @POST
+    @ApiOperation(
+        value = "Reload the built-in Functions"
+    )
+    @ApiResponses(value = {
+        @ApiResponse(code = 401, message = "This operation requires super-user access"),
+        @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
+        @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/builtins/reload")
+    public void reloadBuiltinFunctions() throws IOException {
+        functions().reloadBuiltinFunctions(clientAppId(), clientAuthData());
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
     public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
index ac77b76ec2e..90d4c1f7a10 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/service/api/Functions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.service.api;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -160,4 +161,7 @@ public interface Functions<W extends WorkerService> extends Component<W> {
                                                          String clientRole,
                                                          AuthenticationDataSource clientAuthenticationDataHttps);
 
+
+    void reloadBuiltinFunctions(String clientRole, AuthenticationDataSource clientAuthenticationDataHttps)
+        throws IOException;
 }