You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/08 10:45:59 UTC

[pulsar] branch master updated: [PIP-193] Support Transform Function with LocalRunner (#17445)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 327648cbb26 [PIP-193] Support Transform Function with LocalRunner (#17445)
327648cbb26 is described below

commit 327648cbb268ca1fa1770d3254d00a818adc2e19
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Thu Sep 8 12:45:48 2022 +0200

    [PIP-193] Support Transform Function with LocalRunner (#17445)
---
 .../worker/PulsarFunctionLocalRunTest.java         |  15 +++
 .../org/apache/pulsar/functions/LocalRunner.java   | 113 ++++++++++++++-------
 2 files changed, 90 insertions(+), 38 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 5d6395a9813..d90b971b5fa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -896,6 +896,11 @@ public class PulsarFunctionLocalRunTest {
     }
 
     private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
+        testPulsarSinkLocalRun(jarFilePathUrl, parallelism, className, null, null);
+    }
+
+    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className,
+                                        String transformFunction, String transformFunctionClassName) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -921,6 +926,9 @@ public class PulsarFunctionLocalRunTest {
 
         sinkConfig.setArchive(jarFilePathUrl);
         sinkConfig.setParallelism(parallelism);
+        sinkConfig.setTransformFunction(transformFunction);
+        sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
+
         int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
@@ -933,6 +941,7 @@ public class PulsarFunctionLocalRunTest {
                 .tlsHostNameVerificationEnabled(false)
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
                 .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .functionsDirectory(workerConfig.getFunctionsDirectory())
                 .metricsPortStart(metricsPort)
                 .build();
 
@@ -1083,6 +1092,12 @@ public class PulsarFunctionLocalRunTest {
     public void testPulsarSinkStatsByteBufferType() throws Throwable {
         runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName()));
     }
+
+    //@Test(timeOut = 20000, groups = "builtin")
+    @Test(groups = "builtin")
+    public void testPulsarSinkWithFunction() throws Throwable {
+        testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName(), "builtin://exclamation", "org.apache.pulsar.functions.api.examples.RecordFunction");
+    }
     
     public static class TestErrorSink implements Sink<byte[]> {
         private Map config;
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index a308c98b3da..ae38b8cd652 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions;
 
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
@@ -48,6 +49,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Builder;
+import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
@@ -92,8 +94,8 @@ public class LocalRunner implements AutoCloseable {
     private final String functionsDir;
     private final Thread shutdownHook;
     private final int instanceLivenessCheck;
-    private ClassLoader userCodeClassLoader;
-    private boolean userCodeClassLoaderCreated;
+    private UserCodeClassLoader userCodeClassLoader;
+    private UserCodeClassLoader transformFunctionCodeClassLoader;
     private RuntimeFactory runtimeFactory;
     private HTTPServer metricsServer;
 
@@ -102,6 +104,12 @@ public class LocalRunner implements AutoCloseable {
         PROCESS
     }
 
+    @Value
+    private static class UserCodeClassLoader {
+        ClassLoader classLoader;
+        boolean classLoaderCreated;
+    }
+
     public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
         @Override
         public FunctionConfig convert(String value) {
@@ -310,16 +318,21 @@ public class LocalRunner implements AutoCloseable {
                 runtimeFactory = null;
             }
 
-            if (userCodeClassLoaderCreated) {
-                if (userCodeClassLoader instanceof Closeable) {
-                    try {
-                        ((Closeable) userCodeClassLoader).close();
-                    } catch (IOException e) {
-                        log.warn("Error closing classloader", e);
-                    }
+            closeClassLoaderIfneeded(userCodeClassLoader);
+            userCodeClassLoader = null;
+            closeClassLoaderIfneeded(transformFunctionCodeClassLoader);
+            transformFunctionCodeClassLoader = null;
+        }
+    }
+
+    private static void closeClassLoaderIfneeded(UserCodeClassLoader userCodeClassLoader) {
+        if (userCodeClassLoader != null && userCodeClassLoader.isClassLoaderCreated()) {
+            if (userCodeClassLoader.getClassLoader() instanceof Closeable) {
+                try {
+                    ((Closeable) userCodeClassLoader.getClassLoader()).close();
+                } catch (IOException e) {
+                    log.warn("Error closing classloader", e);
                 }
-                userCodeClassLoaderCreated = false;
-                userCodeClassLoader = null;
             }
         }
     }
@@ -333,16 +346,18 @@ public class LocalRunner implements AutoCloseable {
             Runtime.getRuntime().addShutdownHook(shutdownHook);
             Function.FunctionDetails functionDetails = null;
             String userCodeFile;
+            String transformFunctionFile = null;
             int parallelism;
             if (functionConfig != null) {
                 FunctionConfigUtils.inferMissingArguments(functionConfig, true);
                 parallelism = functionConfig.getParallelism();
                 if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                     userCodeFile = functionConfig.getJar();
-                    ClassLoader functionClassLoader = extractClassLoader(
+                    userCodeClassLoader = extractClassLoader(
                         userCodeFile, ComponentType.FUNCTION, functionConfig.getClassName());
                     functionDetails = FunctionConfigUtils.convert(
-                        functionConfig, FunctionConfigUtils.validateJavaFunction(functionConfig, functionClassLoader));
+                        functionConfig,
+                        FunctionConfigUtils.validateJavaFunction(functionConfig, getCurrentOrUserCodeClassLoader()));
                 } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
                     userCodeFile = functionConfig.getGo();
                 } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
@@ -352,26 +367,42 @@ public class LocalRunner implements AutoCloseable {
                 }
 
                 if (functionDetails == null) {
-                    functionDetails = FunctionConfigUtils.convert(functionConfig,
-                            userCodeClassLoader != null ? userCodeClassLoader :
-                                    Thread.currentThread().getContextClassLoader());
+                    functionDetails = FunctionConfigUtils.convert(functionConfig, getCurrentOrUserCodeClassLoader());
                 }
             } else if (sourceConfig != null) {
                 inferMissingArguments(sourceConfig);
                 userCodeFile = sourceConfig.getArchive();
                 parallelism = sourceConfig.getParallelism();
-                ClassLoader sourceClassLoader = extractClassLoader(
+                userCodeClassLoader = extractClassLoader(
                     userCodeFile, ComponentType.SOURCE, sourceConfig.getClassName());
                 functionDetails = SourceConfigUtils.convert(
-                    sourceConfig, SourceConfigUtils.validateAndExtractDetails(sourceConfig, sourceClassLoader, true));
+                    sourceConfig,
+                    SourceConfigUtils.validateAndExtractDetails(sourceConfig, getCurrentOrUserCodeClassLoader(), true));
             } else if (sinkConfig != null) {
                 inferMissingArguments(sinkConfig);
                 userCodeFile = sinkConfig.getArchive();
+                transformFunctionFile = sinkConfig.getTransformFunction();
                 parallelism = sinkConfig.getParallelism();
-                ClassLoader sinkClassLoader = extractClassLoader(
+                userCodeClassLoader = extractClassLoader(
                     userCodeFile, ComponentType.SINK, sinkConfig.getClassName());
+                if (isNotEmpty(sinkConfig.getTransformFunction())) {
+                    transformFunctionCodeClassLoader = extractClassLoader(
+                        sinkConfig.getTransformFunction(),
+                        ComponentType.FUNCTION,
+                        sinkConfig.getTransformFunctionClassName());
+                }
+
+                ClassLoader functionClassLoader = null;
+                if (transformFunctionCodeClassLoader != null) {
+                    functionClassLoader = transformFunctionCodeClassLoader.getClassLoader() == null
+                        ? Thread.currentThread().getContextClassLoader()
+                        : transformFunctionCodeClassLoader.getClassLoader();
+                }
+
                 functionDetails = SinkConfigUtils.convert(
-                    sinkConfig, SinkConfigUtils.validateAndExtractDetails(sinkConfig, sinkClassLoader, null, true));
+                    sinkConfig,
+                    SinkConfigUtils.validateAndExtractDetails(sinkConfig, getCurrentOrUserCodeClassLoader(),
+                        functionClassLoader, true));
             } else {
                 throw new IllegalArgumentException("Must specify Function, Source or Sink config");
             }
@@ -401,10 +432,10 @@ public class LocalRunner implements AutoCloseable {
                     && (runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
                 // By default run java functions as threads
                 startThreadedMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
-                        stateStorageServiceUrl, authConfig, userCodeFile);
+                        stateStorageServiceUrl, authConfig, userCodeFile, transformFunctionFile);
             } else {
                 startProcessMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
-                        stateStorageServiceUrl, authConfig, userCodeFile);
+                        stateStorageServiceUrl, authConfig, userCodeFile, transformFunctionFile);
             }
             local.addAll(spawners);
         }
