You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/20 01:46:04 UTC

[GitHub] merlimat closed pull request #1402: Added trigger Function capability

merlimat closed pull request #1402: Added trigger Function capability
URL: https://github.com/apache/incubator-pulsar/pull/1402
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 7080474ec..e1318ba4a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -139,4 +139,18 @@ public Response getAssignments() {
         return functions.getAssignments();
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @FormDataParam("data") String triggerValue,
+                                    final @FormDataParam("dataStream") InputStream triggerStream) {
+
+        return functions.triggerFunction(
+                tenant, namespace, functionName, triggerValue, triggerStream);
+
+    }
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index b38aed2dd..5dcf070b0 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -77,6 +77,7 @@
     private final GetFunctionStatus statuser;
     private final ListFunctions lister;
     private final StateGetter stateGetter;
+    private final TriggerFunction triggerer;
 
     /**
      * Base command
@@ -617,6 +618,22 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Trigger function")
+    class TriggerFunction extends FunctionCommand {
+        @Parameter(names = "--triggerValue", description = "The value the function needs to be triggered with")
+        protected String triggerValue;
+        @Parameter(names = "--triggerFile", description = "The fileName that contains data the function needs to be triggered with")
+        protected String triggerFile;
+        @Override
+        void runCmd() throws Exception {
+            if (triggerFile == null && triggerValue == null) {
+                throw new RuntimeException("One of triggerValue/triggerFile has to be present");
+            }
+            String retval = fnAdmin.functions().triggerFunction(tenant, namespace, functionName, triggerValue, triggerFile);
+            System.out.println(retval);
+        }
+    }
+
     public CmdFunctions(PulsarAdmin admin) {
         super("functions", admin);
         this.fnAdmin = (PulsarAdminWithFunctions) admin;
@@ -628,6 +645,7 @@ public CmdFunctions(PulsarAdmin admin) {
         statuser = new GetFunctionStatus();
         lister = new ListFunctions();
         stateGetter = new StateGetter();
+        triggerer = new TriggerFunction();
         jcommander.addCommand("localrun", getLocalRunner());
         jcommander.addCommand("create", getCreater());
         jcommander.addCommand("delete", getDeleter());
@@ -636,6 +654,7 @@ public CmdFunctions(PulsarAdmin admin) {
         jcommander.addCommand("getstatus", getStatuser());
         jcommander.addCommand("list", getLister());
         jcommander.addCommand("querystate", getStateGetter());
+        jcommander.addCommand("trigger", getTriggerer());
     }
 
     @VisibleForTesting
@@ -675,4 +694,9 @@ ListFunctions getLister() {
     StateGetter getStateGetter() {
         return stateGetter;
     }
+
+    @VisibleForTesting
+    TriggerFunction getTriggerer() {
+        return triggerer;
+    }
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 390d88da0..55ba08922 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -140,4 +140,24 @@
      */
     FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;
 
