You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2018/09/26 14:21:07 UTC

[pulsar] branch master updated: Access function state by rest endpoint (#2653)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cb52d51  Access function state by rest endpoint (#2653)
cb52d51 is described below

commit cb52d519ce53d863a24c1dbf082b1db3e8b62fe8
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Wed Sep 26 22:21:01 2018 +0800

    Access function state by rest endpoint (#2653)
    
    add get function state by rest endpoint
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    | 21 ++++++
 .../org/apache/pulsar/client/admin/Functions.java  | 40 ++++++++--
 .../client/admin/internal/FunctionsImpl.java       | 17 ++++-
 .../functions/worker/rest/api/FunctionsImpl.java   | 88 +++++++++++++++++++++-
 .../worker/rest/api/v2/FunctionApiV2Resource.java  | 10 +++
 5 files changed, 169 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 62d12ec..80f2df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -223,6 +223,27 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
 
     }
 
+    @GET
+    @ApiOperation(
+        value = "Fetch the current state associated with a Pulsar Function",
+        response = String.class
+    )
+    @ApiResponses(value = {
+        @ApiResponse(code = 400, message = "Invalid request"),
+        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+        @ApiResponse(code = 404, message = "The key does not exist"),
+        @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) {
+        return functions.getFunctionState(
+            tenant, namespace, functionName, key);
+
+    }
+
     @POST
     @ApiOperation(value = "Restart function instance", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 9a18460..bdb2c56 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -185,7 +185,7 @@ public interface Functions {
 
     /**
      * Gets the current status of a function instance.
-     * 
+     *
      * @param tenant
      *            Tenant name
      * @param namespace
@@ -217,7 +217,7 @@ public interface Functions {
      *             Unexpected error
      */
     void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-    
+
     /**
      * Restart all function instances
      *
@@ -233,7 +233,7 @@ public interface Functions {
      */
     void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
-    
+
     /**
      * Stop function instance
      *
@@ -251,7 +251,7 @@ public interface Functions {
      *             Unexpected error
      */
     void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-    
+
     /**
      * Stop all function instances
      *
@@ -266,7 +266,7 @@ public interface Functions {
      *             Unexpected error
      */
     void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;
-    
+
     /**
      * Triggers the function by writing to the input topic.
      *
@@ -338,4 +338,34 @@ public interface Functions {
      *
      */
     Set<String> getSinks() throws PulsarAdminException;
+
+    /**
+     * Fetch the current state associated with a Pulsar Function.
+     * <p>
+     * Response Example:
+     *
+     * <pre>
+     * <code>value : 12, version : 2</code>
+     * </pre>
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param key
+     *            Key name of State
+     *
+     * @return the current value and version of the state stored under the given key
+     *
+     * @throws NotAuthorizedException
+     *             You don't have admin permission to read the state of the function
+     * @throws NotFoundException
+     *             The key does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    String getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
+
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1d315d7..99d0da2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -343,6 +343,21 @@ public class FunctionsImpl extends BaseResource implements Functions {
         }
     }
 
+    public String getFunctionState(String tenant, String namespace, String function, String key)
+        throws PulsarAdminException {
+        try {
+            Response response = request(functions.path(tenant)
+                .path(namespace).path(function).path("state").path(key)).get();
+            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                throw new ClientErrorException(response);
+            }
+            return response.readEntity(String.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+
     public static void mergeJson(String json, Builder builder) throws IOException {
         JsonFormat.parser().merge(json, builder);
     }
@@ -351,4 +366,4 @@ public class FunctionsImpl extends BaseResource implements Functions {
         return JsonFormat.printer().print(msg);
     }
 
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 8bb9194..aca3b4a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.functions.utils.Reflections.createInstance;
 import static org.apache.pulsar.functions.utils.Utils.FILE;
 import static org.apache.pulsar.functions.utils.Utils.HTTP;
@@ -27,6 +29,9 @@ import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoade
 
 import com.google.gson.Gson;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import java.io.*;
 import java.net.MalformedURLException;
 import java.net.URI;
@@ -50,6 +55,11 @@ import javax.ws.rs.core.StreamingOutput;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.commons.io.IOUtils;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -460,7 +470,7 @@ public class FunctionsImpl {
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
-    
+
     public Response getFunctionStatus(final String tenant, final String namespace, final String functionName)
             throws IOException {
 
@@ -666,6 +676,65 @@ public class FunctionsImpl {
         }
     }
 
+    public Response getFunctionState(final String tenant, final String namespace,
+                                     final String functionName, final String key) {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionStateParams(tenant, namespace, functionName, key);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}",
+                tenant, namespace, functionName, key, e);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        String tableNs = String.format(
+            "%s_%s",
+            tenant,
+            namespace).replace('-', '_');
+        String tableName = functionName;
+
+        String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
+
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+            .withSettings(StorageClientSettings.newBuilder()
+                .serviceUri(stateStorageServiceUrl)
+                .clientName("functions-admin")
+                .build())
+            .withNamespace(tableNs)
+            .build()) {
+            try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
+                try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+                    if (null == kv) {
+                        return Response.status(Status.NOT_FOUND)
+                            .entity(new String("key '" + key + "' doesn't exist."))
+                            .build();
+                    } else {
+                        String value;
+                        if (kv.isNumber()) {
+                            value = "value : " + kv.numberValue() + ", version : " + kv.version();
+                        } else {
+                            value = "value : " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8)
+                                + ", version : " + kv.version();
+                        }
+                        return Response.status(Status.OK)
+                            .entity(new String(value))
+                            .build();
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+                return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+            }
+        }
+    }
+
     public Response uploadFunction(final InputStream uploadedInputStream, final String path) {
         // validate parameters
         try {
@@ -787,6 +856,23 @@ public class FunctionsImpl {
         return functionDetails;
     }
 
+    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+        throws IllegalArgumentException {
+
+        if (tenant == null) {
+            throw new IllegalArgumentException("Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new IllegalArgumentException("Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new IllegalArgumentException("Function Name is not provided");
+        }
+        if (key == null) {
+            throw new IllegalArgumentException("Key is not provided");
+        }
+    }
+
     private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
         if (functionDetails.hasSource()) {
             SourceSpec sourceSpec = functionDetails.getSource();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 6c6fb18..789fbea 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -206,4 +206,14 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public List<ConnectorDefinition> getConnectorsList() throws IOException {
         return functions.getListOfConnectors();
     }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) throws IOException {
+        return functions.getFunctionState(
+            tenant, namespace, functionName, key);
+    }
 }