You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yu...@apache.org on 2023/05/30 12:26:07 UTC

[pulsar] branch branch-3.0 updated: [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)

This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d4b55f45362 [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)
d4b55f45362 is described below

commit d4b55f45362089005a2259857d23c2f690f23bde
Author: Gabriel Miklós <ga...@getbridge.com>
AuthorDate: Thu May 18 17:23:55 2023 +0200

    [improve][fn] Use functions classloader in TopicSchema.newSchemaInstance() to fix ClassNotFoundException when using custom SerDe classes. (targeted for master) (#20115)
    
    (cherry picked from commit 43a989862f548fa3f67708a5fff62eb764af878c)
---
 .../org/apache/pulsar/functions/instance/ContextImpl.java |  2 +-
 .../java/org/apache/pulsar/functions/sink/PulsarSink.java |  2 +-
 .../org/apache/pulsar/functions/source/PulsarSource.java  |  6 +++---
 .../functions/source/SingleConsumerPulsarSource.java      | 11 ++++-------
 .../org/apache/pulsar/functions/source/TopicSchema.java   | 15 ++++++++++++---
 .../apache/pulsar/functions/source/TopicSchemaTest.java   |  8 +++++---
 .../tests/integration/functions/PulsarFunctionsTest.java  |  8 ++++----
 .../integration/functions/PulsarFunctionsTestBase.java    |  2 +-
 .../functions/java/PulsarFunctionsJavaTest.java           | 11 +++++++----
 .../functions/java/PulsarWorkerRebalanceDrainTest.java    | 15 +++++++++------
 .../integration/functions/utils/CommandGenerator.java     | 10 +++++-----
 11 files changed, 52 insertions(+), 38 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d64c5f9b52d..5cbbcad24c7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -148,7 +148,7 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         this.clientBuilder = clientBuilder;
         this.client = client;
         this.pulsarAdmin = pulsarAdmin;
-        this.topicSchema = new TopicSchema(client);
+        this.topicSchema = new TopicSchema(client, Thread.currentThread().getContextClassLoader());
         this.statsManager = statsManager;
 
         this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8add0a78c5f..97a0ad0a2ce 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -349,7 +349,7 @@ public class PulsarSink<T> implements Sink<T> {
                       ComponentStatsManager stats, ClassLoader functionClassLoader) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
-        this.topicSchema = new TopicSchema(client);
+        this.topicSchema = new TopicSchema(client, functionClassLoader);
         this.properties = properties;
         this.stats = stats;
         this.functionClassLoader = functionClassLoader;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index eb67e635fcb..a6e14ab3e09 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -53,7 +53,7 @@ public abstract class PulsarSource<T> implements Source<T> {
                            ClassLoader functionClassLoader) {
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarSourceConfig;
-        this.topicSchema = new TopicSchema(pulsarClient);
+        this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader);
         this.properties = properties;
         this.functionClassLoader = functionClassLoader;
     }
