You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/05 00:33:56 UTC

[pulsar] branch master updated: Fixed avro schema decode error in functions (#6662)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52ae182  Fixed avro schema decode error in functions (#6662)
52ae182 is described below

commit 52ae1823dbcbef95637580c5f3568843232cd379
Author: ran <ga...@126.com>
AuthorDate: Sun Apr 5 08:33:42 2020 +0800

    Fixed avro schema decode error in functions (#6662)
    
    Fixes #5503
    From #6445
    
    # Motivation
    Pulsar Functions encounter a ```ClassCastException``` when the Avro schema is used for topics.
    
    ```
    Exception in thread "main" java.lang.ClassCastException: org.apache.pulsar.shade.org.apache.avro.generic.GenericData$Record cannot be cast to io.streamnative.KeyValueSchemaTest$Foo2
    	at io.streamnative.KeyValueSchemaTest.testConsumerByPythonProduce(KeyValueSchemaTest.java:412)
    	at io.streamnative.KeyValueSchemaTest.main(KeyValueSchemaTest.java:305)
    ```
    
    # Modifications
    In functions, when using the Avro schema, specify the ClassLoader for the ReflectDatumReader.
    
    Add the integration test ```testAvroSchemaFunction``` to the class ```PulsarFunctionsTest```.
---
 .github/workflows/ci-unit-broker.yml               |  32 ++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |   4 +-
 .../pulsar/client/impl/schema/AvroSchema.java      |  27 +++--
 .../client/impl/schema/reader/AvroReader.java      |  33 +++++-
 .../functions/instance/JavaInstanceRunnable.java   |   4 +-
 .../pulsar/functions/source/PulsarSource.java      |   3 +-
 ...onFunction.java => AvroSchemaTestFunction.java} |  20 ++--
 .../api/examples/WindowDurationFunction.java       |   1 +
 .../AvroTestObject.java}                           |  18 +--
 .../latest-version-image/conf/bookie.conf          |   2 +-
 .../integration/functions/PulsarFunctionsTest.java | 131 ++++++++++++++++++++-
 11 files changed, 239 insertions(+), 36 deletions(-)

diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml
index a32d0e0..0f51341 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker.yml
@@ -57,6 +57,7 @@ jobs:
           sudo apt clean
           docker rmi $(docker images -q) -f
           df -h
+          free -h
 
       - name: Set up Maven
         uses: apache/pulsar-test-infra/setup-maven@master
@@ -96,13 +97,42 @@ jobs:
         if: steps.docs.outputs.changed_only == 'no'
         run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker
 
