You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/14 17:54:52 UTC

[pulsar] Diff for: [GitHub] srkukarni merged pull request #3324: Add Windowfunction interface to functions api

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
similarity index 99%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 2f1f2e700c..0abc87ad72 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.windowing;
+package org.apache.pulsar.functions.api;
 
 import org.slf4j.Logger;
 
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
new file mode 100644
index 0000000000..6f2c4210a4
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import java.util.Collection;
+
+/**
+ * This is the interface of the windowed function api. The process method is called
+ * for every triggered window.
+ */
+@FunctionalInterface
+public interface WindowFunction<I, O> {
+    /**
+     * Process the records of a triggered window.
+     * @return the output
+     */
+    O process(Collection<Record<I>> input, WindowContext context) throws Exception;
+}
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index 41e8ebed76..de00f52fa4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.windowing;
 
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.WindowContext;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index e288261d53..19459499cd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -30,9 +30,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.*;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
@@ -51,22 +49,23 @@
 
     private boolean initialized;
     protected WindowConfig windowConfig;
-    private WindowManager<I> windowManager;
+    private WindowManager<Record<I>> windowManager;
     private TimestampExtractor<I> timestampExtractor;
-    protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator;
+    protected transient WaterMarkEventGenerator<Record<I>> waterMarkEventGenerator;
 
-    protected java.util.function.Function<Collection<I>, O> windowFunction;
+    protected java.util.function.Function<Collection<I>, O> bareWindowFunction;
+    protected WindowFunction<I, O> windowFunction;
 
     public void initialize(Context context) {
         this.windowConfig = this.getWindowConfigs(context);
-        this.windowFunction = intializeUserFunction(this.windowConfig);
+        initializeUserFunction(this.windowConfig);
         log.info("Window Config: {}", this.windowConfig);
         this.windowManager = this.getWindowManager(this.windowConfig, context);
         this.initialized = true;
         this.start();
     }
 
