You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:16:48 UTC
[pulsar] branch branch-3.0 updated: [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8b86f48699a [fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
8b86f48699a is described below
commit 8b86f48699a05d57f27c70016cb2a81ca98c6a55
Author: jiangpengcheng <sc...@gmail.com>
AuthorDate: Wed May 24 16:41:52 2023 +0800
[fix][fn] Fix JavaInstanceStarter inferring type class name error (#19896)
(cherry picked from commit 05e57dd3a443c5b99c21054c56a1b497455fa867)
---
.../functions/runtime/JavaInstanceStarter.java | 19 +++++++++++-----
.../functions/runtime/thread/ThreadRuntime.java | 19 +++++++++-------
.../runtime/thread/ThreadRuntimeFactory.java | 26 ++++++++++++++++++----
3 files changed, 47 insertions(+), 17 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index deff690815d..b78f07076f6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -49,10 +49,13 @@ import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
@Slf4j
@@ -185,7 +188,10 @@ public class JavaInstanceStarter implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
- inferringMissingTypeClassName(functionDetailsBuilder, functionInstanceClassLoader);
+ FunctionCacheManager fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ ClassLoader functionClassLoader = ThreadRuntime.loadJars(jarFile, instanceConfig, functionId,
+ functionDetailsBuilder.getName(), narExtractionDirectory, fnCache);
+ inferringMissingTypeClassName(functionDetailsBuilder, functionClassLoader);
Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
@@ -230,7 +236,7 @@ public class JavaInstanceStarter implements AutoCloseable {
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader,
- exposePulsarAdminClientEnabled, webServiceUrl);
+ exposePulsarAdminClientEnabled, webServiceUrl, fnCache);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
@@ -322,7 +328,8 @@ public class JavaInstanceStarter implements AutoCloseable {
Map<String, Object> userConfigs = new Gson().fromJson(functionDetailsBuilder.getUserConfig(),
new TypeToken<Map<String, Object>>() {
}.getType());
- boolean isWindowConfigPresent = userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
+ boolean isWindowConfigPresent =
+ userConfigs != null && userConfigs.containsKey(WindowConfig.WINDOW_CONFIG_KEY);
String className = functionDetailsBuilder.getClassName();
if (isWindowConfigPresent) {
WindowConfig windowConfig = new Gson().fromJson(
@@ -353,7 +360,8 @@ public class JavaInstanceStarter implements AutoCloseable {
case SINK:
if ((functionDetailsBuilder.hasSink()
&& functionDetailsBuilder.getSink().getTypeClassName().isEmpty())) {
- String typeArg = getSinkType(functionDetailsBuilder.getClassName(), classLoader).getName();
+ String typeArg =
+ getSinkType(functionDetailsBuilder.getSink().getClassName(), classLoader).getName();
Function.SinkSpec.Builder sinkBuilder =
Function.SinkSpec.newBuilder(functionDetailsBuilder.getSink());
@@ -371,7 +379,8 @@ public class JavaInstanceStarter implements AutoCloseable {
case SOURCE:
if ((functionDetailsBuilder.hasSource()
&& functionDetailsBuilder.getSource().getTypeClassName().isEmpty())) {
- String typeArg = getSourceType(functionDetailsBuilder.getClassName(), classLoader).getName();
+ String typeArg =
+ getSourceType(functionDetailsBuilder.getSource().getClassName(), classLoader).getName();
Function.SourceSpec.Builder sourceBuilder =
Function.SourceSpec.newBuilder(functionDetailsBuilder.getSource());
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 0aa0cd95aef..ed128568bcf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -137,14 +137,16 @@ public class ThreadRuntime implements Runtime {
.getClassLoader();
}
}
- return loadJars(jarFile, instanceConfig, functionId, narExtractionDirectory, fnCache);
+ return loadJars(jarFile, instanceConfig, functionId, instanceConfig.getFunctionDetails().getName(),
+ narExtractionDirectory, fnCache);
}
- private static ClassLoader loadJars(String jarFile,
- InstanceConfig instanceConfig,
- String functionId,
- String narExtractionDirectory,
- FunctionCacheManager fnCache) throws Exception {
+ public static ClassLoader loadJars(String jarFile,
+ InstanceConfig instanceConfig,
+ String functionId,
+ String functionName,
+ String narExtractionDirectory,
+ FunctionCacheManager fnCache) throws Exception {
if (jarFile == null) {
return Thread.currentThread().getContextClassLoader();
}
@@ -175,8 +177,9 @@ public class ThreadRuntime implements Runtime {
Collections.emptyList());
}
- log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
- instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(functionId));
+ log.info(
+ "Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+ functionName, fnCache.getClassLoader(functionId));
fnClassLoader = fnCache.getClassLoader(functionId);
if (null == fnClassLoader) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 7bc055b25d6..cb9ad27a2df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -86,7 +86,21 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
narExtractionDirectory,
rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),
- Optional.empty());
+ Optional.empty(), null);
+ }
+
+ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl,
+ String stateStorageImplClass,
+ String storageServiceUrl,
+ AuthenticationConfig authConfig, SecretsProvider secretsProvider,
+ FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
+ ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
+ String pulsarWebServiceUrl, FunctionCacheManager fnCache) throws Exception {
+ initialize(threadGroupName, Optional.empty(), pulsarServiceUrl, authConfig,
+ stateStorageImplClass, storageServiceUrl, null, secretsProvider, collectorRegistry,
+ narExtractionDirectory,
+ rootClassLoader, exposePulsarAdminClientEnabled, pulsarWebServiceUrl, Optional.empty(),
+ Optional.empty(), fnCache);
}
private void initialize(String threadGroupName, Optional<ThreadRuntimeFactoryConfig.MemoryLimit> memoryLimit,
@@ -96,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
FunctionCollectorRegistry collectorRegistry, String narExtractionDirectory,
ClassLoader rootClassLoader, boolean exposePulsarAdminClientEnabled,
String pulsarWebServiceUrl, Optional<ConnectorsManager> connectorsManager,
- Optional<FunctionsManager> functionsManager)
+ Optional<FunctionsManager> functionsManager, FunctionCacheManager fnCache)
throws PulsarClientException {
if (rootClassLoader == null) {
@@ -106,7 +120,10 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
this.rootClassLoader = rootClassLoader;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.defaultSecretsProvider = secretsProvider;
- this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ this.fnCache = fnCache;
+ if (fnCache == null) {
+ this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ }
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarAdmin =
exposePulsarAdminClientEnabled ? InstanceUtils.createPulsarAdminClient(pulsarWebServiceUrl, authConfig)
@@ -171,7 +188,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null,
workerConfig.isExposeAdminClientEnabled(),
- workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManager));
+ workerConfig.getPulsarWebServiceUrl(), Optional.of(connectorsManager), Optional.of(functionsManager),
+ null);
}
@Override