+    /**
+     * Triggers the function by writing to the input topic.
+     *
+     * @param tenant
+     *            Tenant name
+     * @param namespace
+     *            Namespace name
+     * @param function
+     *            Function name
+     * @param triggerValue
+     *            The input that will be written to input topic
+     * @param triggerFile
+     *            The file which contains the input that will be written to input topic
+     *
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    String triggerFunction(String tenant, String namespace, String function, String triggerValue, String triggerFile) throws PulsarAdminException;
+
+
 }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index f184ea2b2..73948230a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -139,4 +139,24 @@ public void updateFunction(FunctionConfig functionConfig, String fileName) throw
             throw getApiException(e);
         }
     }
+
+    @Override
+    public String triggerFunction(String tenant, String namespace, String functionName, String triggerValue, String triggerFile) throws PulsarAdminException {
+        try {
+            final FormDataMultiPart mp = new FormDataMultiPart();
+            if (triggerFile != null) {
+                mp.bodyPart(new FileDataBodyPart("dataStream",
+                        new File(triggerFile),
+                        MediaType.APPLICATION_OCTET_STREAM_TYPE));
+            }
+            if (triggerValue != null) {
+                mp.bodyPart(new FormDataBodyPart("data", triggerValue, MediaType.TEXT_PLAIN_TYPE));
+            }
+            String response = request(functions.path(tenant).path(namespace).path(functionName).path("trigger"))
+                    .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA), String.class);
+            return response;
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 88a1961e9..cc5852613 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -179,33 +179,32 @@ def actual_execution(self):
       self.contextimpl.set_current_message_context(msg.message.message_id(), msg.topic)
       output_object = None
       self.saved_log_handler = None
+      if self.log_topic_handler is not None:
+        self.saved_log_handler = log.remove_all_handlers()
+        log.add_handler(self.log_topic_handler)
+      start_time = time.time()
+      self.current_stats.increment_processed(int(start_time) * 1000)
+      self.total_stats.increment_processed(int(start_time) * 1000)
+      successfully_executed = False
       try:
-        if self.log_topic_handler is not None:
-          self.saved_log_handler = log.remove_all_handlers()
-          log.add_handler(self.log_topic_handler)
-        start_time = time.time()
-        self.current_stats.increment_processed(int(start_time) * 1000)
-        self.total_stats.increment_processed(int(start_time) * 1000)
         if self.function_class is not None:
           output_object = self.function_class.process(input_object, self.contextimpl)
         else:
           output_object = self.function_purefunction.process(input_object)
-        end_time = time.time()
-        latency = (end_time - start_time) * 1000
-        self.total_stats.increment_successfully_processed(latency)
-        self.current_stats.increment_successfully_processed(latency)
-        self.process_result(output_object, msg)
+        successfully_executed = True
       except Exception as e:
-        if self.log_topic_handler is not None:
-          log.remove_all_handlers()
-          log.add_handler(self.saved_log_handler)
         Log.exception("Exception while executing user method")
         self.total_stats.record_user_exception(e)
         self.current_stats.record_user_exception(e)
-      finally:
-        if self.log_topic_handler is not None:
-          log.remove_all_handlers()
-          log.add_handler(self.saved_log_handler)
+      end_time = time.time()
+      latency = (end_time - start_time) * 1000
+      self.total_stats.increment_successfully_processed(latency)
+      self.current_stats.increment_successfully_processed(latency)
+      if self.log_topic_handler is not None:
+        log.remove_all_handlers()
+        log.add_handler(self.saved_log_handler)
+      if successfully_executed:
+        self.process_result(output_object, msg)
 
   def done_producing(self, consumer, orig_message, result, sent_message):
     if result == pulsar.Result.Ok and self.auto_ack and self.atleast_once:
@@ -224,7 +223,7 @@ def process_result(self, output, msg):
         self.current_stats.nserialization_exceptions += 1
         self.total_stats.nserialization_exceptions += 1
       if output_bytes is not None:
-        props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : str(base64.b64encode(msg.message.message_id().serialize()))}
+        props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : base64.b64encode(msg.message.message_id().serialize())}
         try:
           self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message), properties=props)
         except Exception as e:
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 7e49b5d7a..40d2c0b62 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -23,13 +23,11 @@
 import com.google.gson.Gson;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.lang.reflect.Array;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -43,6 +41,10 @@
 import javax.ws.rs.core.Response.Status;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionConfig;
@@ -442,6 +444,88 @@ public Response getAssignments() {
                 new Gson().toJson(ret)).build();
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("name") String functionName,
+                                    final @FormDataParam("data") String input,
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream) {
+        FunctionConfig functionConfig;
+        // validate parameters
+        try {
+            validateTriggerRequestParams(tenant, namespace, functionName, input, uploadedInputStream);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid trigger function request @ /{}/{}/{}",
+                    tenant, namespace, functionName, e);
+            return Response.status(Status.BAD_REQUEST)
+                    .type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(e.getMessage())).build();
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+            log.error("Function in getFunction does not exist @ /{}/{}/{}",
+                    tenant, namespace, functionName);
+            return Response.status(Status.NOT_FOUND)
+                    .type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+        String inputTopicToWrite;
+        if (functionMetaData.getFunctionConfig().getInputsList().size() > 0) {
+            inputTopicToWrite = functionMetaData.getFunctionConfig().getInputsList().get(0);
+        } else {
+            inputTopicToWrite = functionMetaData.getFunctionConfig().getCustomSerdeInputs().entrySet().iterator().next().getKey();
+        }
+        String outputTopic = functionMetaData.getFunctionConfig().getOutput();
+        Reader reader = null;
+        Producer producer = null;
+        try {
+            if (outputTopic != null && !outputTopic.isEmpty()) {
+                reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
+            }
+            producer = worker().getClient().newProducer().topic(inputTopicToWrite).create();
+            byte[] targetArray;
+            if (uploadedInputStream != null) {
+                targetArray = new byte[uploadedInputStream.available()];
+                uploadedInputStream.read(targetArray);
+            } else {
+                targetArray = input.getBytes();
+            }
+            MessageId msgId = producer.send(targetArray);
+            if (reader == null) {
+                return Response.status(Status.OK).build();
+            }
+            long curTime = System.currentTimeMillis();
+            long maxTime = curTime + 1000;
+            while (curTime < maxTime) {
+                Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
+                if (msg == null) break;
+                if (msg.getProperties().containsKey("__pfn_input_msg_id__") &&
+                        msg.getProperties().containsKey("__pfn_input_topic__")) {
+                    MessageId newMsgId = MessageId.fromByteArray(Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
+                    if (msgId.equals(newMsgId) && msg.getProperties().get("__pfn_input_topic__").equals(inputTopicToWrite)) {
+                        return Response.status(Status.OK).entity(msg.getData()).build();
+                    }
+                }
+                curTime = System.currentTimeMillis();
+            }
+            return Response.status(Status.REQUEST_TIMEOUT).build();
+        } catch (Exception e) {
+            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+        } finally {
+            if (reader != null) {
+                reader.closeAsync();
+            }
+            if (producer != null) {
+                producer.closeAsync();
+            }
+        }
+    }
+
     private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -550,4 +634,23 @@ private FunctionConfig validateUpdateRequestParams(String tenant,
         }
     }
 
+    private void validateTriggerRequestParams(String tenant,
+                                              String namespace,
+                                              String functionName,
+                                              String input,
+                                              InputStream uploadedInputStream) {
+        if (tenant == null) {
+            throw new IllegalArgumentException("Tenant is not provided");
+        }
+        if (namespace == null) {
+            throw new IllegalArgumentException("Namespace is not provided");
+        }
+        if (functionName == null) {
+            throw new IllegalArgumentException("Function Name is not provided");
+        }
+        if (uploadedInputStream == null && input == null) {
+            throw new IllegalArgumentException("Trigger Data is not provided");
+        }
+    }
+
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 7effcd8d8..1d7a8b67e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -128,4 +128,15 @@ public Response getCluster() {
     public Response getAssignments() {
         return functions.getAssignments();
     }
+
+    @POST
+    @Path("/{tenant}/{namespace}/{functionName}/trigger")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public Response triggerFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("name") String functionName,
+                                    final @FormDataParam("data") String input,
+                                    final @FormDataParam("dataStream") InputStream uploadedInputStream) {
+        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream);
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services