You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/08 10:45:59 UTC
[pulsar] branch master updated: [PIP-193] Support Transform Function with LocalRunner (#17445)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 327648cbb26 [PIP-193] Support Transform Function with LocalRunner (#17445)
327648cbb26 is described below
commit 327648cbb268ca1fa1770d3254d00a818adc2e19
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Sep 8 12:45:48 2022 +0200
[PIP-193] Support Transform Function with LocalRunner (#17445)
---
.../worker/PulsarFunctionLocalRunTest.java | 15 +++
.../org/apache/pulsar/functions/LocalRunner.java | 113 ++++++++++++++-------
2 files changed, 90 insertions(+), 38 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 5d6395a9813..d90b971b5fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -896,6 +896,11 @@ public class PulsarFunctionLocalRunTest {
}
private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
+ testPulsarSinkLocalRun(jarFilePathUrl, parallelism, className, null, null);
+ }
+
+ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className,
+ String transformFunction, String transformFunctionClassName) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -921,6 +926,9 @@ public class PulsarFunctionLocalRunTest {
sinkConfig.setArchive(jarFilePathUrl);
sinkConfig.setParallelism(parallelism);
+ sinkConfig.setTransformFunction(transformFunction);
+ sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
+
int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
@@ -933,6 +941,7 @@ public class PulsarFunctionLocalRunTest {
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
+ .functionsDirectory(workerConfig.getFunctionsDirectory())
.metricsPortStart(metricsPort)
.build();
@@ -1083,6 +1092,12 @@ public class PulsarFunctionLocalRunTest {
public void testPulsarSinkStatsByteBufferType() throws Throwable {
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName()));
}
+
+ //@Test(timeOut = 20000, groups = "builtin")
+ @Test(groups = "builtin")
+ public void testPulsarSinkWithFunction() throws Throwable {
+ testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName(), "builtin://exclamation", "org.apache.pulsar.functions.api.examples.RecordFunction");
+ }
public static class TestErrorSink implements Sink<byte[]> {
private Map config;
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index a308c98b3da..ae38b8cd652 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
@@ -48,6 +49,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Builder;
+import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
@@ -92,8 +94,8 @@ public class LocalRunner implements AutoCloseable {
private final String functionsDir;
private final Thread shutdownHook;
private final int instanceLivenessCheck;
- private ClassLoader userCodeClassLoader;
- private boolean userCodeClassLoaderCreated;
+ private UserCodeClassLoader userCodeClassLoader;
+ private UserCodeClassLoader transformFunctionCodeClassLoader;
private RuntimeFactory runtimeFactory;
private HTTPServer metricsServer;
@@ -102,6 +104,12 @@ public class LocalRunner implements AutoCloseable {
PROCESS
}
+ @Value
+ private static class UserCodeClassLoader {
+ ClassLoader classLoader;
+ boolean classLoaderCreated;
+ }
+
public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
@Override
public FunctionConfig convert(String value) {
@@ -310,16 +318,21 @@ public class LocalRunner implements AutoCloseable {
runtimeFactory = null;
}
- if (userCodeClassLoaderCreated) {
- if (userCodeClassLoader instanceof Closeable) {
- try {
- ((Closeable) userCodeClassLoader).close();
- } catch (IOException e) {
- log.warn("Error closing classloader", e);
- }
+ closeClassLoaderIfneeded(userCodeClassLoader);
+ userCodeClassLoader = null;
+ closeClassLoaderIfneeded(transformFunctionCodeClassLoader);
+ transformFunctionCodeClassLoader = null;
+ }
+ }
+
+ private static void closeClassLoaderIfneeded(UserCodeClassLoader userCodeClassLoader) {
+ if (userCodeClassLoader != null && userCodeClassLoader.isClassLoaderCreated()) {
+ if (userCodeClassLoader.getClassLoader() instanceof Closeable) {
+ try {
+ ((Closeable) userCodeClassLoader.getClassLoader()).close();
+ } catch (IOException e) {
+ log.warn("Error closing classloader", e);
}
- userCodeClassLoaderCreated = false;
- userCodeClassLoader = null;
}
}
}
@@ -333,16 +346,18 @@ public class LocalRunner implements AutoCloseable {
Runtime.getRuntime().addShutdownHook(shutdownHook);
Function.FunctionDetails functionDetails = null;
String userCodeFile;
+ String transformFunctionFile = null;
int parallelism;
if (functionConfig != null) {
FunctionConfigUtils.inferMissingArguments(functionConfig, true);
parallelism = functionConfig.getParallelism();
if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
userCodeFile = functionConfig.getJar();
- ClassLoader functionClassLoader = extractClassLoader(
+ userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
functionDetails = FunctionConfigUtils.convert(
- functionConfig, FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+ functionConfig,
+ FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
userCodeFile = functionConfig.getGo();
} else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -352,26 +367,42 @@ public class LocalRunner implements AutoCloseable {
}
if (functionDetails == null) {
- functionDetails = FunctionConfigUtils.convert(functionConfig,
- userCodeClassLoader != null ? userCodeClassLoader :
- Thread.currentThread().getContextClassLoader());
+ functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
}
} else if (sourceConfig != null) {
inferMissingArguments(sourceConfig);
userCodeFile = sourceConfig.getArchive();
parallelism = sourceConfig.getParallelism();
- ClassLoader sourceClassLoader = extractClassLoader(
+ userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
functionDetails = SourceConfigUtils.convert(
- sourceConfig, SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, true));
+ sourceConfig,
+ SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
} else if (sinkConfig != null) {
inferMissingArguments(sinkConfig);
userCodeFile = sinkConfig.getArchive();
+ transformFunctionFile = sinkConfig.getTransformFunction();
parallelism = sinkConfig.getParallelism();
- ClassLoader sinkClassLoader = extractClassLoader(
+ userCodeClassLoader = extractClassLoader(
userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
+ if (isNotEmpty(sinkConfig.getTransformFunction())) {
+ transformFunctionCodeClassLoader = extractClassLoader(
+ sinkConfig.getTransformFunction(),
+ ComponentType.FUNCTION,
+ sinkConfig.getTransformFunctionClassName());
+ }
+
+ ClassLoader functionClassLoader = null;
+ if (transformFunctionCodeClassLoader != null) {
+ functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
+ ? Thread.currentThread().getContextClassLoader()
+ : transformFunctionCodeClassLoader.getClassLoader();
+ }
+
functionDetails = SinkConfigUtils.convert(
- sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
+ sinkConfig,
+ SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
+ functionClassLoader, true));
} else {
throw new IllegalArgumentException("Must specify Function, Source or Sink config");
}
@@ -401,10 +432,10 @@ public class LocalRunner implements AutoCloseable {
&& (runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
// By default run java functions as threads
startThreadedMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
- stateStorageServiceUrl, authConfig, userCodeFile);
+ stateStorageServiceUrl, authConfig, userCodeFile, transformFunctionFile);
} else {
startProcessMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
- stateStorageServiceUrl, authConfig, userCodeFile);
+ stateStorageServiceUrl, authConfig, userCodeFile, transformFunctionFile);
}
local.addAll(spawners);
}
@@ -426,15 +457,22 @@ public class LocalRunner implements AutoCloseable {
}
}
- private ClassLoader extractClassLoader(String userCodeFile, ComponentType componentType, String className)
+ private ClassLoader getCurrentOrUserCodeClassLoader() {
+ return userCodeClassLoader == null || userCodeClassLoader.getClassLoader() == null
+ ? Thread.currentThread().getContextClassLoader()
+ : userCodeClassLoader.getClassLoader();
+ }
+
+ private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentType componentType, String className)
throws IOException, URISyntaxException {
- userCodeClassLoader = userCodeFile != null ? isBuiltIn(userCodeFile, componentType) : null;
- if (userCodeClassLoader == null) {
+ ClassLoader classLoader = userCodeFile != null ? isBuiltIn(userCodeFile, componentType) : null;
+ boolean classLoaderCreated = false;
+ if (classLoader == null) {
if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
- userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
- userCodeClassLoaderCreated = true;
+ classLoaderCreated = true;
} else if (userCodeFile != null) {
File file = new File(userCodeFile);
if (!file.exists()) {
@@ -454,9 +492,9 @@ public class LocalRunner implements AutoCloseable {
}
throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
}
- userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+ classLoader = FunctionCommon.getClassLoaderFromPackage(
componentType, className, file, narExtractionDirectory);
- userCodeClassLoaderCreated = true;
+ classLoaderCreated = true;
} else {
if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
String errorMsg;
@@ -477,15 +515,13 @@ public class LocalRunner implements AutoCloseable {
}
}
}
- return userCodeClassLoader == null
- ? Thread.currentThread().getContextClassLoader()
- : userCodeClassLoader;
+ return new UserCodeClassLoader(classLoader, classLoaderCreated);
}
private void startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
- String userCodeFile) throws Exception {
+ String userCodeFile, String transformFunctionFile) throws Exception {
SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
runtimeFactory = new ProcessRuntimeFactory(
serviceUrl,
@@ -532,7 +568,7 @@ public class LocalRunner implements AutoCloseable {
instanceConfig,
userCodeFile,
null,
- null,
+ transformFunctionFile,
null,
runtimeFactory,
instanceLivenessCheck);
@@ -568,7 +604,7 @@ public class LocalRunner implements AutoCloseable {
private void startThreadedMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
- String userCodeFile) throws Exception {
+ String userCodeFile, String transformFunctionFile) throws Exception {
if (metricsPortStart != null) {
if (metricsPortStart < 0 || metricsPortStart > 65535) {
@@ -599,8 +635,8 @@ public class LocalRunner implements AutoCloseable {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
- if (userCodeClassLoader != null) {
- Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+ if (userCodeClassLoader != null && userCodeClassLoader.getClassLoader() != null) {
+ Thread.currentThread().setContextClassLoader(userCodeClassLoader.getClassLoader());
}
runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
@@ -620,6 +656,7 @@ public class LocalRunner implements AutoCloseable {
// TODO: correctly implement function version and id
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setFunctionId(UUID.randomUUID().toString());
+ instanceConfig.setTransformFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
if (metricsPortStart != null) {
@@ -638,7 +675,7 @@ public class LocalRunner implements AutoCloseable {
instanceConfig,
userCodeFile,
null,
- null,
+ transformFunctionFile,
null,
runtimeFactory,
instanceLivenessCheck);