You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:54 UTC

[pulsar] 18/20: Fix: Cast exception occurs if function/source/sink type is ByteBuffer (#11611)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 33c9fb77a8391e2a68cccb8d821f66c76f071851
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 9 19:01:42 2021 -0700

    Fix: Cast exception occurs if function/source/sink type is ByteBuffer (#11611)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
    (cherry picked from commit b7e027b7290eef2f2daf22dde57395ede1c00985)
---
 .../worker/PulsarFunctionLocalRunTest.java         | 51 +++++++++++++++++++---
 .../pulsar/functions/source/TopicSchema.java       |  6 ++-
 2 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 9477af4..232f7dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.Collections;
@@ -74,19 +75,20 @@ import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -851,7 +853,7 @@ public class PulsarFunctionLocalRunTest {
         runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2));
     }
 
-    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
+    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
         final String namespacePortion = "io";
         final String replNamespace = tenant + "/" + namespacePortion;
         final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -869,7 +871,9 @@ public class PulsarFunctionLocalRunTest {
         SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
 
         sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
-        if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+        if (className != null) {
+            sinkConfig.setClassName(className);
+        } else if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
             sinkConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink");
         }
 
@@ -935,6 +939,14 @@ public class PulsarFunctionLocalRunTest {
                 if (m != null) {
                     metricsMap.put(m.tags.get("instance_id"), m);
                 }
+            } else if (line.startsWith("pulsar_sink_sink_exceptions_total")) {
+                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+                assertFalse(metrics.isEmpty());
+                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_sink_exceptions_total");
+                if (m == null) {
+                    m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+                }
+                assertEquals(m.value, 0);
             }
         });
         Assert.assertEquals(metricsMap.size(), parallelism);
@@ -972,7 +984,11 @@ public class PulsarFunctionLocalRunTest {
     }
 
     private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
-        testPulsarSourceLocalRun(jarFilePathUrl, 1);
+        testPulsarSinkLocalRun(jarFilePathUrl, 1);
+    }
+
+    private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
+        testPulsarSinkLocalRun(jarFilePathUrl, parallelism, null);
     }
 
     @Test(timeOut = 20000, groups = "builtin")
@@ -1001,6 +1017,31 @@ public class PulsarFunctionLocalRunTest {
         runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2));
     }
 
+    public static class StatsNullSink implements Sink<ByteBuffer> {
+        volatile long bytesTotal = 0; // sum of ByteBuffer.capacity() over every record passed to write()
+
+        @Override
+        public void open(Map map, final SinkContext sinkContext) throws Exception {
+            // Intentionally empty: this sink ignores its config map and context.
+        }
+
+        @Override
+        public void write(Record<ByteBuffer> record) throws Exception {
+            bytesTotal += record.getValue().capacity(); // NOTE(review): += on a volatile is a separate read and write, not atomic - confirm write() is never called concurrently
+            record.ack(); // acknowledge right after counting; the payload itself is never stored or forwarded
+        }
+
+        @Override
+        public void close() throws Exception {
+            // Intentionally empty: the only state is the bytesTotal counter.
+        }
+    }
+
+    @Test
+    public void test() throws Throwable{ // sink local-run with StatsNullSink (a Sink<ByteBuffer>): null jar path, parallelism 1
+        runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName()));
+    }
+
     private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
         ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
         try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index caf7ff8..dcd424e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -145,7 +145,11 @@ public class TopicSchema {
     private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type, ConsumerConfig conf) {
         switch (type) {
         case NONE:
-            return (Schema<T>) Schema.BYTES;
+            if (ByteBuffer.class.isAssignableFrom(clazz)) {
+                return (Schema<T>) Schema.BYTEBUFFER;
+            } else {
+                return (Schema<T>) Schema.BYTES;
+            }
 
         case AUTO_CONSUME:
         case AUTO: