You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2018/09/26 14:21:07 UTC
[pulsar] branch master updated: Access function state by rest
endpoint (#2653)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cb52d51 Access function state by rest endpoint (#2653)
cb52d51 is described below
commit cb52d519ce53d863a24c1dbf082b1db3e8b62fe8
Author: Jia Zhai <ji...@users.noreply.github.com>
AuthorDate: Wed Sep 26 22:21:01 2018 +0800
Access function state by rest endpoint (#2653)
add get function state by rest endpoint
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 21 ++++++
.../org/apache/pulsar/client/admin/Functions.java | 40 ++++++++--
.../client/admin/internal/FunctionsImpl.java | 17 ++++-
.../functions/worker/rest/api/FunctionsImpl.java | 88 +++++++++++++++++++++-
.../worker/rest/api/v2/FunctionApiV2Resource.java | 10 +++
5 files changed, 169 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 62d12ec..80f2df5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -223,6 +223,27 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
}
+    /**
+     * REST entry point that returns the state stored under {@code key} for a function.
+     * All of the work is delegated to {@code functions.getFunctionState}.
+     */
+    @GET
+    @ApiOperation(value = "Fetch the current state associated with a Pulsar Function", response = String.class)
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
+            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+            @ApiResponse(code = 404, message = "The key does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName,
+            final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
+
@POST
@ApiOperation(value = "Restart function instance", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 9a18460..bdb2c56 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -185,7 +185,7 @@ public interface Functions {
/**
* Gets the current status of a function instance.
- *
+ *
* @param tenant
* Tenant name
* @param namespace
@@ -217,7 +217,7 @@ public interface Functions {
* Unexpected error
*/
void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-
+
/**
* Restart all function instances
*
@@ -233,7 +233,7 @@ public interface Functions {
*/
void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;
-
+
/**
* Stop function instance
*
@@ -251,7 +251,7 @@ public interface Functions {
* Unexpected error
*/
void stopFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;
-
+
/**
* Stop all function instances
*
@@ -266,7 +266,7 @@ public interface Functions {
* Unexpected error
*/
void stopFunction(String tenant, String namespace, String function) throws PulsarAdminException;
-
+
/**
* Triggers the function by writing to the input topic.
*
@@ -338,4 +338,34 @@ public interface Functions {
*
*/
Set<String> getSinks() throws PulsarAdminException;
+
+ /**
+ * Fetch the current state associated with a Pulsar Function.
+ * <p>
+ * Response Example:
+ *
+ * <pre>
+ * <code>value : 12, version : 2</code>
+ * </pre>
+ *
+ * @param tenant
+ * Tenant name
+ * @param namespace
+ * Namespace name
+ * @param function
+ * Function name
+ * @param key
+ * Key name of State
+ *
+ * @return the value and version stored under the key, as plain text
+ *
+ * @throws NotAuthorizedException
+ * You don't have admin permission to get the state of the function
+ * @throws NotFoundException
+ * The key doesn't exist in the state of the function
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ String getFunctionState(String tenant, String namespace, String function, String key) throws PulsarAdminException;
+
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1d315d7..99d0da2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -343,6 +343,21 @@ public class FunctionsImpl extends BaseResource implements Functions {
}
}
+    /** Issues a GET on {@code <tenant>/<namespace>/<function>/state/<key>} and returns the body as text. */
+    public String getFunctionState(String tenant, String namespace, String function, String key)
+            throws PulsarAdminException {
+        try {
+            Response reply = request(
+                    functions.path(tenant).path(namespace).path(function).path("state").path(key)).get();
+            if (reply.getStatusInfo().equals(Response.Status.OK)) {
+                return reply.readEntity(String.class);
+            }
+            throw new ClientErrorException(reply);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
public static void mergeJson(String json, Builder builder) throws IOException {
JsonFormat.parser().merge(json, builder);
}
@@ -351,4 +366,4 @@ public class FunctionsImpl extends BaseResource implements Functions {
return JsonFormat.printer().print(msg);
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 8bb9194..aca3b4a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,6 +19,8 @@
package org.apache.pulsar.functions.worker.rest.api;
import static com.google.common.base.Preconditions.checkNotNull;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.pulsar.functions.utils.Reflections.createInstance;
import static org.apache.pulsar.functions.utils.Utils.FILE;
import static org.apache.pulsar.functions.utils.Utils.HTTP;
@@ -27,6 +29,9 @@ import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoade
import com.google.gson.Gson;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
import java.io.*;
import java.net.MalformedURLException;
import java.net.URI;
@@ -50,6 +55,11 @@ import javax.ws.rs.core.StreamingOutput;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.api.StorageClient;
+import org.apache.bookkeeper.api.kv.Table;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
+import org.apache.bookkeeper.clients.StorageClientBuilder;
+import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.commons.io.IOUtils;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -460,7 +470,7 @@ public class FunctionsImpl {
return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
}
}
-
+
public Response getFunctionStatus(final String tenant, final String namespace, final String functionName)
throws IOException {
@@ -666,6 +676,65 @@ public class FunctionsImpl {
}
}
+    /**
+     * Reads one key from a function's state table and renders it as plain text.
+     * The table is named after the function and lives in the storage namespace
+     * {@code <tenant>_<namespace>}, with every {@code '-'} replaced by {@code '_'}.
+     *
+     * @return 200 with {@code value : <v>, version : <n>}; 404 when the key has no entry; 400 when a
+     *         parameter is missing; 500 on any exception raised while accessing the state storage;
+     *         or the unavailable response when the worker service is not available
+     */
+    public Response getFunctionState(final String tenant, final String namespace,
+                                     final String functionName, final String key) {
+        if (!isWorkerServiceAvailable()) {
+            return getUnavailableResponse();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionStateParams(tenant, namespace, functionName, key);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        String tableNs = String.format("%s_%s", tenant, namespace).replace('-', '_');
+        String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
+
+        // One try covers all three resources so that a failure while building the client is logged
+        // and mapped to a 500 response, exactly like a failure while opening the table or reading.
+        try (StorageClient client = StorageClientBuilder.newBuilder()
+                .withSettings(StorageClientSettings.newBuilder()
+                        .serviceUri(stateStorageServiceUrl)
+                        .clientName("functions-admin")
+                        .build())
+                .withNamespace(tableNs)
+                .build();
+             Table<ByteBuf, ByteBuf> table = result(client.openTable(functionName));
+             KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+            if (null == kv) {
+                return Response.status(Status.NOT_FOUND)
+                        .entity("key '" + key + "' doesn't exist.")
+                        .build();
+            }
+            // Number entries are read with numberValue(); anything else is decoded as UTF-8 text.
+            String value = kv.isNumber()
+                    ? String.valueOf(kv.numberValue())
+                    : new String(ByteBufUtil.getBytes(kv.value()), UTF_8);
+            return Response.status(Status.OK)
+                    .entity("value : " + value + ", version : " + kv.version())
+                    .build();
+        } catch (Exception e) {
+            log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
+                    tenant, namespace, functionName, key, e);
+            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+    }
+
public Response uploadFunction(final InputStream uploadedInputStream, final String path) {
// validate parameters
try {
@@ -787,6 +856,23 @@ public class FunctionsImpl {
return functionDetails;
}
+    /**
+     * Checks that every parameter of a getFunctionState request is present.
+     *
+     * @throws IllegalArgumentException naming the first parameter, in declaration order, that is null
+     */
+    private void validateGetFunctionStateParams(String tenant, String namespace, String functionName, String key)
+            throws IllegalArgumentException {
+        final String problem = tenant == null ? "Tenant is not provided"
+                : namespace == null ? "Namespace is not provided"
+                : functionName == null ? "Function Name is not provided"
+                : key == null ? "Key is not provided"
+                : null;
+        if (problem != null) {
+            throw new IllegalArgumentException(problem);
+        }
+    }
+
private boolean isFunctionCodeBuiltin(FunctionDetails functionDetails) {
if (functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 6c6fb18..789fbea 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -206,4 +206,14 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}
+
+    /** Serves the state stored under {@code key} for a function by delegating to {@code functions}. */
+    @GET
+    @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+    public Response getFunctionState(final @PathParam("tenant") String tenant,
+                                     final @PathParam("namespace") String namespace,
+                                     final @PathParam("functionName") String functionName,
+                                     final @PathParam("key") String key) throws IOException {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
+    }
}