-    private java.util.function.Function<Collection<I>, O> intializeUserFunction(WindowConfig windowConfig) {
+    private void initializeUserFunction(WindowConfig windowConfig) {
         String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName();
         ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
         Object userClassObject = Reflections.createInstance(
@@ -76,10 +75,12 @@ public void initialize(Context context) {
             Class<?>[] typeArgs = TypeResolver.resolveRawArguments(
                     java.util.function.Function.class, userClassObject.getClass());
             if (typeArgs[0].equals(Collection.class)) {
-                return (java.util.function.Function) userClassObject;
+                bareWindowFunction = (java.util.function.Function) userClassObject;
             } else {
                 throw new IllegalArgumentException("Window function must take a collection as input");
             }
+        } else if (userClassObject instanceof WindowFunction) {
+            windowFunction = (WindowFunction) userClassObject;
         } else {
             throw new IllegalArgumentException("Window function does not implement the correct interface");
         }
@@ -97,10 +98,10 @@ private WindowConfig getWindowConfigs(Context context) {
         return windowConfig;
     }
 
-    private WindowManager<I> getWindowManager(WindowConfig windowConfig, Context context) {
+    private WindowManager<Record<I>> getWindowManager(WindowConfig windowConfig, Context context) {
 
-        WindowLifecycleListener<Event<I>> lifecycleListener = newWindowLifecycleListener(context);
-        WindowManager<I> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
+        WindowLifecycleListener<Event<Record<I>>> lifecycleListener = newWindowLifecycleListener(context);
+        WindowManager<Record<I>> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
 
         if (this.windowConfig.getTimestampExtractorClassName() != null) {
             this.timestampExtractor = getTimeStampExtractor(windowConfig);
@@ -115,8 +116,8 @@ private WindowConfig getWindowConfigs(Context context) {
             }
         }
 
-        EvictionPolicy<I, ?> evictionPolicy = getEvictionPolicy(windowConfig);
-        TriggerPolicy<I, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager,
+        EvictionPolicy<Record<I>, ?> evictionPolicy = getEvictionPolicy(windowConfig);
+        TriggerPolicy<Record<I>, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager,
                 evictionPolicy, context);
         manager.setEvictionPolicy(evictionPolicy);
         manager.setTriggerPolicy(triggerPolicy);
@@ -162,8 +163,8 @@ private WindowConfig getWindowConfigs(Context context) {
         return (TimestampExtractor<I>) result;
     }
 
-    private TriggerPolicy<I, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<I> manager,
-                                                 EvictionPolicy<I, ?> evictionPolicy, Context context) {
+    private TriggerPolicy<Record<I>, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<Record<I>> manager,
+                                                 EvictionPolicy<Record<I>, ?> evictionPolicy, Context context) {
         if (windowConfig.getSlidingIntervalCount() != null) {
             if (this.isEventTime()) {
                 return new WatermarkCountTriggerPolicy<>(
@@ -181,7 +182,7 @@ private WindowConfig getWindowConfigs(Context context) {
         }
     }
 
-    private EvictionPolicy<I, ?> getEvictionPolicy(WindowConfig windowConfig) {
+    private EvictionPolicy<Record<I>, ?> getEvictionPolicy(WindowConfig windowConfig) {
         if (windowConfig.getWindowLengthCount() != null) {
             if (this.isEventTime()) {
                 return new WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount());
@@ -198,17 +199,17 @@ private WindowConfig getWindowConfigs(Context context) {
         }
     }
 
-    protected WindowLifecycleListener<Event<I>> newWindowLifecycleListener(Context context) {
-        return new WindowLifecycleListener<Event<I>>() {
+    protected WindowLifecycleListener<Event<Record<I>>> newWindowLifecycleListener(Context context) {
+        return new WindowLifecycleListener<Event<Record<I>>>() {
             @Override
-            public void onExpiry(List<Event<I>> events) {
-                for (Event<I> event : events) {
+            public void onExpiry(List<Event<Record<I>>> events) {
+                for (Event<Record<I>> event : events) {
                     event.getRecord().ack();
                 }
             }
 
             @Override
-            public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<Event<I>>
+            public void onActivation(List<Event<Record<I>>> tuples, List<Event<Record<I>>> newTuples, List<Event<Record<I>>>
                     expiredTuples, Long referenceTime) {
                 processWindow(
                         context,
@@ -220,7 +221,7 @@ public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<E
         };
     }
 
-    private void processWindow(Context context, List<I> tuples, List<I> newTuples, List<I>
+    private void processWindow(Context context, List<Record<I>> tuples, List<Record<I>> newTuples, List<Record<I>>
             expiredTuples, Long referenceTime) {
 
         O output = null;
@@ -273,12 +274,12 @@ public O process(I input, Context context) throws Exception {
             initialize(context);
         }
 
-        Record<?> record = context.getCurrentRecord();
+        Record<I> record = (Record<I>)context.getCurrentRecord();
 
         if (isEventTime()) {
-            long ts = this.timestampExtractor.extractTimestamp(input);
+            long ts = this.timestampExtractor.extractTimestamp(record.getValue());
             if (this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
-                this.windowManager.add(input, ts, record);
+                this.windowManager.add(record, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
                     context.publish(this.windowConfig.getLateDataTopic(), input);
@@ -290,12 +291,17 @@ public O process(I input, Context context) throws Exception {
                 record.ack();
             }
         } else {
-            this.windowManager.add(input, System.currentTimeMillis(), record);
+            this.windowManager.add(record, System.currentTimeMillis(), record);
         }
         return null;
     }
 
-    public O process(Window<I> inputWindow, WindowContext context) throws Exception {
-        return this.windowFunction.apply(inputWindow.get());
+    public O process(Window<Record<I>> inputWindow, WindowContext context) throws Exception {
+        if (this.bareWindowFunction != null) {
+            Collection<I> newCollection = inputWindow.get().stream().map(Record::getValue).collect(Collectors.toList());
+            return this.bareWindowFunction.apply(newCollection);
+        } else {
+            return this.windowFunction.process(inputWindow.get(), context);
+        }
     }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 21a78a2d7e..88ecebe536 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -24,9 +24,8 @@
 import org.apache.pulsar.functions.api.Record;
 
 import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.utils.WindowConfigUtils;
+import org.apache.pulsar.functions.api.WindowContext;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -51,10 +50,10 @@
 
     private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> {
 
-        List<Window<Long>> windows = new ArrayList<>();
+        List<Window<Record<Long>>> windows = new ArrayList<>();
 
         @Override
-        public Long process(Window<Long> inputWindow, WindowContext context) throws Exception {
+        public Long process(Window<Record<Long>> inputWindow, WindowContext context) throws Exception {
             windows.add(inputWindow);
             return null;
         }
@@ -150,22 +149,26 @@ public void testExecuteWithWrongJavaWindowFunctionType() throws Exception {
     public void testExecuteWithTs() throws Exception {
         long[] timestamps = {603, 605, 607, 618, 626, 636};
         for (long ts : timestamps) {
+            Record<?> record = Mockito.mock(Record.class);
+            Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+            Mockito.doReturn(record).when(context).getCurrentRecord();
+            Mockito.doReturn(ts).when(record).getValue();
             testWindowedPulsarFunction.process(ts, context);
         }
         testWindowedPulsarFunction.waterMarkEventGenerator.run();
         assertEquals(3, testWindowedPulsarFunction.windows.size());
-        Window<Long> first = testWindowedPulsarFunction.windows.get(0);
+        Window<Record<Long>> first = testWindowedPulsarFunction.windows.get(0);
         assertArrayEquals(
                 new long[]{603, 605, 607},
-                new long[]{first.get().get(0), first.get().get(1), first.get().get(2)});
+                new long[]{first.get().get(0).getValue(), first.get().get(1).getValue(), first.get().get(2).getValue()});
 
-        Window<Long> second = testWindowedPulsarFunction.windows.get(1);
+        Window<Record<Long>> second = testWindowedPulsarFunction.windows.get(1);
         assertArrayEquals(
                 new long[]{603, 605, 607, 618},
-                new long[]{second.get().get(0), second.get().get(1), second.get().get(2), second.get().get(3)});
+                new long[]{second.get().get(0).getValue(), second.get().get(1).getValue(), second.get().get(2).getValue(), second.get().get(3).getValue()});
 
-        Window<Long> third = testWindowedPulsarFunction.windows.get(2);
-        assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0), third.get().get(1)});
+        Window<Record<Long>> third = testWindowedPulsarFunction.windows.get(2);
+        assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0).getValue(), third.get().get(1).getValue()});
     }
 
     @Test
@@ -207,6 +210,10 @@ public void testExecuteWithLateTupleStream() throws Exception {
 
         for (long ts : timestamps) {
             events.add(ts);
+            Record<?> record = Mockito.mock(Record.class);
+            Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+            Mockito.doReturn(record).when(context).getCurrentRecord();
+            Mockito.doReturn(ts).when(record).getValue();
             testWindowedPulsarFunction.process(ts, context);
 
             //Update the watermark to this timestamp
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
similarity index 93%
rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
index ae01cecad4..d8f1864c42 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
@@ -27,7 +27,7 @@
  * Example Function that acts on a window of tuples at a time rather than per tuple basis.
  */
 @Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class AddWindowFunction implements Function <Collection<Integer>, Integer> {
     @Override
     public Integer apply(Collection<Integer> integers) {
         return integers.stream().reduce(0, (x, y) -> x + y);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
new file mode 100644
index 0000000000..fe90d1ed08
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
+
+import java.util.Collection;
+
+/**
+ * Example WindowFunction that acts on a window of records at a time and returns the sum of their values.
+ */
+@Slf4j
+public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+    @Override
+    public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
+        Integer retval = 0;
+        for (Record<Integer> record : integers) {
+            retval += record.getValue();
+        }
+        return retval;
+    }
+}
diff --git a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
index 3dc3279308..e0faf6f62f 100644
--- a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
+++ b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
@@ -20,7 +20,7 @@
 tenant: "test"
 namespace: "test-namespace"
 name: "example"
-className: "org.apache.pulsar.functions.api.examples.WindowFunction"
+className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
 inputs: ["test_src"]
 userConfig:
   "PublishTopic": "test_result"
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index b50bcd5620..6f2e76b90e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -36,6 +36,7 @@
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
@@ -91,19 +92,28 @@ public static int findAvailablePort() {
         Class<?>[] typeArgs;
         // if window function
         if (isWindowConfigPresent) {
-            java.util.function.Function function = (java.util.function.Function) userClass;
-            if (function == null) {
-                throw new IllegalArgumentException(
-                        String.format("The Java util function class %s could not be instantiated", userClass));
-            }
-            typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
-            if (!typeArgs[0].equals(Collection.class)) {
-                throw new IllegalArgumentException("Window function must take a collection as input");
+            if (userClass instanceof WindowFunction) {
+                WindowFunction function = (WindowFunction) userClass;
+                if (function == null) {
+                    throw new IllegalArgumentException(
+                            String.format("The WindowFunction class %s could not be instantiated", userClass));
+                }
+                typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass());
+            } else {
+                java.util.function.Function function = (java.util.function.Function) userClass;
+                if (function == null) {
+                    throw new IllegalArgumentException(
+                            String.format("The Java util function class %s could not be instantiated", userClass));
+                }
+                typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+                if (!typeArgs[0].equals(Collection.class)) {
+                    throw new IllegalArgumentException("Window function must take a collection as input");
+                }
+                Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+                Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+                Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+                typeArgs[0] = (Class<?>) actualInputType;
             }
-            Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
-            Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
-            Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
-            typeArgs[0] = (Class<?>) actualInputType;
         } else {
             if (userClass instanceof Function) {
                 Function pulsarFunction = (Function) userClass;


With regards,
Apache Git Services