@@ -426,15 +457,22 @@ public class LocalRunner implements AutoCloseable {
         }
     }
 
-    private ClassLoader extractClassLoader(String userCodeFile, ComponentType componentType, String className)
+    private ClassLoader getCurrentOrUserCodeClassLoader() {
+        return userCodeClassLoader == null || userCodeClassLoader.getClassLoader() == null
+            ? Thread.currentThread().getContextClassLoader()
+            : userCodeClassLoader.getClassLoader();
+    }
+
+    private UserCodeClassLoader extractClassLoader(String userCodeFile, ComponentType componentType, String className)
         throws IOException, URISyntaxException {
-        userCodeClassLoader = userCodeFile != null ? isBuiltIn(userCodeFile, componentType) : null;
-        if (userCodeClassLoader == null) {
+        ClassLoader classLoader = userCodeFile != null ? isBuiltIn(userCodeFile, componentType) : null;
+        boolean classLoaderCreated = false;
+        if (classLoader == null) {
             if (userCodeFile != null && Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                 File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                classLoader = FunctionCommon.getClassLoaderFromPackage(
                         componentType, className, file, narExtractionDirectory);
-                userCodeClassLoaderCreated = true;
+                classLoaderCreated = true;
             } else if (userCodeFile != null) {
                 File file = new File(userCodeFile);
                 if (!file.exists()) {
@@ -454,9 +492,9 @@ public class LocalRunner implements AutoCloseable {
                     }
                     throw new RuntimeException(errorMsg + " (" + userCodeFile + ") does not exist");
                 }
-                userCodeClassLoader = FunctionCommon.getClassLoaderFromPackage(
+                classLoader = FunctionCommon.getClassLoaderFromPackage(
                         componentType, className, file, narExtractionDirectory);
-                userCodeClassLoaderCreated = true;
+                classLoaderCreated = true;
             } else {
                 if (!(runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
                     String errorMsg;
@@ -477,15 +515,13 @@ public class LocalRunner implements AutoCloseable {
                 }
             }
         }
-        return userCodeClassLoader == null
-            ? Thread.currentThread().getContextClassLoader()
-            : userCodeClassLoader;
+        return new UserCodeClassLoader(classLoader, classLoaderCreated);
     }
 
     private void startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
                                            int parallelism, int instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, AuthenticationConfig authConfig,
-                                           String userCodeFile) throws Exception {
+                                           String userCodeFile, String transformFunctionFile) throws Exception {
         SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
         runtimeFactory = new ProcessRuntimeFactory(
                 serviceUrl,
@@ -532,7 +568,7 @@ public class LocalRunner implements AutoCloseable {
                     instanceConfig,
                     userCodeFile,
                     null,
-                    null,
+                    transformFunctionFile,
                     null,
                     runtimeFactory,
                     instanceLivenessCheck);
@@ -568,7 +604,7 @@ public class LocalRunner implements AutoCloseable {
     private void startThreadedMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
                                            int parallelism, int instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, AuthenticationConfig authConfig,
-                                           String userCodeFile) throws Exception {
+                                           String userCodeFile, String transformFunctionFile) throws Exception {
 
         if (metricsPortStart != null) {
             if (metricsPortStart < 0 || metricsPortStart > 65535) {
@@ -599,8 +635,8 @@ public class LocalRunner implements AutoCloseable {
 
         ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
         try {
-            if (userCodeClassLoader != null) {
-                Thread.currentThread().setContextClassLoader(userCodeClassLoader);
+            if (userCodeClassLoader != null && userCodeClassLoader.getClassLoader() != null) {
+                Thread.currentThread().setContextClassLoader(userCodeClassLoader.getClassLoader());
             }
             runtimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
                     serviceUrl,
@@ -620,6 +656,7 @@ public class LocalRunner implements AutoCloseable {
             // TODO: correctly implement function version and id
             instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
             instanceConfig.setFunctionId(UUID.randomUUID().toString());
+            instanceConfig.setTransformFunctionId(UUID.randomUUID().toString());
             instanceConfig.setInstanceId(i + instanceIdOffset);
             instanceConfig.setMaxBufferedTuples(1024);
             if (metricsPortStart != null) {
@@ -638,7 +675,7 @@ public class LocalRunner implements AutoCloseable {
                     instanceConfig,
                     userCodeFile,
                     null,
-                    null,
+                    transformFunctionFile,
                     null,
                     runtimeFactory,
                     instanceLivenessCheck);