+      - name: run unit tests pulsar broker persistent dispatcher failover consumer test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker
+
+      - name: run unit tests pulsar broker admin api test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker
+
+      - name: run unit tests pulsar broker v1 admin api test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker
+
+      - name: run unit tests pulsar broker compaction test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker
+
+      - name: run unit tests pulsar broker batch message test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker
+        
+      - name: run unit tests pulsar broker partitioned topics schema test
+        if: steps.docs.outputs.changed_only == 'no'
+        run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker
+        
       - name: run unit test pulsar-broker
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailo [...]
+        run: |
+          df -h
+          free -h
+          mvn test -e '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailo [...]
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7732bd8..cb1d041 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1523,7 +1523,9 @@ public class ServerCnx extends PulsarHandler {
     @Override
     protected void handleGetSchema(CommandGetSchema commandGetSchema) {
         if (log.isDebugEnabled()) {
-            log.debug("Received CommandGetSchema call from {}", remoteAddress);
+            log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
+                    remoteAddress, new String(commandGetSchema.getSchemaVersion().toByteArray()),
+                    commandGetSchema.getTopic(), commandGetSchema.getRequestId());
         }
 
         long requestId = commandGetSchema.getRequestId();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index fe801dd..4049f14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -65,9 +65,12 @@ public class AvroSchema<T> extends StructSchema<T> {
         reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
     }
 
-    private AvroSchema(SchemaInfo schemaInfo) {
+    private ClassLoader pojoClassLoader;
+
+    private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
         super(schemaInfo);
-        setReader(new AvroReader<>(schema));
+        this.pojoClassLoader = pojoClassLoader;
+        setReader(new AvroReader<>(schema, pojoClassLoader));
         setWriter(new AvroWriter<>(schema));
     }
 
@@ -78,7 +81,7 @@ public class AvroSchema<T> extends StructSchema<T> {
 
     @Override
     public Schema<T> clone() {
-        Schema<T> schema = new AvroSchema<>(schemaInfo);
+        Schema<T> schema = new AvroSchema<>(schemaInfo, pojoClassLoader);
         if (schemaInfoProvider != null) {
             schema.setSchemaInfoProvider(schemaInfoProvider);
         }
@@ -86,7 +89,11 @@ public class AvroSchema<T> extends StructSchema<T> {
     }
 
     public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
-        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+        ClassLoader pojoClassLoader = null;
+        if (schemaDefinition.getPojo() != null) {
+            pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
+        }
+        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
     }
 
     public static <T> AvroSchema<T> of(Class<T> pojo) {
@@ -94,18 +101,22 @@ public class AvroSchema<T> extends StructSchema<T> {
     }
 
     public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
+        ClassLoader pojoClassLoader = null;
+        if (pojo != null) {
+            pojoClassLoader = pojo.getClassLoader();
+        }
         SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
-        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+        return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
     }
 
     @Override
     protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
         if (schemaInfo != null) {
-            log.info("Load schema reader for version({}), schema is : {}",
+            log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
-                schemaInfo.getSchemaDefinition());
-            return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
+                schemaInfo.getSchemaDefinition(), schemaInfo.toString());
+            return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
         } else {
             log.warn("No schema found for version({}), use latest schema : {}",
                 SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
index 01278ba..8481542 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pulsar.client.impl.schema.reader;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaReader;
@@ -41,8 +44,34 @@ public class AvroReader<T> implements SchemaReader<T> {
         this.reader = new ReflectDatumReader<>(schema);
     }
 
-    public AvroReader(Schema writerSchema, Schema readerSchema) {
-        this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+    public AvroReader(Schema schema, ClassLoader classLoader) {
+        if (classLoader != null) {
+            ReflectData reflectData = new ReflectData(classLoader);
+            reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+            this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
+        } else {
+            this.reader = new ReflectDatumReader<>(schema);
+        }
+    }
+
+    public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
+        if (classLoader != null) {
+            ReflectData reflectData = new ReflectData(classLoader);
+            reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
+            reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+            this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData);
+        } else {
+            this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+        }
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index c705c05..25689a9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -317,8 +317,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     Collections.emptyList());
         }
 
-        log.info("Initialize function class loader for function {} at function cache manager",
-                instanceConfig.getFunctionDetails().getName());
+        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}",
+                instanceConfig.getFunctionDetails().getName(), fnCache.getClassLoader(instanceConfig.getFunctionId()));
 
         fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
         if (null == fnClassLoader) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fe4bb3b..0d23ce1 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -68,7 +68,8 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         inputConsumers = configs.entrySet().stream().map(e -> {
             String topic = e.getKey();
             ConsumerConfig<T> conf = e.getValue();
-            log.info("Creating consumers for topic : {}, schema : {}",  topic, conf.getSchema());
+            log.info("Creating consumers for topic : {}, schema : {}, schemaInfo: {}",
+                    topic, conf.getSchema(), conf.getSchema().getSchemaInfo());
 
             ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
                     // consume message even if can't decrypt and deliver it along with encryption-ctx
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
similarity index 61%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
index 8c0da3b..e81dc5f 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AvroSchemaTestFunction.java
@@ -18,15 +18,19 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
-import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
+
+
+@Slf4j
+public class AvroSchemaTestFunction implements Function<AvroTestObject, AvroTestObject> {
 
-/**
- * This functions collects the timestamp during the window operation.
- */
-public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
     @Override
-    public String apply(Collection<String> integers) {
-        long time = System.currentTimeMillis();
-        return String.format("%s:%s", String.join(",", integers), time);
+    public AvroTestObject process(AvroTestObject input, Context context) throws Exception {
+        log.info("AvroTestObject - baseValue: {}", input.getBaseValue());
+        input.setBaseValue(input.getBaseValue() + 10);
+        return input;
     }
 }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
index 8c0da3b..105815f 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.api.examples;
 
 import java.util.Collection;
 
+
 /**
  * This functions collects the timestamp during the window operation.
  */
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
similarity index 64%
copy from pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
copy to pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
index 8c0da3b..43364be 100644
--- a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/AvroTestObject.java
@@ -16,17 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.api.examples;
+package org.apache.pulsar.functions.api.examples.pojo;
+
+import lombok.Data;
 
-import java.util.Collection;
 
 /**
- * This functions collects the timestamp during the window operation.
+ * Avro test object.
  */
-public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> {
-    @Override
-    public String apply(Collection<String> integers) {
-        long time = System.currentTimeMillis();
-        return String.format("%s:%s", String.join(",", integers), time);
-    }
+@Data
+public class AvroTestObject {
+
+    private int baseValue;
+
 }
diff --git a/tests/docker-images/latest-version-image/conf/bookie.conf b/tests/docker-images/latest-version-image/conf/bookie.conf
index 9d89d08..a71cb5c 100644
--- a/tests/docker-images/latest-version-image/conf/bookie.conf
+++ b/tests/docker-images/latest-version-image/conf/bookie.conf
@@ -22,5 +22,5 @@ autostart=false
 redirect_stderr=true
 stdout_logfile=/var/log/pulsar/bookie.log
 directory=/pulsar
-environment=PULSAR_MEM="-Xmx128M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
+environment=PULSAR_MEM="-Xmx128M -XX:MaxDirectMemorySize=512M",PULSAR_GC="-XX:+UseG1GC",dbStorage_writeCacheMaxSizeMb="16",dbStorage_readAheadCacheMaxSizeMb="16"
 command=/pulsar/bin/pulsar bookie
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0c9739e..70e9f45 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.naming.TopicName;
@@ -43,7 +44,10 @@ import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
+import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
+import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.containers.DebeziumMongoDbContainer;
 import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
@@ -56,6 +60,7 @@ import org.apache.pulsar.tests.integration.io.*;
 import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.apache.pulsar.tests.integration.utils.DockerUtils;
 import org.assertj.core.api.Assertions;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.annotations.Test;
@@ -68,7 +73,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -1068,16 +1075,17 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 break;
             }
             String msgStr = new String(msg.getData());
-            log.info("i: {} RECV: {}", i, msgStr);
+            log.info("[testWindowFunction] i: {} RECV: {}", i, msgStr);
             String result = msgStr.split(":")[0];
             assertThat(result).contains(expectedResults[i]);
             i++;
         }
-        // in case last commit is not updated
-        assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
 
         getFunctionStatus(functionName, NUM_OF_MESSAGES, true);
 
+        // in case last commit is not updated
+        assertThat(i).isGreaterThanOrEqualTo(expectedResults.length - 1);
+
         deleteFunction(functionName);
 
         getFunctionInfoNotFound(functionName);
@@ -2137,6 +2145,123 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    @Test(groups = "function")
+    public void testAvroSchemaFunction() throws Exception {
+        log.info("testAvroSchemaFunction start ...");
+        final String inputTopic = "test-avroschema-input-" + randomName(8);
+        final String outputTopic = "test-avroschema-output-" + randomName(8);
+        final String functionName = "test-avroschema-fn-202003241756";
+        final int numMessages = 10;
+
+        if (pulsarCluster == null) {
+            log.info("pulsarClient is null");
+            this.setupCluster();
+            this.setupFunctionWorkers();
+        }
+
+        @Cleanup PulsarClient pulsarClient = PulsarClient.builder()
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+        log.info("pulsar client init - input: {}, output: {}", inputTopic, outputTopic);
+
+        @Cleanup Producer<AvroTestObject> producer = pulsarClient
+                .newProducer(Schema.AVRO(AvroTestObject.class))
+                .topic(inputTopic).create();
+        log.info("pulsar producer init - {}", inputTopic);
+
+        @Cleanup Consumer<AvroTestObject> consumer = pulsarClient
+                .newConsumer(Schema.AVRO(AvroTestObject.class))
+                .subscriptionType(SubscriptionType.Exclusive)
+                .subscriptionName("test-avro-schema")
+                .topic(outputTopic)
+                .subscribe();
+        log.info("pulsar consumer init - {}", outputTopic);
+
+        CompletableFuture<Optional<SchemaInfo>> inputSchemaFuture =
+                ((PulsarClientImpl) pulsarClient).getSchema(inputTopic);
+        inputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
+            if (schemaInfo.isPresent()) {
+                log.info("inputSchemaInfo: {}", schemaInfo.get().toString());
+            } else {
+                log.error("input schema is not present!");
+            }
+        });
+
+        CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture =
+                ((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
+        outputSchemaFuture.whenComplete((schemaInfo, throwable) -> {
+            if (throwable != null) {
+                log.error("get output schemaInfo error", throwable);
+                throwable.printStackTrace();
+                return;
+            }
+            if (schemaInfo.isPresent()) {
+                log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
+            } else {
+                log.error("output schema is not present!");
+            }
+        });
+
+        submitFunction(
+                Runtime.JAVA,
+                inputTopic,
+                outputTopic,
+                functionName,
+                null,
+                AvroSchemaTestFunction.class.getName(),
+                Schema.AVRO(AvroTestObject.class));
+        log.info("pulsar submitFunction");
+
+        getFunctionInfoSuccess(functionName);
+
+        AvroSchemaTestFunction function = new AvroSchemaTestFunction();
+        Set<Object> expectedSet = new HashSet<>();
+
+        log.info("test-avro-schema producer connected: " + producer.isConnected());
+        for (int i = 0 ; i < numMessages ; i++) {
+            AvroTestObject inputObject = new AvroTestObject();
+            inputObject.setBaseValue(i);
+            MessageId messageId = producer.send(inputObject);
+            log.info("test-avro-schema messageId: {}", messageId.toString());
+            expectedSet.add(function.process(inputObject, null));
+            log.info("test-avro-schema expectedSet size: {}", expectedSet.size());
+        }
+        getFunctionStatus(functionName, numMessages, false);
+        log.info("test-avro-schema producer send message finish");
+
+        CompletableFuture<Optional<SchemaInfo>> outputSchemaFuture2 =
+                ((PulsarClientImpl) pulsarClient).getSchema(outputTopic);
+        outputSchemaFuture2.whenComplete((schemaInfo, throwable) -> {
+            if (throwable != null) {
+                log.error("get output schemaInfo error", throwable);
+                throwable.printStackTrace();
+                return;
+            }
+            if (schemaInfo.isPresent()) {
+                log.info("outputSchemaInfo: {}", schemaInfo.get().toString());
+            } else {
+                log.error("output schema is not present!");
+            }
+        });
+
+        log.info("test-avro-schema consumer connected: " + consumer.isConnected());
+        for (int i = 0 ; i < numMessages ; i++) {
+            log.info("test-avro-schema consumer receive [{}] start", i);
+            Message<AvroTestObject> message = consumer.receive();
+            log.info("test-avro-schema consumer receive [{}] over", i);
+            AvroTestObject outputObject = message.getValue();
+            assertTrue(expectedSet.contains(outputObject));
+            expectedSet.remove(outputObject);
+            consumer.acknowledge(message);
+        }
+        log.info("test-avro-schema consumer receive message finish");
+
+        assertEquals(expectedSet.size(), 0);
+
+        deleteFunction(functionName);
+
+        getFunctionInfoNotFound(functionName);
+    }
+
     private  void testDebeziumMySqlConnect()
         throws Exception {