You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/07/19 08:35:12 UTC
[pulsar] branch master updated: [fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for the function Runtimes (#16423)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b9249d7d421 [fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for the function Runtimes (#16423)
b9249d7d421 is described below
commit b9249d7d421c9916104df5d8623875691076372d
Author: Christophe Bornet <cb...@hotmail.com>
AuthorDate: Tue Jul 19 10:35:02 2022 +0200
[fix][functions] Fix netty.DnsResolverUtil compat issue on JDK9+ for the function Runtimes (#16423)
---
.../pulsar/functions/runtime/RuntimeUtils.java | 9 +++++++-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 25 +++++++++++++++-------
.../runtime/process/ProcessRuntimeTest.java | 23 ++++++++++++++------
3 files changed, 41 insertions(+), 16 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 1fb7650b1e8..a1101e80c95 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -41,7 +41,9 @@ import java.util.List;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.JavaVersion;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -255,7 +257,6 @@ public class RuntimeUtils {
return args;
}
-
public static List<String> getCmd(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
@@ -320,6 +321,12 @@ public class RuntimeUtils {
args.add("-Dio.netty.tryReflectionSetAccessible=true");
+ // Needed for netty.DnsResolverUtil on JDK9+
+ if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+ args.add("--add-opens");
+ args.add("java.base/sun.net=ALL-UNNAMED");
+ }
+
if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 88cccfb7b83..f9a5521b2df 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -33,6 +33,8 @@ import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1Toleration;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.commons.lang3.JavaVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -48,6 +50,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -380,8 +383,13 @@ public class KubernetesRuntimeTest {
}
private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
- KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
- List<String> args = container.getProcessArgs();
+ KubernetesRuntime container;
+ List<String> args;
+ try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
+ container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+ args = container.getProcessArgs();
+ }
String classpath = javaInstanceJarFile;
String extraDepsEnv;
@@ -392,14 +400,14 @@ public class KubernetesRuntimeTest {
if (null != depsDir) {
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
- totalArgs = 40;
- portArg = 26;
- metricsPortArg = 28;
+ totalArgs = 42;
+ portArg = 28;
+ metricsPortArg = 30;
} else {
extraDepsEnv = "";
- portArg = 25;
- metricsPortArg = 27;
- totalArgs = 39;
+ portArg = 27;
+ metricsPortArg = 29;
+ totalArgs = 41;
}
if (secretsAttached) {
totalArgs += 4;
@@ -430,6 +438,7 @@ public class KubernetesRuntimeTest {
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Dio.netty.tryReflectionSetAccessible=true -Xmx" + String.valueOf(RESOURCES.getRam())
+ + " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + jarLocation + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 301e38ee820..6d80ec871b6 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -38,6 +38,8 @@ import java.util.Map;
import java.util.Optional;
import io.kubernetes.client.openapi.models.V1PodSpec;
+import org.apache.commons.lang3.JavaVersion;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
@@ -50,6 +52,7 @@ import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -283,14 +286,19 @@ public class ProcessRuntimeTest {
}
private void verifyJavaInstance(InstanceConfig config, Path depsDir, String webServiceUrl) throws Exception {
- ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
- List<String> args = container.getProcessArgs();
+ List<String> args;
+ try (MockedStatic<SystemUtils> systemUtils = Mockito.mockStatic(SystemUtils.class, Mockito.CALLS_REAL_METHODS)) {
+ systemUtils.when(() -> SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)).thenReturn(true);
+ ProcessRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30L);
+ args = container.getProcessArgs();
+ }
+
String classpath = javaInstanceJarFile;
String extraDepsEnv;
int portArg;
int metricsPortArg;
- int totalArgCount = 42;
+ int totalArgCount = 44;
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
totalArgCount += 3;
}
@@ -298,13 +306,13 @@ public class ProcessRuntimeTest {
assertEquals(args.size(), totalArgCount);
extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
classpath = classpath + ":" + depsDir + "/*";
- portArg = 25;
- metricsPortArg = 27;
+ portArg = 27;
+ metricsPortArg = 29;
} else {
assertEquals(args.size(), totalArgCount-1);
extraDepsEnv = "";
- portArg = 24;
- metricsPortArg = 26;
+ portArg = 26;
+ metricsPortArg = 28;
}
if (webServiceUrl != null && config.isExposePulsarAdminClientEnabled()) {
portArg += 3;
@@ -321,6 +329,7 @@ public class ProcessRuntimeTest {
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
+ " -Dio.netty.tryReflectionSetAccessible=true"
+ + " --add-opens java.base/sun.net=ALL-UNNAMED"
+ " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()