You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/26 14:21:04 UTC

[GitHub] jiazhai closed pull request #2653: Access function state by rest endpoint

jiazhai closed pull request #2653: Access function state by rest endpoint
URL: https://github.com/apache/pulsar/pull/2653
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 62d12ec9f0..80f2df552b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -223,6 +223,27 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
 
     }
 
+    @GET
+    @ApiOperation(
+        value = "Fetch the current state associated with a Pulsar Function",
+        response = String.class
+    )
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "Invalid request"),
+        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+        @ApiResponse(code = 404, message = "The key does not exist"),
+        @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(
+            tenant, namespace, functionName, key);
+
+    }
+
     @POST
     @ApiOperation(value = "Restart function instance", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 9a18460f6c..bdb2c568bb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -185,7 +185,7 @@
 
     /**
      * Gets the current status of a function instance.
-     * 
+     *
      * @param tenant
      *            Tenant name
      * @param namespace
@@ -217,7 +217,7 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String functio
      *             Unexpected error
      */
     void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-    
+
     /**
      * Restart all function instances
      *
@@ -233,7 +233,7 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String functio
      */
     void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
-    
+
     /**
      * Stop function instance
      *
@@ -251,7 +251,7 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String functio
      *             Unexpected error
      */
     void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-    
+
     /**
      * Stop all function instances
      *
@@ -266,7 +266,7 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String functio
      *             Unexpected error
      */
     void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;
-    
+
     /**
      * Triggers the function by writing to the input topic.
      *
@@ -338,4 +338,34 @@ FunctionStatus getFunctionStatus(String tenant, String namespace, String functio
      *
      */
     Set<String> getSinks() throws PulsarAdminException;
+
+    /**
+     * Fetch the current state associated with a Pulsar Function.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>value : 12, version : 2</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param key
+     *            Key name of State
+     *
+     * @return the current value and version stored under the given key
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to get the state of the function
+     * @throws NotFoundException
+     *             The key does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    String getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1d315d78f0..99d0da2422 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -343,6 +343,21 @@ public void downloadFunction(String destinationPath, String path) throws PulsarA
         }
     }
 
+    public String getFunctionState(String tenant, String namespace, String function, String key)
+        throws PulsarAdminException {
+        try {
+            Response response = request(functions.path(tenant)
+                .path(namespace).path(function).path("state").path(key)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(String.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
     public static void mergeJson(String json, Builder builder) throws IOException {
         JsonFormat.parser().merge(json, builder);
     }
@@ -351,4 +366,4 @@ public static String printJson(MessageOrBuilder msg) throws IOException {
         return JsonFormat.printer().print(msg);
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 8bb91946a1..aca3b4a9b7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.functions.utils.Reflections.createInstance;
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
@@ -27,6 +29,9 @@
 
 import com.google.gson.Gson;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import java.io.*;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -50,6 +55,11 @@
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.commons.io.IOUtils;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -460,7 +470,7 @@ public Response stopFunctionInstances(final String tenant, final String namespac
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
-    
+
     public Response getFunctionStatus(final String tenant, final String namespace, final String functionName)
             throws IOException {
 
@@ -666,6 +676,65 @@ public Response triggerFunction(final String tenant, final String namespace, fin
         }
     }
 
+    public Response getFunctionState(final String tenant, final String namespace,
+                                     final String functionName, final String key) {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionStateParams(tenant, namespace, functionName, key);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}",
+                tenant, namespace, functionName, key, e);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        String tableNs = String.format(
+            "%s_%s",
+            tenant,
+            namespace).replace('-', '_');
+        String tableName = functionName;
+
+        String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
+
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+            .withSettings(StorageClientSettings.newBuilder()
+                .serviceUri(stateStorageServiceUrl)
+                .clientName("functions-admin")
+                .build())
+            .withNamespace(tableNs)
+            .build()) {
+            try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
+                try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+                    if (null == kv) {
+                        return Response.status(Status.NOT_FOUND)
+                            .entity(new String("key '" + key + "' doesn't exist."))
+                            .build();
+                    } else {
+                        String value;
+                        if (kv.isNumber()) {
+                            value = "value : " + kv.numberValue() + ", version : " + kv.version();
+                        } else {
+                            value = "value : " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8)
+                                + ", version : " + kv.version();
+                        }
+                        return Response.status(Status.OK)
+                            .entity(new String(value))
+                            .build();
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+                return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+            }
+        }
+    }
+
     public Response uploadFunction(final InputStream uploadedInputStream, final String path) {
         // validate parameters
         try {
@@ -787,6 +856,23 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
         return functionDetails;
     }
 
+    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+        throws IllegalArgumentException {
+
+        if (tenant == null) {
+            throw new IllegalArgumentException("Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new IllegalArgumentException("Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new IllegalArgumentException("Function Name is not provided");
+        }
+        if (key == null) {
+            throw new IllegalArgumentException("Key is not provided");
+        }
+    }
+
     private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 6c6fb18613..789fbea67c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -206,4 +206,14 @@ public Response downloadFunction(final @QueryParam("path") String path) {
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) throws IOException {
+        return functions.getFunctionState(
+            tenant, namespace, functionName, key);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services