You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/12 03:29:54 UTC
[pulsar] 18/20: Fix: Cast exception occurs if function/source/sink
type is ByteBuffer (#11611)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 33c9fb77a8391e2a68cccb8d821f66c76f071851
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Aug 9 19:01:42 2021 -0700
Fix: Cast exception occurs if function/source/sink type is ByteBuffer (#11611)
Co-authored-by: Jerry Peng <je...@splunk.com>
(cherry picked from commit b7e027b7290eef2f2daf22dde57395ede1c00985)
---
.../worker/PulsarFunctionLocalRunTest.java | 51 +++++++++++++++++++---
.../pulsar/functions/source/TopicSchema.java | 6 ++-
2 files changed, 51 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 9477af4..232f7dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -37,6 +37,7 @@ import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
@@ -74,19 +75,20 @@ import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -851,7 +853,7 @@ public class PulsarFunctionLocalRunTest {
runWithNarClassLoader(() -> testPulsarSourceLocalRun(null, 2));
}
- private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
+ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism, String className) throws Exception {
final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/input";
@@ -869,7 +871,9 @@ public class PulsarFunctionLocalRunTest {
SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
- if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+ if (className != null) {
+ sinkConfig.setClassName(className);
+ } else if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
sinkConfig.setClassName("org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink");
}
@@ -935,6 +939,14 @@ public class PulsarFunctionLocalRunTest {
if (m != null) {
metricsMap.put(m.tags.get("instance_id"), m);
}
+ } else if (line.startsWith("pulsar_sink_sink_exceptions_total")) {
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(line);
+ assertFalse(metrics.isEmpty());
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_sink_exceptions_total");
+ if (m == null) {
+ m = metrics.get("pulsar_sink_sink_exceptions_total_1min");
+ }
+ assertEquals(m.value, 0);
}
});
Assert.assertEquals(metricsMap.size(), parallelism);
@@ -972,7 +984,11 @@ public class PulsarFunctionLocalRunTest {
}
private void testPulsarSinkLocalRun(String jarFilePathUrl) throws Exception {
- testPulsarSourceLocalRun(jarFilePathUrl, 1);
+ testPulsarSinkLocalRun(jarFilePathUrl, 1);
+ }
+
+ private void testPulsarSinkLocalRun(String jarFilePathUrl, int parallelism) throws Exception {
+ testPulsarSinkLocalRun(jarFilePathUrl, parallelism, null); // null className: the three-argument overload then chooses the sink class itself
+ }
@Test(timeOut = 20000, groups = "builtin")
@@ -1001,6 +1017,31 @@ public class PulsarFunctionLocalRunTest {
runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 2));
}
+ public static class StatsNullSink implements Sink<ByteBuffer> { // test-only sink typed on ByteBuffer: acks every record and only counts bytes
+ volatile long bytesTotal = 0; // running sum of the capacities of the buffers passed to write()
+
+ @Override
+ public void open(Map map, final SinkContext sinkContext) throws Exception {
+ // Nothing to set up: neither the config map nor the sink context is used.
+ }
+
+ @Override
+ public void write(Record<ByteBuffer> record) throws Exception {
+ bytesTotal += record.getValue().capacity(); // NOTE(review): += on a volatile field is not atomic; assumes write() is never called concurrently -- confirm
+ record.ack();
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Nothing to release: the only state is the byte counter.
+ }
+ }
+
+ @Test
+ public void test() throws Throwable{ // runs the sink local-run scenario with StatsNullSink, i.e. a sink whose input type is ByteBuffer
+ runWithNarClassLoader(() -> testPulsarSinkLocalRun(null, 1, StatsNullSink.class.getName()));
+ }
+
private void runWithNarClassLoader(Assert.ThrowingRunnable throwingRunnable) throws Throwable {
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try (NarClassLoader classLoader = NarClassLoader.getFromArchive(getPulsarIODataGeneratorNar(), Collections.emptySet(), originalClassLoader, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR)) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index caf7ff8..dcd424e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -145,7 +145,11 @@ public class TopicSchema {
private static <T> Schema<T> newSchemaInstance(Class<T> clazz, SchemaType type, ConsumerConfig conf) {
switch (type) {
case NONE:
- return (Schema<T>) Schema.BYTES;
+ if (ByteBuffer.class.isAssignableFrom(clazz)) {
+ return (Schema<T>) Schema.BYTEBUFFER;
+ } else {
+ return (Schema<T>) Schema.BYTES;
+ }
case AUTO_CONSUME:
case AUTO: