You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:16:48 UTC

[pulsar] branch branch-3.0 updated: [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8b86f48699a [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
8b86f48699a is described below

commit 8b86f48699a05d57f27c70016cb2a81ca98c6a55
Author: jiangpengcheng <sc...@gmail.com>
AuthorDate: Wed May 24 16:41:52 2023 +0800

    [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
    
    (cherry picked from commit 05e57dd3a443c5b99c21054c56a1b497455fa867)
---
 .../functions/runtime/JavaInstanceStarter.java     | 19 +++++++++++-----
 .../functions/runtime/thread/ThreadRuntime.java    | 19 +++++++++-------
 .../runtime/thread/ThreadRuntimeFactory.java       | 26 ++++++++++++++++++----
 3 files changed, 47 insertions(+), 17 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index deff690815d..b78f07076f6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -49,10 +49,13 @@ import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 
 
 @Slf4j
@@ -185,7 +188,10 @@ public class JavaInstanceStarter implements AutoCloseable {
             functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
         }
         JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
-        inferringMissingTypeClassName(functionDetailsBuilder, functionInstanceClassLoader);
+        FunctionCacheManager fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        ClassLoader functionClassLoader = ThreadRuntime.loadJars(jarFile, instanceConfig, functionId,
+                functionDetailsBuilder.getName(), narExtractionDirectory, fnCache);
+        inferringMissingTypeClassName(functionDetailsBuilder, functionClassLoader);
         Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
@@ -230,7 +236,7 @@ public class JavaInstanceStarter implements AutoCloseable {
                         .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
                         .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
                 secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader,
-                exposePulsarAdminClientEnabled, webServiceUrl);
+                exposePulsarAdminClientEnabled, webServiceUrl, fnCache);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
@@ -322,7 +328,8 @@ public class JavaInstanceStarter implements AutoCloseable {
                     Map<String, Object> userConfigs = new Gson().fromJson(functionDetailsBuilder.getUserConfig(),
                             new TypeToken<Map<String, Object>>() {
                             }.getType());
-                    boolean isWindowConfigPresent = userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+                    boolean isWindowConfigPresent =
+                            userConfigs != null && userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
                     String className = functionDetailsBuilder.getClassName();
                     if (isWindowConfigPresent) {
                         WindowConfig windowConfig = new Gson().fromJson(
@@ -353,7 +360,8 @@ public class JavaInstanceStarter implements AutoCloseable {
             case SINK:
                 if ((functionDetailsBuilder.hasSink()
                         && functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
-                    String typeArg = getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+                    String typeArg =
+                            getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
 
                     Function.SinkSpec.Builder sinkBuilder =
                             Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -371,7 +379,8 @@ public class JavaInstanceStarter implements AutoCloseable {
             case SOURCE:
                 if ((functionDetailsBuilder.hasSource()
                         && functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
-                    String typeArg = getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+                    String typeArg =
+                            getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
 
                     Function.SourceSpec.Builder sourceBuilder =
                             Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 0aa0cd95aef..ed128568bcf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -137,14 +137,16 @@ public class ThreadRuntime implements Runtime {
                         .getClassLoader();
             }
         }
-        return loadJars(jarFile, instanceConfig, functionId, narExtractionDirectory, fnCache);
+        return loadJars(jarFile, instanceConfig, functionId, instanceConfig.getFunctionDetails().getName(),
+                narExtractionDirectory, fnCache);
     }
 
-    private static ClassLoader loadJars(String jarFile,
-                                 InstanceConfig instanceConfig,
-                                 String functionId,
-                                 String narExtractionDirectory,
-                                 FunctionCacheManager fnCache) throws Exception {
+    public static ClassLoader loadJars(String jarFile,
+                                       InstanceConfig instanceConfig,
+                                       String functionId,
+                                       String functionName,
+                                       String narExtractionDirectory,
+                                       FunctionCacheManager fnCache) throws Exception {
         if (jarFile == null) {
             return Thread.currentThread().getContextClassLoader();
         }
@@ -175,8 +177,9 @@ public class ThreadRuntime implements Runtime {
                     Collections.emptyList());
         }
 
-        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
-                instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(functionId));
+        log.info(
+                "Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+                functionName, fnCache.getClassLoader(functionId));
 
         fnClassLoader = fnCache.getClassLoader(functionId);
         if (null == fnClassLoader) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 7bc055b25d6..cb9ad27a2df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -86,7 +86,21 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                 stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
                 narExtractionDirectory,
                 rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),
-                Optional.empty());
+                Optional.empty(), null);
+    }
+
+    public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl,
+                                String stateStorageImplClass,
+                                String storageServiceUrl,
+                                AuthenticationConfig authConfig, SecretsProvider secretsProvider,
+                                FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
+                                ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
+                                String pulsarWebServiceUrl, FunctionCacheManager fnCache) throws Exception {
+        initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
+                stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
+                narExtractionDirectory,
+                rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),
+                Optional.empty(), fnCache);
     }
 
     private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
@@ -96,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                             FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
                             ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
                             String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager,
-                            Optional<FunctionsManager> functionsManager)
+                            Optional<FunctionsManager> functionsManager, FunctionCacheManager fnCache)
             throws PulsarClientException {
 
         if (rootClassLoader == null) {
@@ -106,7 +120,10 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
         this.rootClassLoader = rootClassLoader;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.defaultSecretsProvider = secretsProvider;
-        this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        this.fnCache = fnCache;
+        if (fnCache == null) {
+            this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+        }
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarAdmin =
                 exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
@@ -171,7 +188,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                 workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null,
                 workerConfig.isExposeAdminClientEnabled(),
-                workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManager));
+                workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManager),
+                null);
     }
 
     @Override