You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/05 06:44:46 UTC

[pulsar] branch master updated: Add examples of window functions for repo (#7727)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 669bdd0  Add examples of window functions for repo (#7727)
669bdd0 is described below

commit 669bdd0f0338f6409d461a429e42c1eb92f0f615
Author: 冉小龙 <rx...@apache.org>
AuthorDate: Wed Aug 5 14:44:36 2020 +0800

    Add examples of window functions for repo (#7727)
    
    * Add examples of window functions for repo
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * Add word count example for window function
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
    
    * add license header for /**
    
    Signed-off-by: xiaolong.ran <rx...@apache.org>
---
 .../{ => window}/ContextWindowFunction.java        |  2 +-
 .../LoggingWindowFunction.java}                    | 26 ++++++++++----------
 .../PublishWindowFunction.java}                    | 28 ++++++++++------------
 .../UserConfigWindowFunction.java}                 | 23 +++++++++---------
 .../UserExceptionWindowFunction.java}              | 23 +++++++-----------
 .../UserMetricWindowFunction.java}                 | 26 ++++++++++----------
 .../WordCountWindowFunction.java}                  | 27 ++++++++++-----------
 7 files changed, 74 insertions(+), 81 deletions(-)

diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/ContextWindowFunction.java
similarity index 96%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/ContextWindowFunction.java
index fe90d1e..d240c2a 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/ContextWindowFunction.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.window;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Record;
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/LoggingWindowFunction.java
similarity index 60%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/LoggingWindowFunction.java
index fe90d1e..93397b8 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/LoggingWindowFunction.java
@@ -16,26 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.window;
 
-import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowContext;
 import org.apache.pulsar.functions.api.WindowFunction;
-
-import java.util.Collection;
+import org.slf4j.Logger;
 
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * A function that demonstrates how to redirect logging to a topic.
+ * In this particular example, for every input string, the function
+ * does some logging. If --logTopic topic is specified, these log statements
+ * end up in that specified pulsar topic.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class LoggingWindowFunction implements WindowFunction<String, Void> {
+
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
+    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
+        Logger log = context.getLogger();
+        for (Record<String> record : inputs) {
+            log.info(record + "-window-log");
         }
-        return retval;
+        return null;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/PublishWindowFunction.java
similarity index 54%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/PublishWindowFunction.java
index fe90d1e..e623a6b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/PublishWindowFunction.java
@@ -16,26 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.WindowContext;
-import org.apache.pulsar.functions.api.WindowFunction;
+package org.apache.pulsar.functions.api.examples.window;
 
 import java.util.Collection;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.*;
 
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * Example function that uses the built in publish function in the context
+ * to publish to a desired topic based on config.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class PublishWindowFunction implements WindowFunction<String, Void> {
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
-        }
-        return retval;
+    public Void process(Collection<Record<String>> input, WindowContext context) throws Exception {
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+        String output = String.format("%s!", input);
+        context.publish(publishTopic, output);
+
+        return null;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserConfigWindowFunction.java
similarity index 64%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserConfigWindowFunction.java
index fe90d1e..ffe4cd1 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserConfigWindowFunction.java
@@ -16,26 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.window;
 
-import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
+import java.util.Optional;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowContext;
 import org.apache.pulsar.functions.api.WindowFunction;
 
-import java.util.Collection;
-
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * An example that demonstrates retrieving a user config value from the context.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class UserConfigWindowFunction implements WindowFunction<String, String> {
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
+    public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
+        Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite");
+        if (whatToWrite.get() != null) {
+            return (String)whatToWrite.get();
+        } else {
+            return "Not a nice way";
         }
-        return retval;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserExceptionWindowFunction.java
similarity index 56%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserExceptionWindowFunction.java
index fe90d1e..6f56209 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserExceptionWindowFunction.java
@@ -16,26 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.WindowContext;
-import org.apache.pulsar.functions.api.WindowFunction;
+package org.apache.pulsar.functions.api.examples.window;
 
 import java.util.Collection;
+import org.apache.pulsar.functions.api.*;
+
 
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * This Function simulates a pulsar function encountering runtime errors.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class UserExceptionWindowFunction implements WindowFunction<String, String> {
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
-        }
-        return retval;
+    public String process(Collection<Record<String>> input, WindowContext context) throws Exception {
+        throw new RuntimeException("This wont work");
     }
 }
+
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserMetricWindowFunction.java
similarity index 64%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserMetricWindowFunction.java
index fe90d1e..dca456b 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/UserMetricWindowFunction.java
@@ -16,26 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.window;
 
-import lombok.extern.slf4j.Slf4j;
+import java.util.Collection;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.WindowContext;
 import org.apache.pulsar.functions.api.WindowFunction;
 
-import java.util.Collection;
-
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * Example function that keeps track of
+ * the event time of each message sent.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class UserMetricWindowFunction implements WindowFunction<String, Void> {
+
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
+    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
+
+        for (Record<String> record : inputs) {
+            if (record.getEventTime().isPresent()) {
+                context.recordMetric("MessageEventTime", record.getEventTime().get().doubleValue());
+            }
         }
-        return retval;
+
+        return null;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/WordCountWindowFunction.java
similarity index 54%
rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/WordCountWindowFunction.java
index fe90d1e..39f838d 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/window/WordCountWindowFunction.java
@@ -16,26 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.functions.api.WindowContext;
-import org.apache.pulsar.functions.api.WindowFunction;
+package org.apache.pulsar.functions.api.examples.window;
 
+import java.util.Arrays;
 import java.util.Collection;
+import org.apache.pulsar.functions.api.*;
 
 /**
- * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ * The classic word count example done using pulsar functions.
+ * Each input message is a sentence that is split into words, and each word is counted.
+ * The built in counter state is used to keep track of the word count in a
+ * persistent and consistent manner.
  */
-@Slf4j
-public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+public class WordCountWindowFunction implements WindowFunction<String, Void> {
     @Override
-    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
-        Integer retval = 0;
-        for (Record<Integer> record : integers) {
-            retval += record.getValue();
+    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
+        for (Record<String> input : inputs) {
+            Arrays.asList(input.getValue().split("\\.")).forEach(word -> context.incrCounter(word, 1));
         }
-        return retval;
+        return null;
+
     }
 }