You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/01 01:56:28 UTC
[incubator-pulsar] branch master updated: Allow functions to be triggered without specifying topic name (#1696)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fb7198a Allow functions to be triggered without specifying topic name (#1696)
fb7198a is described below
commit fb7198a669e0647cf3213739b7b6ffb8c650d978
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Apr 30 18:56:25 2018 -0700
Allow functions to be triggered without specifying topic name (#1696)
* Re-added trigger functionality with no need for topic name
* Unified paths
* Fix
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 6 +++---
.../org/apache/pulsar/client/admin/Functions.java | 2 +-
.../pulsar/client/admin/internal/FunctionsImpl.java | 8 +++++---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 4 +++-
.../functions/worker/rest/api/FunctionsImpl.java | 21 ++++++++++++---------
.../worker/rest/api/v2/FunctionApiV2Resource.java | 6 +++---
6 files changed, 27 insertions(+), 20 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80e98af..c22b611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,12 +237,12 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
- final @PathParam("topic") String topic,
final @FormDataParam("data") String triggerValue,
- final @FormDataParam("dataStream") InputStream triggerStream) {
+ final @FormDataParam("dataStream") InputStream triggerStream,
+ final @FormDataParam("topic") String topic) {
return functions.triggerFunction(
- tenant, namespace, functionName, topic, triggerValue, triggerStream);
+ tenant, namespace, functionName, triggerValue, triggerStream, topic);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 8f058c7..9a77495 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -157,7 +157,7 @@ public interface Functions {
* @throws PulsarAdminException
* Unexpected error
*/
- String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;
+ String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;
/**
* Upload Data.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 30e6bd9..9d5b8ee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -149,7 +149,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
}
@Override
- public String triggerFunction(String tenant, String namespace, String functionName, String triggerValue, String triggerFile) throws PulsarAdminException {
+ public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
try {
final FormDataMultiPart mp = new FormDataMultiPart();
if (triggerFile != null) {
@@ -160,9 +160,11 @@ public class FunctionsImpl extends BaseResource implements Functions {
if (triggerValue != null) {
mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
}
- String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
+ if (topic != null && !topic.isEmpty()) {
+ mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+ }
+ return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
.post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
- return response;
} catch (Exception e) {
throw getApiException(e);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e2f0a71..8cc1e23 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -776,12 +776,14 @@ public class CmdFunctions extends CmdBase {
protected String triggerValue;
@Parameter(names = "--triggerFile", description = "The path to the file that contains the data with which you'd like to trigger the function")
protected String triggerFile;
+ @Parameter(names = "--topic", description = "The specific topic name that the function consumes from that you want to inject the data to")
+ protected String topic;
@Override
void runCmd() throws Exception {
if (triggerFile == null && triggerValue == null) {
throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
}
- String retval = admin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
+ String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
System.out.println(retval);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04b6c70..e95f620 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -448,14 +448,14 @@ public class FunctionsImpl {
}
@POST
- @Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger")
+ @Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("name") String functionName,
- final @PathParam("topic") String topic,
final @FormDataParam("data") String input,
- final @FormDataParam("dataStream") InputStream uploadedInputStream) {
+ final @FormDataParam("dataStream") InputStream uploadedInputStream,
+ final @FormDataParam("topic") String topic) {
FunctionDetails functionDetails;
// validate parameters
try {
@@ -480,9 +480,15 @@ public class FunctionsImpl {
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
String inputTopicToWrite;
- // only if the source is PulsarSource
- if (functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
- inputTopicToWrite = topic;
+ // only if the source is PulsarSource and if the function consumes only one topic
+ if (!functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
+ return Response.status(Status.BAD_REQUEST).build();
+ }
+ if (topic != null) {
+ inputTopicToWrite = topic;
+ } else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size() == 1) {
+ inputTopicToWrite =
+ functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().keySet().iterator().next();
} else {
return Response.status(Status.BAD_REQUEST).build();
}
@@ -707,9 +713,6 @@ public class FunctionsImpl {
if (functionName == null) {
throw new IllegalArgumentException("Function Name is not provided");
}
- if (topic == null) {
- throw new IllegalArgumentException("Topic Name is not provided");
- }
if (uploadedInputStream == null && input == null) {
throw new IllegalArgumentException("Trigger Data is not provided");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 12068d2..00c2a28 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,10 +136,10 @@ public class FunctionApiV2Resource extends FunctionApiResource {
public Response triggerFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("name") String functionName,
- final @PathParam("topic") String topic,
final @FormDataParam("data") String input,
- final @FormDataParam("dataStream") InputStream uploadedInputStream) {
- return functions.triggerFunction(tenant, namespace, functionName, topic, input, uploadedInputStream);
+ final @FormDataParam("dataStream") InputStream uploadedInputStream,
+ final @FormDataParam("topic") String topic) {
+ return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
}
@POST
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.