@@ -168,8 +168,8 @@ public abstract class PulsarSource<T> implements Source<T> {
                                                                             Class<?> typeArg) {
         PulsarSourceConsumerConfig.PulsarSourceConsumerConfigBuilder<T> consumerConfBuilder =
                 PulsarSourceConsumerConfig.<T>builder().isRegexPattern(conf.isRegexPattern())
-                .receiverQueueSize(conf.getReceiverQueueSize())
-                .consumerProperties(conf.getConsumerProperties());
+                        .receiverQueueSize(conf.getReceiverQueueSize())
+                        .consumerProperties(conf.getConsumerProperties());
 
         Schema<T> schema;
         if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
index 426723804ca..d4d3ea00b93 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
@@ -44,14 +44,12 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
     private Consumer<T> consumer;
     private final List<Consumer<T>> inputConsumers = new LinkedList<>();
 
-    public SingleConsumerPulsarSource(PulsarClient pulsarClient,
-                                      SingleConsumerPulsarSourceConfig pulsarSourceConfig,
-                                      Map<String, String> properties,
-                                      ClassLoader functionClassLoader) {
+    public SingleConsumerPulsarSource(PulsarClient pulsarClient, SingleConsumerPulsarSourceConfig pulsarSourceConfig,
+                                      Map<String, String> properties, ClassLoader functionClassLoader) {
         super(pulsarClient, pulsarSourceConfig, properties, functionClassLoader);
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarSourceConfig;
-        this.topicSchema = new TopicSchema(pulsarClient);
+        this.topicSchema = new TopicSchema(pulsarClient, functionClassLoader);
         this.properties = properties;
         this.functionClassLoader = functionClassLoader;
     }
@@ -60,8 +58,7 @@ public class SingleConsumerPulsarSource<T> extends PulsarSource<T> {
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
         log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
 
-        Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
-                this.functionClassLoader);
+        Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(), this.functionClassLoader);
 
         checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 57f49fed0ca..a912bc7ecf1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.functions.source;
 
 import io.netty.buffer.ByteBuf;
+import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -49,8 +53,13 @@ public class TopicSchema {
     private final Map<String, Schema<?>> cachedSchemas = new HashMap<>();
     private final PulsarClient client;
 
-    public TopicSchema(PulsarClient client) {
+    private final ClassLoader functionsClassloader;
+
+    public TopicSchema(PulsarClient client, ClassLoader functionsClassloader) {
         this.client = client;
+        this.functionsClassloader = AccessController.doPrivileged(
+                (PrivilegedAction<URLClassLoader>) () -> new URLClassLoader(new URL[0], functionsClassloader)
+        );
     }
 
     /**
@@ -244,11 +253,11 @@ public class TopicSchema {
     @SuppressWarnings("unchecked")
     private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) {
         return newSchemaInstance(topic, clazz, new ConsumerConfig(schemaTypeOrClassName), input,
-                Thread.currentThread().getContextClassLoader());
+                functionsClassloader);
     }
 
     @SuppressWarnings("unchecked")
     private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, ConsumerConfig conf, boolean input) {
-        return newSchemaInstance(topic, clazz, conf, input, Thread.currentThread().getContextClassLoader());
+        return newSchemaInstance(topic, clazz, conf, input, functionsClassloader);
     }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
index 506ef67ab07..c4e77cb3ff8 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.functions.source;
 
-import static org.testng.Assert.assertEquals;
-import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -30,12 +28,16 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.proto.Request;
 import org.testng.annotations.Test;
 
+import java.util.Optional;
+
+import static org.testng.Assert.assertEquals;
+
 @Slf4j
 public class TopicSchemaTest {
 
     @Test
     public void testGetSchema() {
-        TopicSchema topicSchema = new TopicSchema(null);
+        TopicSchema topicSchema = new TopicSchema(null, Thread.currentThread().getContextClassLoader());
 
         String TOPIC = "public/default/test";
         Schema<?> schema = topicSchema.getSchema(TOPIC + "1", DummyClass.class, Optional.of(SchemaType.JSON));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 6088628aac5..1e54764ad5d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -24,10 +24,10 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.util.Json;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,8 +43,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import io.swagger.util.Json;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -77,8 +75,8 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
-import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
 import org.apache.pulsar.functions.api.examples.InitializableFunction;
+import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
 import org.apache.pulsar.functions.api.examples.RecordFunction;
 import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
 import org.apache.pulsar.functions.api.examples.pojo.Users;
@@ -902,6 +900,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                       String functionName,
                                       String functionFile,
                                       String functionClass,
+                                      Map<String, String> inputSerdeClassNames,
                                       String outputSerdeClassName,
                                       Map<String, String> userConfigs) throws Exception {
 
@@ -916,6 +915,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
         generator.setSinkTopic(outputTopicName);
         generator.setFunctionName(functionName);
+        generator.setCustomSerDeSourceTopics(inputSerdeClassNames);
         generator.setOutputSerDe(outputSerdeClassName);
         if (userConfigs != null) {
             generator.setUserConfig(userConfigs);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 62aa36da1b6..288ced63ae5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -54,7 +54,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String SERDE_JAVA_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
 
-    public static final String SERDE_OUTPUT_CLASS =
+    public static final String SERDE_CLASS =
             "org.apache.pulsar.functions.api.examples.CustomBaseSerde";
 
     public static final String EXCLAMATION_PYTHON_CLASS =
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 097a452937d..939d6e19d1f 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.tests.integration.functions.java;
 
 import static org.testng.Assert.assertEquals;
-
 import java.util.Collections;
-
+import java.util.Map;
+import org.apache.commons.collections4.map.HashedMap;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.FunctionStatusUtil;
@@ -70,10 +70,13 @@ public abstract class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
             admin.topics().createNonPartitionedTopic(outputTopicName);
         }
 
+        Map<String, String> inputTopicsSerde = new HashedMap<>();
+        inputTopicsSerde.put(inputTopicName, SERDE_CLASS);
+
         String functionName = "test-serde-fn-" + randomName(8);
         submitFunction(
-                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
-                SERDE_OUTPUT_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
+                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde,
+                SERDE_CLASS, Collections.singletonMap("serde-topic", outputTopicName)
         );
 
         // get function info
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
index a6f54349f26..de29cbc94c2 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarWorkerRebalanceDrainTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.tests.integration.functions.java;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import com.fasterxml.jackson.databind.MappingIterator;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,10 +29,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
-import com.fasterxml.jackson.databind.MappingIterator;
 import lombok.extern.slf4j.Slf4j;
 import lombok.val;
+import org.apache.commons.collections4.map.HashedMap;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -41,8 +43,6 @@ import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 @Slf4j
 public abstract class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest {
@@ -251,9 +251,12 @@ public abstract class PulsarWorkerRebalanceDrainTest extends PulsarFunctionsTest
             admin.topics().createNonPartitionedTopic(outputTopicName);
         }
 
+        Map<String, String> inputTopicsSerde = new HashedMap<>();
+        inputTopicsSerde.put(inputTopicName, SERDE_CLASS);
+
         submitFunction(
-                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS,
-                SERDE_OUTPUT_CLASS, Collections.singletonMap(topicPrefix, outputTopicName)
+                Runtime.JAVA, inputTopicName, outputTopicName, functionName, null, SERDE_JAVA_CLASS, inputTopicsSerde,
+                SERDE_CLASS, Collections.singletonMap(topicPrefix, outputTopicName)
         );
     }
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 90fac7a0552..adc791fab4d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -46,7 +46,7 @@ public class CommandGenerator {
     private String functionClassName;
     private String sourceTopic;
     private String sourceTopicPattern;
-    private Map<String, String> customSereSourceTopics;
+    private Map<String, String> customSerDeSourceTopics;
     private String sinkTopic;
     private String logTopic;
     private String outputSerDe;
@@ -182,8 +182,8 @@ public class CommandGenerator {
         if (batchBuilder != null) {
             commandBuilder.append("--batch-builder" + batchBuilder);
         }
-        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
-            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
+        if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) {
+            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'");
         }
         if (sinkTopic != null) {
             commandBuilder.append(" --output " + sinkTopic);
@@ -280,8 +280,8 @@ public class CommandGenerator {
         if (StringUtils.isNotEmpty(sourceTopic)) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
-        if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
-            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSereSourceTopics) + "\'");
+        if (customSerDeSourceTopics != null && !customSerDeSourceTopics.isEmpty()) {
+            commandBuilder.append(" --customSerdeInputs \'" + new Gson().toJson(customSerDeSourceTopics) + "\'");
         }
         if (batchBuilder != null) {
             commandBuilder.append("--batch-builder" + batchBuilder);