You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/13 21:33:53 UTC
[incubator-pulsar] branch master updated: Renamed PulsarFunction to
plain Function (#1377)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 381ccc0 Renamed PulsarFunction to plain Function (#1377)
381ccc0 is described below
commit 381ccc07e03d45a0ae3f5ccecef56c08cf4de089
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Mar 13 14:33:51 2018 -0700
Renamed PulsarFunction to plain Function (#1377)
---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 6 ++---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 21 ++++++++--------
.../api/{PulsarFunction.java => Function.java} | 2 +-
.../functions/api/utils/DefaultSerDeTest.java | 6 ++---
.../pulsar/functions/instance/JavaInstance.java | 17 ++++++-------
.../functions/instance/JavaInstanceRunnable.java | 16 ++++++------
.../instance/JavaInstanceRunnableProcessTest.java | 9 +++----
.../instance/JavaInstanceRunnableTest.java | 29 +++++++++++-----------
.../functions/instance/JavaInstanceTest.java | 4 +--
.../functions/api/examples/CounterFunction.java | 4 +--
.../api/examples/ExclamationFunction.java | 4 +--
.../functions/api/examples/LoggingFunction.java | 4 +--
.../functions/api/examples/PublishFunction.java | 4 +--
.../functions/api/examples/UserConfigFunction.java | 4 +--
.../functions/api/examples/UserMetricFunction.java | 4 +--
.../functions/api/examples/VoidFunction.java | 4 +--
.../rest/api/v2/FunctionApiV2ResourceTest.java | 4 +--
site/docs/latest/functions/api.md | 2 +-
18 files changed, 70 insertions(+), 74 deletions(-)
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index 2cd664a..ca20cfe 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -55,7 +55,7 @@ import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
@@ -85,7 +85,7 @@ public class CmdFunctionsTest {
private Functions functions;
private CmdFunctions cmd;
- public class DummyFunction implements PulsarFunction<String, String> {
+ public class DummyFunction implements Function<String, String> {
@Override
public String process(String input, Context context) throws Exception {
return null;
@@ -111,7 +111,7 @@ public class CmdFunctionsTest {
mockStatic(Reflections.class);
when(Reflections.classExistsInJar(any(File.class), anyString())).thenReturn(true);
when(Reflections.classExists(anyString())).thenReturn(true);
- when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(PulsarFunction.class)))
+ when(Reflections.classInJarImplementsIface(any(File.class), anyString(), eq(Function.class)))
.thenReturn(true);
when(Reflections.classImplementsIface(anyString(), any())).thenReturn(true);
when(Reflections.createInstance(eq(DummyFunction.class.getName()), any(File.class))).thenReturn(new DummyFunction());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c8010e7..ae348ae 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -46,7 +46,7 @@ import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminWithFunctions;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -62,7 +62,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.function.Function;
import org.apache.pulsar.functions.utils.Utils;
@Slf4j
@@ -250,13 +249,13 @@ public class CmdFunctions extends CmdBase {
private void doJavaSubmitChecks(FunctionConfig.Builder functionConfigBuilder) {
File file = new File(jarFile);
- // check if the function class exists in Jar and it implements PulsarFunction class
+ // check if the function class exists in Jar and it implements Function class
if (!Reflections.classExistsInJar(file, functionConfigBuilder.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar function class %s does not exist in jar %s",
functionConfigBuilder.getClassName(), jarFile));
- } else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), PulsarFunction.class)
- && !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)) {
- throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither PulsarFunction nor java.util.Function",
+ } else if (!Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), Function.class)
+ && !Reflections.classInJarImplementsIface(file, functionConfigBuilder.getClassName(), java.util.function.Function.class)) {
+ throw new IllegalArgumentException(String.format("Pulsar function class %s in jar %s implements neither Function nor java.util.function.Function",
functionConfigBuilder.getClassName(), jarFile));
}
@@ -269,20 +268,20 @@ public class CmdFunctions extends CmdBase {
Object userClass = Reflections.createInstance(functionConfigBuilder.getClassName(), file);
Class<?>[] typeArgs;
- if (userClass instanceof PulsarFunction) {
- PulsarFunction pulsarFunction = (PulsarFunction) userClass;
+ if (userClass instanceof Function) {
+ Function pulsarFunction = (Function) userClass;
if (pulsarFunction == null) {
throw new IllegalArgumentException(String.format("Pulsar function class %s could not be instantiated from jar %s",
functionConfigBuilder.getClassName(), jarFile));
}
- typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
} else {
- Function function = (Function) userClass;
+ java.util.function.Function function = (java.util.function.Function) userClass;
if (function == null) {
throw new IllegalArgumentException(String.format("Java Util function class %s could not be instantiated from jar %s",
functionConfigBuilder.getClassName(), jarFile));
}
- typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
+ typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}
// Check if the Input serialization/deserialization class exists in jar or already loaded and that it
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
similarity index 97%
rename from pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java
rename to pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
index 82406a8..ca292eb 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/PulsarFunction.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Function.java
@@ -26,7 +26,7 @@ package org.apache.pulsar.functions.api;
* meet your needs, you can use the byte stream handler defined in RawRequestHandler.
*/
@FunctionalInterface
-public interface PulsarFunction<I, O> {
+public interface Function<I, O> {
/**
* Process the input.
* @return the output
diff --git a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java
index d878650..3b1b118 100644
--- a/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java
+++ b/pulsar-functions/api-java/src/test/java/org/apache/pulsar/functions/api/utils/DefaultSerDeTest.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.api.utils;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.testng.annotations.Test;
@@ -82,7 +82,7 @@ public class DefaultSerDeTest {
assertEquals(result, input);
}
- private class SimplePulsarFunction implements PulsarFunction<String, String> {
+ private class SimplePulsarFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return null;
@@ -92,7 +92,7 @@ public class DefaultSerDeTest {
@Test
public void testPulsarFunction() {
SimplePulsarFunction pulsarFunction = new SimplePulsarFunction();
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
SerDe serDe = new DefaultSerDe(String.class);
Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
assertTrue(inputSerdeTypeArgs[0].isAssignableFrom(typeArgs[0]));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index aa80e71..14f3171 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -24,11 +24,10 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import java.util.Map;
-import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +42,8 @@ public class JavaInstance implements AutoCloseable {
@Getter(AccessLevel.PACKAGE)
private final ContextImpl context;
- private PulsarFunction pulsarFunction;
- private Function javaUtilFunction;
+ private Function function;
+ private java.util.function.Function javaUtilFunction;
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
@@ -56,10 +55,10 @@ public class JavaInstance implements AutoCloseable {
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);
// create the functions
- if (userClassObject instanceof PulsarFunction) {
- this.pulsarFunction = (PulsarFunction) userClassObject;
+ if (userClassObject instanceof Function) {
+ this.function = (Function) userClassObject;
} else {
- this.javaUtilFunction = (Function) userClassObject;
+ this.javaUtilFunction = (java.util.function.Function) userClassObject;
}
}
@@ -73,8 +72,8 @@ public class JavaInstance implements AutoCloseable {
JavaExecutionResult executionResult = new JavaExecutionResult();
try {
Object output;
- if (pulsarFunction != null) {
- output = pulsarFunction.process(input, context);
+ if (function != null) {
+ output = function.process(input, context);
} else {
output = javaUtilFunction.apply(input);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 06904cf..fffe8f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
@@ -53,7 +53,7 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
@@ -171,16 +171,16 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
Object object = Reflections.createInstance(
instanceConfig.getFunctionConfig().getClassName(),
clsLoader);
- if (!(object instanceof PulsarFunction) && !(object instanceof Function)) {
- throw new RuntimeException("User class must either be PulsarFunction or java.util.Function");
+ if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
+ throw new RuntimeException("User class must either be Function or java.util.Function");
}
Class<?>[] typeArgs;
- if (object instanceof PulsarFunction) {
- PulsarFunction pulsarFunction = (PulsarFunction) object;
- typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
- } else {
+ if (object instanceof Function) {
Function function = (Function) object;
typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
+ } else {
+ java.util.function.Function function = (java.util.function.Function) object;
+ typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
}
// setup serde
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index affe68d..e8e761f 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -74,7 +74,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuarantees;
@@ -87,7 +87,6 @@ import org.apache.pulsar.functions.utils.Utils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.testng.PowerMockObjectFactory;
import org.powermock.reflect.Whitebox;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
@@ -107,14 +106,14 @@ public class JavaInstanceRunnableProcessTest {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
- private static class TestFunction implements PulsarFunction<String, String> {
+ private static class TestFunction implements Function<String, String> {
@Override
public String process(String input, Context context) throws Exception {
return input + "!";
}
}
- private static class TestFailureFunction implements PulsarFunction<String, String> {
+ private static class TestFailureFunction implements Function<String, String> {
private int processId2Count = 0;
@@ -134,7 +133,7 @@ public class JavaInstanceRunnableProcessTest {
}
}
- private static class TestVoidFunction implements PulsarFunction<String, Void> {
+ private static class TestVoidFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index b4712ee..1e9781c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -22,11 +22,10 @@ import lombok.Getter;
import lombok.Setter;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
-import org.apache.pulsar.functions.instance.InstanceConfig;
import org.testng.annotations.Test;
import java.lang.reflect.InvocationTargetException;
@@ -84,7 +83,7 @@ public class JavaInstanceRunnableTest {
private Integer age;
}
- private class ComplexTypeHandler implements PulsarFunction<String, ComplexUserDefinedType> {
+ private class ComplexTypeHandler implements Function<String, ComplexUserDefinedType> {
@Override
public ComplexUserDefinedType process(String input, Context context) throws Exception {
return new ComplexUserDefinedType();
@@ -103,14 +102,14 @@ public class JavaInstanceRunnableTest {
}
}
- private class VoidInputHandler implements PulsarFunction<Void, String> {
+ private class VoidInputHandler implements Function<Void, String> {
@Override
public String process(Void input, Context context) throws Exception {
return new String("Interesting");
}
}
- private class VoidOutputHandler implements PulsarFunction<String, Void> {
+ private class VoidOutputHandler implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
return null;
@@ -127,7 +126,7 @@ public class JavaInstanceRunnableTest {
Method method = makeAccessible(runnable);
VoidInputHandler pulsarFunction = new VoidInputHandler();
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
method.invoke(runnable, typeArgs, clsLoader);
assertFalse(true);
} catch (InvocationTargetException ex) {
@@ -147,7 +146,7 @@ public class JavaInstanceRunnableTest {
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
VoidOutputHandler pulsarFunction = new VoidOutputHandler();
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
assertTrue(false);
@@ -163,8 +162,8 @@ public class JavaInstanceRunnableTest {
JavaInstanceRunnable runnable = createRunnable(true, DefaultSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
- PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Function function = (Function<String, String>) (input, context) -> input + "-lambda";
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (InvocationTargetException ex) {
@@ -183,8 +182,8 @@ public class JavaInstanceRunnableTest {
JavaInstanceRunnable runnable = createRunnable(false, null);
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
- PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Function function = (Function<String, String>) (input, context) -> input + "-lambda";
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
ex.printStackTrace();
@@ -202,8 +201,8 @@ public class JavaInstanceRunnableTest {
JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
- PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Function function = (Function<String, String>) (input, context) -> input + "-lambda";
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
} catch (Exception ex) {
assertTrue(false);
@@ -219,8 +218,8 @@ public class JavaInstanceRunnableTest {
JavaInstanceRunnable runnable = createRunnable(false, IntegerSerDe.class.getName());
Method method = makeAccessible(runnable);
ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
- PulsarFunction pulsarFunction = (PulsarFunction<String, String>) (input, context) -> input + "-lambda";
- Class<?>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, pulsarFunction.getClass());
+ Function function = (Function<String, String>) (input, context) -> input + "-lambda";
+ Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
method.invoke(runnable, typeArgs, clsLoader);
fail("Should fail constructing java instance if function type is inconsistent with serde type");
} catch (InvocationTargetException ex) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 1583349..53c93a4 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -22,7 +22,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
import org.testng.annotations.Test;
@@ -49,7 +49,7 @@ public class JavaInstanceTest {
InstanceConfig config = createInstanceConfig();
JavaInstance instance = new JavaInstance(
config,
- (PulsarFunction<String, String>) (input, context) -> input + "-lambda",
+ (Function<String, String>) (input, context) -> input + "-lambda",
null, null, new HashMap<>());
String testString = "ABC123";
JavaExecutionResult result = instance.handleMessage(MessageId.earliest, "random", testString);
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
index e41055f..7bf1657 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CounterFunction.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
-public class CounterFunction implements PulsarFunction<String, Void> {
+public class CounterFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
String[] parts = input.split("\\.");
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
index 12e97db..ae8ef76 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
-public class ExclamationFunction implements PulsarFunction<String, String> {
+public class ExclamationFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
index f85dd90..5d2ff5a 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/LoggingFunction.java
@@ -20,12 +20,12 @@ package org.apache.pulsar.functions.api.examples;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
/**
* A function with logging example.
*/
-public class LoggingFunction implements PulsarFunction<String, String> {
+public class LoggingFunction implements Function<String, String> {
private static final AtomicIntegerFieldUpdater<LoggingFunction> COUNTER_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(LoggingFunction.class, "counter");
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index 6ef92ae..236d651 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -19,10 +19,10 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-public class PublishFunction implements PulsarFunction<String, Void> {
+public class PublishFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
context.publish(context.getUserConfigValue("PublishTopic"), input + "!", DefaultSerDe.class.getName());
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
index 2d2ac2a..65df29a 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
-public class UserConfigFunction implements PulsarFunction<String, String> {
+public class UserConfigFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
context.getLogger().info("My Config is " + context.getUserConfigValue("MyOwnConfig"));
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
index ab30d11..2a9b95c 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserMetricFunction.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
-public class UserMetricFunction implements PulsarFunction<String, Void> {
+public class UserMetricFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java
index b0a05a3..abcc663 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/VoidFunction.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.functions.api.examples;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
-public class VoidFunction implements PulsarFunction<String, Void> {
+public class VoidFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
return null;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 721d4b4..7dabeed 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -44,7 +44,7 @@ import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
+import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.proto.Function.FunctionConfig;
@@ -76,7 +76,7 @@ public class FunctionApiV2ResourceTest {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
- private static final class TestFunction implements PulsarFunction<String, String> {
+ private static final class TestFunction implements Function<String, String> {
public String process(String input, Context context) throws Exception {
return input;
diff --git a/site/docs/latest/functions/api.md b/site/docs/latest/functions/api.md
index 895679e..6e7581d 100644
--- a/site/docs/latest/functions/api.md
+++ b/site/docs/latest/functions/api.md
@@ -39,7 +39,7 @@ Both the [Java](#java-functions-with-context) and [Python](#python-functions-wit
Writing Pulsar Functions in Java involves implementing one of two interfaces:
* The [`java.util.Function`](https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html) interface
-* The {% javadoc PulsarFunction client org.apache.pulsar.functions.api.PulsarFunction %} interface. This interface works much like the `java.util.Function` ihterface, but with the important difference
+* The {% javadoc PulsarFunction client org.apache.pulsar.functions.api.Function %} interface. This interface works much like the `java.util.Function` ihterface, but with the important difference
### Java functions without context
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.