You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:26:07 UTC
[pulsar] branch branch-3.0 updated: [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)
This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d4b55f45362 [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)
d4b55f45362 is described below
commit d4b55f45362089005a2259857d23c2f690f23bde
Author: Gabriel Miklós <ga...@getbridge.com>
AuthorDate: Thu May 18 17:23:55 2023 +0200
[improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)
(cherry picked from commit 43a989862f548fa3f67708a5fff62eb764af878c)
---
.../org/apache/pulsar/functions/instance/ContextImpl.java | 2 +-
.../java/org/apache/pulsar/functions/sink/PulsarSink.java | 2 +-
.../org/apache/pulsar/functions/source/PulsarSource.java | 6 +++---
.../functions/source/SingleConsumerPulsarSource.java | 11 ++++-------
.../org/apache/pulsar/functions/source/TopicSchema.java | 15 ++++++++++++---
.../apache/pulsar/functions/source/TopicSchemaTest.java | 8 +++++---
.../tests/integration/functions/PulsarFunctionsTest.java | 8 ++++----
.../integration/functions/PulsarFunctionsTestBase.java | 2 +-
.../functions/java/PulsarFunctionsJavaTest.java | 11 +++++++----
.../functions/java/PulsarWorkerRebalanceDrainTest.java | 15 +++++++++------
.../integration/functions/utils/CommandGenerator.java | 10 +++++-----
11 files changed, 52 insertions(+), 38 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d64c5f9b52d..5cbbcad24c7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -148,7 +148,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
this.clientBuilder = clientBuilder;
this.client = client;
this.pulsarAdmin = pulsarAdmin;
- this.topicSchema = new TopicSchema(client);
+ this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader());
this.statsManager = statsManager;
this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8add0a78c5f..97a0ad0a2ce 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -349,7 +349,7 @@ public class PulsarSink<T> implements Sink<T> {
ComponentStatsManager stats, ClassLoader functionClassLoader) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
- this.topicSchema = new TopicSchema(client);
+ this.topicSchema = new TopicSchema(client, functionClassLoader);
this.properties = properties;
this.stats = stats;
this.functionClassLoader = functionClassLoader;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index eb67e635fcb..a6e14ab3e09 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -53,7 +53,7 @@ public abstract class PulsarSource<T> implements Source<T> {
ClassLoader functionClassLoader) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarSourceConfig;
- this.topicSchema = new TopicSchema(pulsarClient);
+ this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader);
this.properties = properties;
this.functionClassLoader = functionClassLoader;
}
@@ -168,8 +168,8 @@ public abstract class PulsarSource<T> implements Source<T> {
Class<?> typeArg) {
PulsarSourceConsumerConfig.PulsarSourceConsumerConfigBuilder<T> consumerConfBuilder =
PulsarSourceConsumerConfig.<T>builder().isRegexPattern(conf.isRegexPattern())
- .receiverQueueSize(conf.getReceiverQueueSize())
- .consumerProperties(conf.getConsumerProperties());
+ .receiverQueueSize(conf.getReceiverQueueSize())
+ .consumerProperties(conf.getConsumerProperties());
Schema<T> schema;
if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
index 426723804ca..d4d3ea00b93 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
@@ -44,14 +44,12 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
private Consumer<T> consumer;
private final List<Consumer<T>> inputConsumers = new LinkedList<>();
- public SingleConsumerPulsarSource(PulsarClient pulsarClient,
- SingleConsumerPulsarSourceConfig pulsarSourceConfig,
- Map<String, String> properties,
- ClassLoader functionClassLoader) {
+ public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig pulsarSourceConfig,
+ Map<String, String> properties, ClassLoader functionClassLoader) {
super(pulsarClient, pulsarSourceConfig, properties, functionClassLoader);
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarSourceConfig;
- this.topicSchema = new TopicSchema(pulsarClient);
+ this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader);
this.properties = properties;
this.functionClassLoader = functionClassLoader;
}
@@ -60,8 +58,7 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
- Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
- this.functionClassLoader);
+ Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader);
checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 57f49fed0ca..a912bc7ecf1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.functions.source;
import io.netty.buffer.ByteBuf;
+import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -49,8 +53,13 @@ public class TopicSchema {
private final Map<String, Schema<?>> cachedSchemas = new HashMap<>();
private final PulsarClient client;
- public TopicSchema(PulsarClient client) {
+ private final ClassLoader functionsClassloader;
+
+ public TopicSchema(PulsarClient client, ClassLoader functionsClassloader) {
this.client = client;
+ this.functionsClassloader = AccessController.doPrivileged(
+ (PrivilegedAction<URLClassLoader>) () -> new URLClassLoader(new URL[0], functionsClassloader)
+ );
}
/**
@@ -244,11 +253,11 @@ public class TopicSchema {
@SuppressWarnings("unchecked")
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
return newSchemaInstance(topic, clazz, new ConsumerConfig(schemaTypeOrClassName), input,
- Thread.currentThread().getContextClassLoader());
+ functionsClassloader);
}
@SuppressWarnings("unchecked")
private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, ConsumerConfig conf, boolean input) {
- return newSchemaInstance(topic, clazz, conf, input, Thread.currentThread().getContextClassLoader());
+ return newSchemaInstance(topic, clazz, conf, input, functionsClassloader);
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
index 506ef67ab07..c4e77cb3ff8 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.functions.source;
-import static org.testng.Assert.assertEquals;
-import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -30,12 +28,16 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.proto.Request;
import org.testng.annotations.Test;
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+
@Slf4j
public class TopicSchemaTest {
@Test
public void testGetSchema() {
- TopicSchema topicSchema = new TopicSchema(null);
+ TopicSchema topicSchema = new TopicSchema(null, Thread.currentThread().getContextClassLoader());
String TOPIC = "public/default/test";
Schema<?> schema = topicSchema.getSchema(TOPIC + "1", DummyClass.class, Optional.of(SchemaType.JSON));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 6088628aac5..1e54764ad5d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -24,10 +24,10 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.util.Json;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -43,8 +43,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
-import io.swagger.util.Json;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -77,8 +75,8 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
-import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.InitializableFunction;
+import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
import org.apache.pulsar.functions.api.examples.RecordFunction;
import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.pojo.Users;
@@ -902,6 +900,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String functionName,
String functionFile,
String functionClass,
+ Map<String, String> inputSerdeClassNames,
String outputSerdeClassName,
Map<String, String> userConfigs) throws Exception {
@@ -916,6 +915,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
+ generator.setCustomSerDeSourceTopics(inputSerdeClassNames);
generator.setOutputSerDe(outputSerdeClassName);
if (userConfigs != null) {
generator.setUserConfig(userConfigs);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 62aa36da1b6..288ced63ae5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -54,7 +54,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String SERDE_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
- public static final String SERDE_OUTPUT_CLASS =
+ public static final String SERDE_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseSerde";
public static final String EXCLAMATION_PYTHON_CLASS =
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 097a452937d..939d6e19d1f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -19,9 +19,9 @@
package org.apache.pulsar.tests.integration.functions.java;
import static org.testng.Assert.assertEquals;
-
import java.util.Collections;
-
+import java.util.Map;
+import org.apache.commons.collections4.map.HashedMap;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
@@ -70,10 +70,13 @@ public abstract class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
admin.topics().createNonPartitionedTopic(outputTopicName);
}
+ Map<String, String> inputTopicsSerde = new HashedMap<>();
+ inputTopicsSerde.put(inputTopicName, SERDE_CLASS);
+
String functionName = "test-serde-fn-" + randomName(8);
submitFunction(
- Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
- SERDE_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde,
+ SERDE_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
);
// get function info
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
index a6f54349f26..de29cbc94c2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.tests.integration.functions.java;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.fasterxml.jackson.databind.MappingIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -26,10 +29,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import com.fasterxml.jackson.databind.MappingIterator;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
+import org.apache.commons.collections4.map.HashedMap;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -41,8 +43,6 @@ import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
@Slf4j
public abstract class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest {
@@ -251,9 +251,12 @@ public abstract class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest
admin.topics().createNonPartitionedTopic(outputTopicName);
}
+ Map<String, String> inputTopicsSerde = new HashedMap<>();
+ inputTopicsSerde.put(inputTopicName, SERDE_CLASS);
+
submitFunction(
- Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
- SERDE_OUTPUT_CLASS, Collections.singletonMap(topicPrefix, outputTopicName)
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde,
+ SERDE_CLASS, Collections.singletonMap(topicPrefix, outputTopicName)
);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 90fac7a0552..adc791fab4d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -46,7 +46,7 @@ public class CommandGenerator {
private String functionClassName;
private String sourceTopic;
private String sourceTopicPattern;
- private Map<String, String> customSereSourceTopics;
+ private Map<String, String> customSerDeSourceTopics;
private String sinkTopic;
private String logTopic;
private String outputSerDe;
@@ -182,8 +182,8 @@ public class CommandGenerator {
if (batchBuilder != null) {
commandBuilder.append("--batch-builder" + batchBuilder);
}
- if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
- commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
+ if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) {
+ commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'");
}
if (sinkTopic != null) {
commandBuilder.append(" --output " + sinkTopic);
@@ -280,8 +280,8 @@ public class CommandGenerator {
if (StringUtils.isNotEmpty(sourceTopic)) {
commandBuilder.append(" --inputs " + sourceTopic);
}
- if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
- commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
+ if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) {
+ commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'");
}
if (batchBuilder != null) {
commandBuilder.append("--batch-builder" + batchBuilder);