You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/14 17:54:52 UTC
[pulsar] Diff for: [GitHub] srkukarni merged pull request #3324: Add
Windowfunction interface to functions api
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
similarity index 99%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
index 2f1f2e700c..0abc87ad72 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowContext.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.windowing;
+package org.apache.pulsar.functions.api;
import org.slf4j.Logger;
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
new file mode 100644
index 0000000000..6f2c4210a4
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/WindowFunction.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api;
+
+import java.util.Collection;
+
+/** Windowed function API: {@link #process} is called for every triggered window. */
+@FunctionalInterface
+public interface WindowFunction<I, O> {
+ /**
+ * Process the records of a triggered window.
+ * @param input the records contained in the triggered window
+ * @param context the context of the window being processed
+ * @return the output
+ * @throws Exception if processing of the window fails
+ */
+ O process(Collection<Record<I>> input, WindowContext context) throws Exception;
+}
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index 41e8ebed76..de00f52fa4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.windowing;
import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.WindowContext;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index e288261d53..19459499cd 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -30,9 +30,7 @@
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.*;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
@@ -51,22 +49,23 @@
private boolean initialized;
protected WindowConfig windowConfig;
- private WindowManager<I> windowManager;
+ private WindowManager<Record<I>> windowManager;
private TimestampExtractor<I> timestampExtractor;
- protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator;
+ protected transient WaterMarkEventGenerator<Record<I>> waterMarkEventGenerator;
- protected java.util.function.Function<Collection<I>, O> windowFunction;
+ protected java.util.function.Function<Collection<I>, O> bareWindowFunction;
+ protected WindowFunction<I, O> windowFunction;
public void initialize(Context context) {
this.windowConfig = this.getWindowConfigs(context);
- this.windowFunction = intializeUserFunction(this.windowConfig);
+ initializeUserFunction(this.windowConfig);
log.info("Window Config: {}", this.windowConfig);
this.windowManager = this.getWindowManager(this.windowConfig, context);
this.initialized = true;
this.start();
}
- private java.util.function.Function<Collection<I>, O> intializeUserFunction(WindowConfig windowConfig) {
+ private void initializeUserFunction(WindowConfig windowConfig) {
String actualWindowFunctionClassName = windowConfig.getActualWindowFunctionClassName();
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object userClassObject = Reflections.createInstance(
@@ -76,10 +75,12 @@ public void initialize(Context context) {
Class<?>[] typeArgs = TypeResolver.resolveRawArguments(
java.util.function.Function.class, userClassObject.getClass());
if (typeArgs[0].equals(Collection.class)) {
- return (java.util.function.Function) userClassObject;
+ bareWindowFunction = (java.util.function.Function) userClassObject;
} else {
throw new IllegalArgumentException("Window function must take a collection as input");
}
+ } else if (userClassObject instanceof WindowFunction) {
+ windowFunction = (WindowFunction) userClassObject;
} else {
throw new IllegalArgumentException("Window function does not implement the correct interface");
}
@@ -97,10 +98,10 @@ private WindowConfig getWindowConfigs(Context context) {
return windowConfig;
}
- private WindowManager<I> getWindowManager(WindowConfig windowConfig, Context context) {
+ private WindowManager<Record<I>> getWindowManager(WindowConfig windowConfig, Context context) {
- WindowLifecycleListener<Event<I>> lifecycleListener = newWindowLifecycleListener(context);
- WindowManager<I> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
+ WindowLifecycleListener<Event<Record<I>>> lifecycleListener = newWindowLifecycleListener(context);
+ WindowManager<Record<I>> manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
if (this.windowConfig.getTimestampExtractorClassName() != null) {
this.timestampExtractor = getTimeStampExtractor(windowConfig);
@@ -115,8 +116,8 @@ private WindowConfig getWindowConfigs(Context context) {
}
}
- EvictionPolicy<I, ?> evictionPolicy = getEvictionPolicy(windowConfig);
- TriggerPolicy<I, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager,
+ EvictionPolicy<Record<I>, ?> evictionPolicy = getEvictionPolicy(windowConfig);
+ TriggerPolicy<Record<I>, ?> triggerPolicy = getTriggerPolicy(windowConfig, manager,
evictionPolicy, context);
manager.setEvictionPolicy(evictionPolicy);
manager.setTriggerPolicy(triggerPolicy);
@@ -162,8 +163,8 @@ private WindowConfig getWindowConfigs(Context context) {
return (TimestampExtractor<I>) result;
}
- private TriggerPolicy<I, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<I> manager,
- EvictionPolicy<I, ?> evictionPolicy, Context context) {
+ private TriggerPolicy<Record<I>, ?> getTriggerPolicy(WindowConfig windowConfig, WindowManager<Record<I>> manager,
+ EvictionPolicy<Record<I>, ?> evictionPolicy, Context context) {
if (windowConfig.getSlidingIntervalCount() != null) {
if (this.isEventTime()) {
return new WatermarkCountTriggerPolicy<>(
@@ -181,7 +182,7 @@ private WindowConfig getWindowConfigs(Context context) {
}
}
- private EvictionPolicy<I, ?> getEvictionPolicy(WindowConfig windowConfig) {
+ private EvictionPolicy<Record<I>, ?> getEvictionPolicy(WindowConfig windowConfig) {
if (windowConfig.getWindowLengthCount() != null) {
if (this.isEventTime()) {
return new WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount());
@@ -198,17 +199,17 @@ private WindowConfig getWindowConfigs(Context context) {
}
}
- protected WindowLifecycleListener<Event<I>> newWindowLifecycleListener(Context context) {
- return new WindowLifecycleListener<Event<I>>() {
+ protected WindowLifecycleListener<Event<Record<I>>> newWindowLifecycleListener(Context context) {
+ return new WindowLifecycleListener<Event<Record<I>>>() {
@Override
- public void onExpiry(List<Event<I>> events) {
- for (Event<I> event : events) {
+ public void onExpiry(List<Event<Record<I>>> events) {
+ for (Event<Record<I>> event : events) {
event.getRecord().ack();
}
}
@Override
- public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<Event<I>>
+ public void onActivation(List<Event<Record<I>>> tuples, List<Event<Record<I>>> newTuples, List<Event<Record<I>>>
expiredTuples, Long referenceTime) {
processWindow(
context,
@@ -220,7 +221,7 @@ public void onActivation(List<Event<I>> tuples, List<Event<I>> newTuples, List<E
};
}
- private void processWindow(Context context, List<I> tuples, List<I> newTuples, List<I>
+ private void processWindow(Context context, List<Record<I>> tuples, List<Record<I>> newTuples, List<Record<I>>
expiredTuples, Long referenceTime) {
O output = null;
@@ -273,12 +274,12 @@ public O process(I input, Context context) throws Exception {
initialize(context);
}
- Record<?> record = context.getCurrentRecord();
+ Record<I> record = (Record<I>)context.getCurrentRecord();
if (isEventTime()) {
- long ts = this.timestampExtractor.extractTimestamp(input);
+ long ts = this.timestampExtractor.extractTimestamp(record.getValue());
if (this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
- this.windowManager.add(input, ts, record);
+ this.windowManager.add(record, ts, record);
} else {
if (this.windowConfig.getLateDataTopic() != null) {
context.publish(this.windowConfig.getLateDataTopic(), input);
@@ -290,12 +291,17 @@ public O process(I input, Context context) throws Exception {
record.ack();
}
} else {
- this.windowManager.add(input, System.currentTimeMillis(), record);
+ this.windowManager.add(record, System.currentTimeMillis(), record);
}
return null;
}
- public O process(Window<I> inputWindow, WindowContext context) throws Exception {
- return this.windowFunction.apply(inputWindow.get());
+ public O process(Window<Record<I>> inputWindow, WindowContext context) throws Exception {
+ if (this.bareWindowFunction != null) {
+ Collection<I> newCollection = inputWindow.get().stream().map(Record::getValue).collect(Collectors.toList());
+ return this.bareWindowFunction.apply(newCollection);
+ } else {
+ return this.windowFunction.process(inputWindow.get(), context);
+ }
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 21a78a2d7e..88ecebe536 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -24,9 +24,8 @@
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.WindowConfig;
-import org.apache.pulsar.functions.utils.WindowConfigUtils;
+import org.apache.pulsar.functions.api.WindowContext;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -51,10 +50,10 @@
private static class TestWindowFunctionExecutor extends WindowFunctionExecutor<Long, Long> {
- List<Window<Long>> windows = new ArrayList<>();
+ List<Window<Record<Long>>> windows = new ArrayList<>();
@Override
- public Long process(Window<Long> inputWindow, WindowContext context) throws Exception {
+ public Long process(Window<Record<Long>> inputWindow, WindowContext context) throws Exception {
windows.add(inputWindow);
return null;
}
@@ -150,22 +149,26 @@ public void testExecuteWithWrongJavaWindowFunctionType() throws Exception {
public void testExecuteWithTs() throws Exception {
long[] timestamps = {603, 605, 607, 618, 626, 636};
for (long ts : timestamps) {
+ Record<?> record = Mockito.mock(Record.class);
+ Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
+ Mockito.doReturn(ts).when(record).getValue();
testWindowedPulsarFunction.process(ts, context);
}
testWindowedPulsarFunction.waterMarkEventGenerator.run();
assertEquals(3, testWindowedPulsarFunction.windows.size());
- Window<Long> first = testWindowedPulsarFunction.windows.get(0);
+ Window<Record<Long>> first = testWindowedPulsarFunction.windows.get(0);
assertArrayEquals(
new long[]{603, 605, 607},
- new long[]{first.get().get(0), first.get().get(1), first.get().get(2)});
+ new long[]{first.get().get(0).getValue(), first.get().get(1).getValue(), first.get().get(2).getValue()});
- Window<Long> second = testWindowedPulsarFunction.windows.get(1);
+ Window<Record<Long>> second = testWindowedPulsarFunction.windows.get(1);
assertArrayEquals(
new long[]{603, 605, 607, 618},
- new long[]{second.get().get(0), second.get().get(1), second.get().get(2), second.get().get(3)});
+ new long[]{second.get().get(0).getValue(), second.get().get(1).getValue(), second.get().get(2).getValue(), second.get().get(3).getValue()});
- Window<Long> third = testWindowedPulsarFunction.windows.get(2);
- assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0), third.get().get(1)});
+ Window<Record<Long>> third = testWindowedPulsarFunction.windows.get(2);
+ assertArrayEquals(new long[]{618, 626}, new long[]{third.get().get(0).getValue(), third.get().get(1).getValue()});
}
@Test
@@ -207,6 +210,10 @@ public void testExecuteWithLateTupleStream() throws Exception {
for (long ts : timestamps) {
events.add(ts);
+ Record<?> record = Mockito.mock(Record.class);
+ Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
+ Mockito.doReturn(ts).when(record).getValue();
testWindowedPulsarFunction.process(ts, context);
//Update the watermark to this timestamp
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
similarity index 93%
rename from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
rename to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
index ae01cecad4..d8f1864c42 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java
@@ -27,7 +27,7 @@
* Example Function that acts on a window of tuples at a time rather than per tuple basis.
*/
@Slf4j
-public class WindowFunction implements Function <Collection<Integer>, Integer> {
+public class AddWindowFunction implements Function <Collection<Integer>, Integer> {
@Override
public Integer apply(Collection<Integer> integers) {
return integers.stream().reduce(0, (x, y) -> x + y);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
new file mode 100644
index 0000000000..fe90d1ed08
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ContextWindowFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.WindowContext;
+import org.apache.pulsar.functions.api.WindowFunction;
+
+import java.util.Collection;
+
+/**
+ * Example Function that acts on a window of tuples at a time rather than per tuple basis.
+ */
+@Slf4j
+public class ContextWindowFunction implements WindowFunction<Integer, Integer> {
+ @Override
+ public Integer process(Collection<Record<Integer>> integers, WindowContext context) {
+ Integer retval = 0;
+ for (Record<Integer> record : integers) {
+ retval += record.getValue();
+ }
+ return retval;
+ }
+}
diff --git a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
index 3dc3279308..e0faf6f62f 100644
--- a/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
+++ b/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml
@@ -20,7 +20,7 @@
tenant: "test"
namespace: "test-namespace"
name: "example"
-className: "org.apache.pulsar.functions.api.examples.WindowFunction"
+className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["test_src"]
userConfig:
"PublishTopic": "test_result"
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index b50bcd5620..6f2e76b90e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -36,6 +36,7 @@
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
@@ -91,19 +92,28 @@ public static int findAvailablePort() {
Class<?>[] typeArgs;
// if window function
if (isWindowConfigPresent) {
- java.util.function.Function function = (java.util.function.Function) userClass;
- if (function == null) {
- throw new IllegalArgumentException(
- String.format("The Java util function class %s could not be instantiated", userClass));
- }
- typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
- if (!typeArgs[0].equals(Collection.class)) {
- throw new IllegalArgumentException("Window function must take a collection as input");
+ if (userClass instanceof WindowFunction) {
+ WindowFunction function = (WindowFunction) userClass;
+ if (function == null) {
+ throw new IllegalArgumentException(
+ String.format("The WindowFunction class %s could not be instantiated", userClass));
+ }
+ typeArgs = TypeResolver.resolveRawArguments(WindowFunction.class, function.getClass());
+ } else {
+ java.util.function.Function function = (java.util.function.Function) userClass;
+ if (function == null) {
+ throw new IllegalArgumentException(
+ String.format("The Java util function class %s could not be instantiated", userClass));
+ }
+ typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
+ if (!typeArgs[0].equals(Collection.class)) {
+ throw new IllegalArgumentException("Window function must take a collection as input");
+ }
+ Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
+ Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
+ Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
+ typeArgs[0] = (Class<?>) actualInputType;
}
- Type type = TypeResolver.resolveGenericType(java.util.function.Function.class, function.getClass());
- Type collectionType = ((ParameterizedType) type).getActualTypeArguments()[0];
- Type actualInputType = ((ParameterizedType) collectionType).getActualTypeArguments()[0];
- typeArgs[0] = (Class<?>) actualInputType;
} else {
if (userClass instanceof Function) {
Function pulsarFunction = (Function) userClass;
With regards,
Apache Git Services