You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/05 00:33:56 UTC
[pulsar] branch master updated: Fixed avro schema decode error in
functions (#6662)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52ae182 Fixed avro schema decode error in functions (#6662)
52ae182 is described below
commit 52ae1823dbcbef95637580c5f3568843232cd379
Author: ran <ga...@126.com>
AuthorDate: Sun Apr 5 08:33:42 2020 +0800
Fixed avro schema decode error in functions (#6662)
Fixes #5503
From #6445
# Motivation
In functions, it will encounter ```ClassCastException``` when using the Avro schema for topics.
```
Exception in thread "main" java.lang.ClassCastException: org.apache.pulsar.shade.org.apache.avro.generic.GenericData$Record cannot be cast to io.streamnative.KeyValueSchemaTest$Foo2
at io.streamnative.KeyValueSchemaTest.testConsumerByPythonProduce(KeyValueSchemaTest.java:412)
at io.streamnative.KeyValueSchemaTest.main(KeyValueSchemaTest.java:305)
```
# Modifications
In functions, when using Avro schema specific the ClassLoader for ReflectDatumReader.
Add integration test ```testAvroSchemaFunction``` in class ```PulsarFunctionsTest```.
---
.github/workflows/ci-unit-broker.yml | 32 ++++-
.../apache/pulsar/broker/service/ServerCnx.java | 4 +-
.../pulsar/client/impl/schema/AvroSchema.java | 27 +++--
.../client/impl/schema/reader/AvroReader.java | 33 +++++-
.../functions/instance/JavaInstanceRunnable.java | 4 +-
.../pulsar/functions/source/PulsarSource.java | 3 +-
...onFunction.java => AvroSchemaTestFunction.java} | 20 ++--
.../api/examples/WindowDurationFunction.java | 1 +
.../AvroTestObject.java} | 18 +--
.../latest-version-image/conf/bookie.conf | 2 +-
.../integration/functions/PulsarFunctionsTest.java | 131 ++++++++++++++++++++-
11 files changed, 239 insertions(+), 36 deletions(-)
diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml
index a32d0e0..0f51341 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker.yml
@@ -57,6 +57,7 @@ jobs:
sudo apt clean
docker rmi $(docker images -q) -f
df -h
+ free -h
- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
@@ -96,13 +97,42 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker
+ - name: run unit tests pulsar broker persistent dispatcher failover consumer test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker
+
+ - name: run unit tests pulsar broker admin api test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker
+
+ - name: run unit tests pulsar broker v1 admin api test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker
+
+ - name: run unit tests pulsar broker compaction test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker
+
+ - name: run unit tests pulsar broker batch message test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker
+
+ - name: run unit tests pulsar broker partitioned topics schema test
+ if: steps.docs.outputs.changed_only == 'no'
+ run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker
+
- name: run unit test pulsar-broker
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailo [...]
+ run: |
+ df -h
+ free -h
+ mvn test -e '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailo [...]
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7732bd8..cb1d041 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1523,7 +1523,9 @@ public class ServerCnx extends PulsarHandler {
@Override
protected void handleGetSchema(CommandGetSchema commandGetSchema) {
if (log.isDebugEnabled()) {
- log.debug("Received CommandGetSchema call from {}", remoteAddress);
+ log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
+ remoteAddress, new String(commandGetSchema.getSchemaVersion().toByteArray()),
+ commandGetSchema.getTopic(), commandGetSchema.getRequestId());
}
long requestId = commandGetSchema.getRequestId();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index fe801dd..4049f14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -65,9 +65,12 @@ public class AvroSchema<T> extends StructSchema<T> {
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}
- private AvroSchema(SchemaInfo schemaInfo) {
+ private ClassLoader pojoClassLoader;
+
+ private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
- setReader(new AvroReader<>(schema));
+ this.pojoClassLoader = pojoClassLoader;
+ setReader(new AvroReader<>(schema, pojoClassLoader));
setWriter(new AvroWriter<>(schema));
}
@@ -78,7 +81,7 @@ public class AvroSchema<T> extends StructSchema<T> {
@Override
public Schema<T> clone() {
- Schema<T> schema = new AvroSchema<>(schemaInfo);
+ Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
@@ -86,7 +89,11 @@ public class AvroSchema<T> extends StructSchema<T> {
}
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
- return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+ ClassLoader pojoClassLoader = null;
+ if (schemaDefinition.getPojo() != null) {
+ pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
+ }
+ return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}
public static <T> AvroSchema<T> of(Class<T> pojo) {
@@ -94,18 +101,22 @@ public class AvroSchema<T> extends StructSchema<T> {
}
public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
+ ClassLoader pojoClassLoader = null;
+ if (pojo != null) {
+ pojoClassLoader = pojo.getClassLoader();
+ }
SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
- return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+ return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}
@Override
protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
- log.info("Load schema reader for version({}), schema is : {}",
+ log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- schemaInfo.getSchemaDefinition());
- return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
+ schemaInfo.getSchemaDefinition(), schemaInfo.toString());
+ return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
index 01278ba..8481542 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -18,9 +18,12 @@
*/
package org.apache.pulsar.client.impl.schema.reader;
+import org.apache.avro.Conversions;
import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;
@@ -41,8 +44,34 @@ public class AvroReader<T> implements SchemaReader<T> {
this.reader = new ReflectDatumReader<>(schema);
}
- public AvroReader(Schema writerSchema, Schema readerSchema) {
- this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+ public AvroReader(Schema schema, ClassLoader classLoader) {
+ if (classLoader != null) {
+ ReflectData reflectData = new ReflectData(classLoader);
+ reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
+ } else {
+ this.reader = new ReflectDatumReader<>(schema);
+ }
+ }
+
+ public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
+ if (classLoader != null) {
+ ReflectData reflectData = new ReflectData(classLoader);
+ reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+ reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+ this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData);
+ } else {
+ this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+ }
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c705c05..25689a9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -317,8 +317,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Collections.emptyList());
}
- log.info("Initialize function class loader for function {} at function cache manager",
- instanceConfig.getFunctionDetails().getName());
+ log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+ instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));
fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fe4bb3b..0d23ce1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -68,7 +68,8 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
- log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema());
+ log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
+ topic, conf.getSchema(), conf.getSchema().getSchemaInfo());
ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
// consume message even if can't decrypt and deliver it along with encryption-ctx
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
similarity index 61%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
index 8c0da3b..e81dc5f 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
@@ -18,15 +18,19 @@
*/
package org.apache.pulsar.functions.api.examples;
-import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
+
+
+@Slf4j
+public class AvroSchemaTestFunction implements Function<AvroTestObject, AvroTestObject> {
-/**
- * This functions collects the timestamp during the window operation.
- */
-public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
@Override
- public String apply(Collection<String> integers) {
- long time = System.currentTimeMillis();
- return String.format("%s:%s", String.join(",", integers), time);
+ public AvroTestObject process(AvroTestObject input, Context context) throws Exception {
+ log.info("AvroTestObject - baseValue: {}", input.getBaseValue());
+ input.setBaseValue(input.getBaseValue() + 10);
+ return input;
}
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
index 8c0da3b..105815f 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.api.examples;
import java.util.Collection;
+
/**
* This functions collects the timestamp during the window operation.
*/
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
similarity index 64%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
index 8c0da3b..43364be 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.pojo;
+
+import lombok.Data;
-import java.util.Collection;
/**
- * This functions collects the timestamp during the window operation.
+ * Avro test object.
*/
-public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
- @Override
- public String apply(Collection<String> integers) {
- long time = System.currentTimeMillis();
- return String.format("%s:%s", String.join(",", integers), time);
- }
+@Data
+public class AvroTestObject {
+
+ private int baseValue;
+
}
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf b/tests/docker-images/latest-version-image/conf/bookie.conf
index 9d89d08..a71cb5c 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,5 +22,5 @@ autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/bookie.log
directory=/pulsar
-environment=PULSAR_MEM="-Xmx128M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
+environment=PULSAR_MEM="-Xmx128M -XX:MaxDirectMemorySize=512M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
command=/pulsar/bin/pulsar bookie
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0c9739e..70e9f45 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
@@ -43,7 +44,10 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
+import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
+import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -56,6 +60,7 @@ import org.apache.pulsar.tests.integration.io.*;
import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
import org.assertj.core.api.Assertions;
import org.testcontainers.containers.GenericContainer;
import org.testng.annotations.Test;
@@ -68,7 +73,9 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -1068,16 +1075,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
break;
}
String msgStr = new String(msg.getData());
- log.info("i: {} RECV: {}", i, msgStr);
+ log.info("[testWindowFunction] i: {} RECV: {}", i, msgStr);
String result = msgStr.split(":")[0];
assertThat(result).contains(expectedResults[i]);
i++;
}
- // in case last commit is not updated
- assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
getFunctionStatus(functionName, NUM_OF_MESSAGES, true);
+ // in case last commit is not updated
+ assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
+
deleteFunction(functionName);
getFunctionInfoNotFound(functionName);
@@ -2137,6 +2145,123 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
}
+ @Test(groups = "function")
+ public void testAvroSchemaFunction() throws Exception {
+ log.info("testAvroSchemaFunction start ...");
+ final String inputTopic = "test-avroschema-input-" + randomName(8);
+ final String outputTopic = "test-avroschema-output-" + randomName(8);
+ final String functionName = "test-avroschema-fn-202003241756";
+ final int numMessages = 10;
+
+ if (pulsarCluster == null) {
+ log.info("pulsarClient is null");
+ this.setupCluster();
+ this.setupFunctionWorkers();
+ }
+
+ @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+ log.info("pulsar client init - input: {}, output: {}", inputTopic, outputTopic);
+
+ @Cleanup Producer<AvroTestObject> producer = pulsarClient
+ .newProducer(Schema.AVRO(AvroTestObject.class))
+ .topic(inputTopic).create();
+ log.info("pulsar producer init - {}", inputTopic);
+
+ @Cleanup Consumer<AvroTestObject> consumer = pulsarClient
+ .newConsumer(Schema.AVRO(AvroTestObject.class))
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-avro-schema")
+ .topic(outputTopic)
+ .subscribe();
+ log.info("pulsar consumer init - {}", outputTopic);
+
+ CompletableFuture<Optional<SchemaInfo>> inputSchemaFuture =
+ ((PulsarClientImpl) pulsarClient).getSchema(inputTopic);
+ inputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
+ if (schemaInfo.isPresent()) {
+ log.info("inputSchemaInfo: {}", schemaInfo.get().toString());
+ } else {
+ log.error("input schema is not present!");
+ }
+ });
+
+ CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture =
+ ((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
+ outputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
+ if (throwable != null) {
+ log.error("get output schemaInfo error", throwable);
+ throwable.printStackTrace();
+ return;
+ }
+ if (schemaInfo.isPresent()) {
+ log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
+ } else {
+ log.error("output schema is not present!");
+ }
+ });
+
+ submitFunction(
+ Runtime.JAVA,
+ inputTopic,
+ outputTopic,
+ functionName,
+ null,
+ AvroSchemaTestFunction.class.getName(),
+ Schema.AVRO(AvroTestObject.class));
+ log.info("pulsar submitFunction");
+
+ getFunctionInfoSuccess(functionName);
+
+ AvroSchemaTestFunction function = new AvroSchemaTestFunction();
+ Set<Object> expectedSet = new HashSet<>();
+
+ log.info("test-avro-schema producer connected: " + producer.isConnected());
+ for (int i = 0 ; i < numMessages ; i++) {
+ AvroTestObject inputObject = new AvroTestObject();
+ inputObject.setBaseValue(i);
+ MessageId messageId = producer.send(inputObject);
+ log.info("test-avro-schema messageId: {}", messageId.toString());
+ expectedSet.add(function.process(inputObject, null));
+ log.info("test-avro-schema expectedSet size: {}", expectedSet.size());
+ }
+ getFunctionStatus(functionName, numMessages, false);
+ log.info("test-avro-schema producer send message finish");
+
+ CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture2 =
+ ((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
+ outputSchemaFuture2.whenComplete((schemaInfo, throwable) -> {
+ if (throwable != null) {
+ log.error("get output schemaInfo error", throwable);
+ throwable.printStackTrace();
+ return;
+ }
+ if (schemaInfo.isPresent()) {
+ log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
+ } else {
+ log.error("output schema is not present!");
+ }
+ });
+
+ log.info("test-avro-schema consumer connected: " + consumer.isConnected());
+ for (int i = 0 ; i < numMessages ; i++) {
+ log.info("test-avro-schema consumer receive [{}] start", i);
+ Message<AvroTestObject> message = consumer.receive();
+ log.info("test-avro-schema consumer receive [{}] over", i);
+ AvroTestObject outputObject = message.getValue();
+ assertTrue(expectedSet.contains(outputObject));
+ expectedSet.remove(outputObject);
+ consumer.acknowledge(message);
+ }
+ log.info("test-avro-schema consumer receive message finish");
+
+ assertEquals(expectedSet.size(), 0);
+
+ deleteFunction(functionName);
+
+ getFunctionInfoNotFound(functionName);
+ }
+
private void testDebeziumMySqlConnect()
throws Exception {