You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/15 00:40:50 UTC

[GitHub] aahmed-se closed pull request #2698: Rework functions api with consumer

aahmed-se closed pull request #2698: Rework functions api with consumer
URL: https://github.com/apache/pulsar/pull/2698
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Consumer.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Consumer.java
new file mode 100644
index 0000000000..1fb635997b
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Consumer.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+/**
+ * This is one of the core interface of the function api. The process is called
+ * for every message of the input topic of the function. The incoming input bytes
+ * are converted to the input type I for simple Java types(String, Integer, Boolean,
+ * Map, and List types) and for org.Json type. If this serialization approach does not
+ * meet your needs, you can use the byte stream handler defined in RawRequestHandler.
+ */
+@FunctionalInterface
+public interface Consumer<I> {
+
+    /**
+     * Process the input.
+     */
+    void accept(I input, Context context) throws Exception;
+}
\ No newline at end of file
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index c66ea6efa7..07828e2093 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -108,7 +108,13 @@
     Logger getLogger();
 
     /**
-     * Increment the builtin distributed counter refered by key
+     * Increment the builtin distributed counter referred by key
+     * @param key The name of the key
+     */
+    void incrCount(String key);
+
+    /**
+     * Increment the builtin distributed counter referred by key by 1
      * @param key The name of the key
      * @param amount The amount to be incremented
      */
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 4d474333b9..ffab601b53 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -226,6 +226,11 @@ public void incrCounter(String key, long amount) {
         }
     }
 
+    @Override
+    public void incrCount(String key) {
+        incrCounter(key, 1);
+    }
+
     @Override
     public long getCounter(String key) {
         ensureStateEnabled();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 39acdb57c9..97666a9f5d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -22,6 +22,7 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.pulsar.functions.api.Consumer;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -37,6 +38,7 @@
     @Getter(AccessLevel.PACKAGE)
     private final ContextImpl context;
     private Function function;
+    private Consumer consumer;
     private java.util.function.Function javaUtilFunction;
 
     public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
@@ -46,7 +48,11 @@ public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
         // create the functions
         if (userClassObject instanceof Function) {
             this.function = (Function) userClassObject;
-        } else {
+        }
+        else if (userClassObject instanceof Consumer) {
+            this.consumer = (Consumer) userClassObject;
+        }
+        else {
             this.javaUtilFunction = (java.util.function.Function) userClassObject;
         }
     }
@@ -60,7 +66,11 @@ public JavaExecutionResult handleMessage(Record<?> record, Object input) {
             Object output;
             if (function != null) {
                 output = function.process(input, context);
-            } else {
+            } else if (consumer != null) {
+                consumer.accept(input, context);
+                output = null;
+            }
+            else {
                 output = javaUtilFunction.apply(input);
             }
             executionResult.setResult(output);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java
index d9e6747ab9..6a8de0f485 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WordCountFunction.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
+import org.apache.pulsar.functions.api.Consumer;
 import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
 
 import java.util.Arrays;
 
@@ -29,10 +29,10 @@
  * The built in counter state is used to keep track of the word count in a
  * persistent and consistent manner.
  */
-public class WordCountFunction implements Function<String, Void> {
+public class WordCountFunction implements Consumer<String> {
+
     @Override
-    public Void process(String input, Context context) throws Exception {
-        Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
-        return null;
+    public void accept(String input, Context context) {
+        Arrays.asList(input.split(" ")).forEach(word -> context.incrCount(word));
     }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 94c315df3f..e4cde1b470 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.functions.api.Consumer;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.io.core.Sink;
@@ -130,7 +131,12 @@ public static int findAvailablePort() {
             if (userClass instanceof Function) {
                 Function pulsarFunction = (Function) userClass;
                 typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-            } else {
+            }
+            else if (userClass instanceof Consumer){
+                Consumer pulsarFunction = (Consumer) userClass;
+                typeArgs = TypeResolver.resolveRawArguments(Consumer.class, pulsarFunction.getClass());
+            }
+            else {
                 java.util.function.Function function = (java.util.function.Function) userClass;
                 typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
             }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services