You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/01 01:56:28 UTC

[incubator-pulsar] branch master updated: Allow functions to be triggered without specifying topic name (#1696)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fb7198a  Allow functions to be triggered without specifying topic name (#1696)
fb7198a is described below

commit fb7198a669e0647cf3213739b7b6ffb8c650d978
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon Apr 30 18:56:25 2018 -0700

    Allow functions to be triggered without specifying topic name (#1696)
    
    * Re-added trigger functionality with no need for topic name
    
    * Unified paths
    
    * Fix
---
 .../pulsar/broker/admin/impl/FunctionsBase.java     |  6 +++---
 .../org/apache/pulsar/client/admin/Functions.java   |  2 +-
 .../pulsar/client/admin/internal/FunctionsImpl.java |  8 +++++---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java   |  4 +++-
 .../functions/worker/rest/api/FunctionsImpl.java    | 21 ++++++++++++---------
 .../worker/rest/api/v2/FunctionApiV2Resource.java   |  6 +++---
 6 files changed, 27 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 80e98af..c22b611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -237,12 +237,12 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String triggerValue,
-                                    final @FormDataParam("dataStream") InputStream triggerStream) {
+                                    final @FormDataParam("dataStream") InputStream triggerStream,
+                                    final @FormDataParam("topic") String topic) {
 
         return functions.triggerFunction(
-                tenant, namespace, functionName, topic, triggerValue, triggerStream);
+                tenant, namespace, functionName, triggerValue, triggerStream, topic);
 
     }
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 8f058c7..9a77495 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -157,7 +157,7 @@ public interface Functions {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;
+    String triggerFunction(String tenant, String namespace, String function, String topic, String triggerValue, String triggerFile) throws PulsarAdminException;
 
     /**
      * Upload Data.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 30e6bd9..9d5b8ee 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -149,7 +149,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
     }
 
     @Override
-    public String triggerFunction(String tenant, String namespace, String functionName, String triggerValue, String triggerFile) throws PulsarAdminException {
+    public String triggerFunction(String tenant, String namespace, String functionName, String topic, String triggerValue, String triggerFile) throws PulsarAdminException {
         try {
             final FormDataMultiPart mp = new FormDataMultiPart();
             if (triggerFile != null) {
@@ -160,9 +160,11 @@ public class FunctionsImpl extends BaseResource implements Functions {
             if (triggerValue != null) {
                 mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
             }
-            String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
+            if (topic != null && !topic.isEmpty()) {
+                mp.bodyPart(new FormDataBodyPart("topic", topic, MediaType.TEXT_PLAIN_TYPE));
+            }
+            return request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
                     .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
-            return response;
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index e2f0a71..8cc1e23 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -776,12 +776,14 @@ public class CmdFunctions extends CmdBase {
         protected String triggerValue;
         @Parameter(names = "--triggerFile", description = "The path to the file that contains the data with which you'd like to trigger the function")
         protected String triggerFile;
+        @Parameter(names = "--topic", description = "The specific topic name that the function consumes from that you want to inject the data to")
+        protected String topic;
         @Override
         void runCmd() throws Exception {
             if (triggerFile == null && triggerValue == null) {
                 throw new RuntimeException("Either a trigger value or a trigger filepath needs to be specified");
             }
-            String retval = admin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
+            String retval = admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
             System.out.println(retval);
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 04b6c70..e95f620 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -448,14 +448,14 @@ public class FunctionsImpl {
     }
 
     @POST
-    @Path("/{tenant}/{namespace}/{functionName}/{topic}/trigger")
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("name") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
-                                    final @FormDataParam("dataStream") InputStream uploadedInputStream) {
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                    final @FormDataParam("topic") String topic) {
         FunctionDetails functionDetails;
         // validate parameters
         try {
@@ -480,9 +480,15 @@ public class FunctionsImpl {
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
 
         String inputTopicToWrite;
-        // only if the source is PulsarSource
-        if (functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
-            inputTopicToWrite =  topic;
+        // only if the source is PulsarSource and if the function consumes only one topic
+        if (!functionMetaData.getFunctionDetails().getSource().getClassName().equals(PulsarSource.class.getName())) {
+            return Response.status(Status.BAD_REQUEST).build();
+        }
+        if (topic != null) {
+            inputTopicToWrite = topic;
+        } else if (functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().size() == 1) {
+            inputTopicToWrite =
+                    functionMetaData.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap().keySet().iterator().next();
         } else {
             return Response.status(Status.BAD_REQUEST).build();
         }
@@ -707,9 +713,6 @@ public class FunctionsImpl {
         if (functionName == null) {
             throw new IllegalArgumentException("Function Name is not provided");
         }
-        if (topic == null) {
-            throw new IllegalArgumentException("Topic Name is not provided");
-        }
         if (uploadedInputStream == null && input == null) {
             throw new IllegalArgumentException("Trigger Data is not provided");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 12068d2..00c2a28 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -136,10 +136,10 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response triggerFunction(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("name") String functionName,
-                                    final @PathParam("topic") String topic,
                                     final @FormDataParam("data") String input,
-                                    final @FormDataParam("dataStream") InputStream uploadedInputStream) {
-        return functions.triggerFunction(tenant, namespace, functionName, topic, input, uploadedInputStream);
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                    final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
     }
 
     @POST

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.