You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/05/29 02:46:26 UTC

[pulsar] branch master updated: Support using AutoProduceBytesSchema as the function output schema (#10716)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 10ed501  Support using AutoProduceBytesSchema as the function output schema (#10716)
10ed501 is described below

commit 10ed501d8a3bdb5f70d2dce831fa28cbab9aa623
Author: ran <ga...@126.com>
AuthorDate: Sat May 29 10:45:36 2021 +0800

    Support using AutoProduceBytesSchema as the function output schema (#10716)
    
    ### Motivation
    
    Currently, a function cannot use `AutoProduceBytesSchema` as its output schema.
    
    ### Modifications
    
    Allow users to set a function's `schemaType` configuration to `AUTO_PUBLISH`, for example `--schema-type AUTO_PUBLISH`.
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  23 +-
 .../pulsar/functions/instance/ContextImpl.java     |   8 +-
 .../pulsar/functions/source/TopicSchema.java       |   3 +
 .../functions/api/examples/MergeTopicFunction.java |  74 ++++++
 .../pulsar/functions/api/examples/pojo/Users.java  |  47 ++++
 .../integration/functions/PulsarFunctionsTest.java | 253 +++++++++++++++++++--
 .../functions/java/PulsarFunctionsJavaTest.java    |   5 +
 .../functions/utils/CommandGenerator.java          |  38 +++-
 8 files changed, 420 insertions(+), 31 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a50ab61..fe720a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.KeyValueSchema;
@@ -877,21 +878,15 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         }
         message.getValue();
 
-        Schema<?> readerSchema = message.getReaderSchema().get();
-        if (readerSchema instanceof KeyValueSchema
-                && ((KeyValueSchema<?, ?>) readerSchema)
-                .getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-            autoProducer.newMessage(
-                    Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
-                    .value(message.getData())
-                    .properties(message.getProperties())
-                    .send();
-        } else {
-            autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
-                    .properties(message.getProperties())
-                    .value(message.getData())
-                    .send();
+        TypedMessageBuilder<byte[]> messageBuilder = autoProducer
+                .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+                .value(message.getData())
+                .properties(message.getProperties());
+        if (message.getKeyBytes() != null) { // e.g. the key of a SEPARATED KeyValue message
+            messageBuilder.keyBytes(message.getKeyBytes());
+        }
+        messageBuilder.send();
+
         producer.close();
         consumer.close();
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 97aa538..eb9f541 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -456,7 +456,13 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     @Override
     public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
         MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
-        TypedMessageBuilder<O> typedMessageBuilder = getProducer(pulsarName, topicName, schema).newMessage();
+        Producer<O> producer = getProducer(pulsarName, topicName, schema);
+        // Build the message with the caller's schema when one is given (e.g. an AUTO_PRODUCE_BYTES
+        // schema bound to an input message's schema); otherwise fall back to the producer's plain
+        // newMessage().
+        TypedMessageBuilder<O> typedMessageBuilder = schema == null
+                ? producer.newMessage()
+                : producer.newMessage(schema);
         messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
         return messageBuilder;
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index bc29029..7ed6b6c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -169,6 +169,9 @@ public class TopicSchema {
         case PROTOBUF:
             return ProtobufSchema.ofGenericClass(clazz, new HashMap<>());
 
+        case AUTO_PUBLISH:
+            // AUTO_PUBLISH (e.g. --schema-type AUTO_PUBLISH): raw bytes, no fixed schema chosen here.
+            return (Schema<T>) Schema.AUTO_PRODUCE_BYTES();
         default:
             throw new RuntimeException("Unsupported schema type" + type);
         }
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java
new file mode 100644
index 0000000..ea17846
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/MergeTopicFunction.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example function that merges messages written with different schemas into one output topic.
+ *
+ * <p>Each input payload is republished unchanged with {@code AUTO_PRODUCE_BYTES} bound to the
+ * input message's reader schema, along with its key and event time (when set), its properties,
+ * and {@code __original_topic}, {@code __publish_time}, {@code __sequence_id} and
+ * {@code __producer_name} properties. {@link #process} itself always returns {@code null}.
+ */
+@Slf4j
+public class MergeTopicFunction implements Function<GenericRecord, byte[]> {
+
+    @Override
+    public byte[] process(GenericRecord genericRecord, Context context) throws Exception {
+        if (!context.getCurrentRecord().getMessage().isPresent()) {
+            log.warn("context current record message is not present.");
+            return null;
+        }
+        Message<?> source = context.getCurrentRecord().getMessage().get();
+        if (!source.getReaderSchema().isPresent()) {
+            log.warn("The reader schema is null.");
+            return null;
+        }
+        log.info("process message with reader schema {}", source.getReaderSchema().get());
+
+        // Publish the raw bytes under the input's own schema rather than a fixed output schema.
+        TypedMessageBuilder<byte[]> out = context.newOutputMessage(
+                context.getOutputTopic(), Schema.AUTO_PRODUCE_BYTES(source.getReaderSchema().get()));
+        out.value(source.getData())
+                .property("__original_topic", source.getTopicName())
+                .property("__publish_time", String.valueOf(source.getPublishTime()))
+                .property("__sequence_id", String.valueOf(source.getSequenceId()))
+                .property("__producer_name", source.getProducerName());
+        if (source.getKeyBytes() != null) {
+            out.keyBytes(source.getKeyBytes());
+        }
+        if (source.getEventTime() > 0) {
+            out.eventTime(source.getEventTime());
+        }
+        if (!source.getProperties().isEmpty()) {
+            out.properties(source.getProperties());
+        }
+        out.send();
+        log.info("send message successfully");
+        return null;
+    }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java
new file mode 100644
index 0000000..1522513
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/pojo/Users.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples.pojo;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** POJOs for two versions of a user record, used to test multi-version schemas. */
+public class Users {
+
+    /** Version 1 of the user record: {@code name} and {@code age}. */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class UserV1 {
+        private String name;
+        private Integer age;
+    }
+
+    /** Version 2 of the user record: the {@code UserV1} fields plus {@code phone}. */
+    @Data
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class UserV2 {
+        private String name;
+        private Integer age;
+        private String phone;
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 1d51c2f..5b8116e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -23,6 +23,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.gson.Gson;
 import java.time.Duration;
 import java.util.Collections;
@@ -32,9 +36,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.BatcherBuilder;
@@ -43,17 +50,28 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
 import org.apache.pulsar.functions.api.examples.AvroSchemaTestFunction;
+import org.apache.pulsar.functions.api.examples.MergeTopicFunction;
 import org.apache.pulsar.functions.api.examples.pojo.AvroTestObject;
+import org.apache.pulsar.functions.api.examples.pojo.Users;
 import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
@@ -71,6 +89,8 @@ import org.testng.annotations.Test;
 @Slf4j
 public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
+    private final ObjectMapper objectMapper = new ObjectMapper(); // builds custom-schema-inputs JSON
+
     public PulsarFunctionsTest(FunctionRuntimeType functionRuntimeType) {
         super(functionRuntimeType);
     }
@@ -534,7 +554,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                         null,
                         PUBLISH_JAVA_CLASS,
                         schema,
-                        Collections.singletonMap("publish-topic", outputTopicName));
+                        Collections.singletonMap("publish-topic", outputTopicName),
+                        null, null, null);
                 break;
             case PYTHON:
                 submitFunction(
@@ -545,7 +566,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                         PUBLISH_FUNCTION_PYTHON_FILE,
                         PUBLISH_PYTHON_CLASS,
                         schema,
-                        Collections.singletonMap("publish-topic", outputTopicName));
+                        Collections.singletonMap("publish-topic", outputTopicName),
+                        null, null, null);
                 break;
             case GO:
                 submitFunction(
@@ -556,7 +578,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                         PUBLISH_FUNCTION_GO_FILE,
                         null,
                         schema,
-                        Collections.singletonMap("publish-topic", outputTopicName));
+                        Collections.singletonMap("publish-topic", outputTopicName),
+                        null, null, null);
         }
 
         // get function info
@@ -765,20 +788,24 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                            String functionFile,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
-        submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema, null);
+        submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass,
+                inputTopicSchema, null, null, null, null);
     }
 
     private <T> void submitFunction(Runtime runtime,
-                                           String inputTopicName,
-                                           String outputTopicName,
-                                           String functionName,
-                                           String functionFile,
-                                           String functionClass,
-                                           Schema<T> inputTopicSchema,
-                                           Map<String, String> userConfigs) throws Exception {
+                                    String inputTopicName,
+                                    String outputTopicName,
+                                    String functionName,
+                                    String functionFile,
+                                    String functionClass,
+                                    Schema<T> inputTopicSchema,
+                                    Map<String, String> userConfigs,
+                                    String customSchemaInputs,
+                                    String outputSchemaType,
+                                    SubscriptionInitialPosition subscriptionInitialPosition) throws Exception {
 
         CommandGenerator generator;
-        log.info("------- INPUT TOPIC: '{}'", inputTopicName);
+        log.info("------- INPUT TOPIC: '{}', customSchemaInputs: {}", inputTopicName, customSchemaInputs);
         if (inputTopicName.endsWith(".*")) {
             log.info("----- CREATING TOPIC PATTERN FUNCTION --- ");
             generator = CommandGenerator.createTopicPatternGenerator(inputTopicName, functionClass);
@@ -791,6 +818,15 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         if (userConfigs != null) {
             generator.setUserConfig(userConfigs);
         }
+        if (customSchemaInputs != null) {
+            generator.setCustomSchemaInputs(customSchemaInputs);
+        }
+        if (outputSchemaType != null) {
+            generator.setSchemaType(outputSchemaType);
+        }
+        if (subscriptionInitialPosition != null) {
+            generator.setSubscriptionInitialPosition(subscriptionInitialPosition);
+        }
         String command = "";
 
         switch (runtime){
@@ -814,7 +850,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
 
-        ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+        if (StringUtils.isNotEmpty(inputTopicName)) {
+            ensureSubscriptionCreated(
+                    inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
+        }
     }
 
     private void updateFunctionParallelism(String functionName, int parallelism) throws Exception {
@@ -1520,4 +1559,192 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         client.close();
     }
 
+    protected void testMergeFunction() throws Exception {
+        log.info("start merge function test ...");
+
+        final String namespace = "public/ns-merge-" + randomName(8);
+        @Cleanup
+        PulsarAdmin admin = getPulsarAdmin();
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+        log.info("namespace {} SchemaCompatibilityStrategy is {}",
+                namespace, admin.namespaces().getSchemaCompatibilityStrategy(namespace));
+
+        @Cleanup
+        PulsarClient client = getPulsarClient();
+        // One seeded topic per schema variant; each counter tracks messages still expected from it.
+        final int messagesPerTopic = 10;
+        ObjectNode inputSpecs = objectMapper.createObjectNode();
+        Map<String, AtomicInteger> remaining = new ConcurrentHashMap<>();
+        prepareDataForMergeFunction(namespace, client, inputSpecs, messagesPerTopic, remaining);
+
+        final String outputTopic = namespace + "/test-merge-output";
+        // Read the merged output topic from the beginning with AUTO_CONSUME.
+        @Cleanup
+        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
+                .topic(outputTopic)
+                .subscriptionName("test-merge-fn")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        final String functionName = "test-merge-fn-" + randomName(8);
+        submitFunction(
+                Runtime.JAVA,
+                "",                                  // no --inputs; topics come from customSchemaInputs
+                outputTopic,
+                functionName,
+                null,                                // functionFile
+                MergeTopicFunction.class.getName(),
+                null,                                // inputTopicSchema
+                null,                                // userConfigs
+                inputSpecs.toString(),               // customSchemaInputs
+                SchemaType.AUTO_PUBLISH.name().toUpperCase(),
+                SubscriptionInitialPosition.Earliest);
+
+        getFunctionInfoSuccess(functionName);
+        int expectedMessages = remaining.size() * messagesPerTopic;
+        getFunctionStatus(functionName, expectedMessages, true);
+
+        // Drain the output topic until no message arrives for 30 seconds.
+        while (true) {
+            Message<GenericRecord> message = consumer.receive(30, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            String baseTopic = message.getProperty("baseTopic");
+            GenericRecord value = message.getValue();
+            log.info("receive msg baseTopic: {}, schemaType: {}, nativeClass: {}, nativeObject: {}",
+                    baseTopic,
+                    value.getSchemaType(),
+                    value.getNativeObject().getClass(), value.getNativeObject());
+            checkSchemaForAutoSchema(message, baseTopic);
+            remaining.get(baseTopic).decrementAndGet();
+            consumer.acknowledge(message);
+        }
+
+        // Every per-topic counter must be back to zero.
+        remaining.forEach((topic, left) ->
+                assertEquals(left.get(), 0,
+                        "topic " + topic + " left message cnt is not 0."));
+
+        deleteFunction(functionName);
+        getFunctionInfoNotFound(functionName);
+
+        log.info("finish merge function test.");
+    }
+
+    /** Publishes {@code messagePerTopic} messages to each of seven topics, one per schema variant. */
+    private void prepareDataForMergeFunction(String ns,
+                                             PulsarClient pulsarClient,
+                                             ObjectNode inputSpecNode,
+                                             int messagePerTopic,
+                                             Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
+        generateDataByDifferentSchema(ns, "merge-schema-bytes", pulsarClient, Schema.BYTES,
+                "bytes schema test".getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-string", pulsarClient,
+                Schema.STRING, "string schema test", messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-json-userv1", pulsarClient,
+                Schema.JSON(Users.UserV1.class), new Users.UserV1("ran", 33),
+                messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-json-userv2", pulsarClient,
+                Schema.JSON(Users.UserV2.class), new Users.UserV2("tang", 18, "123123123"),
+                messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-avro-userv2", pulsarClient,
+                Schema.AVRO(Users.UserV2.class), new Users.UserV2("tang", 20, "456456456"),
+                messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-k-int-v-json-userv1-separate", pulsarClient,
+                Schema.KeyValue(Schema.INT32, Schema.JSON(Users.UserV1.class), KeyValueEncodingType.SEPARATED),
+                new KeyValue<>(100, new Users.UserV1("ran", 40)), messagePerTopic, inputSpecNode, topicMsgCntMap);
+        generateDataByDifferentSchema(ns, "merge-schema-k-json-userv2-v-json-userv1-inline", pulsarClient,
+                Schema.KeyValue(Schema.JSON(Users.UserV2.class), Schema.JSON(Users.UserV1.class),
+                        KeyValueEncodingType.INLINE),
+                new KeyValue<>(new Users.UserV2("tang", 20, "789789789"), new Users.UserV1("ran", 40)),
+                messagePerTopic, inputSpecNode, topicMsgCntMap);
+    }
+
+    /**
+     * Publishes {@code messageCnt} copies of {@code data} with {@code schema} to {@code ns/baseTopic},
+     * each tagged with a {@code baseTopic} property; then registers the topic in {@code inputSpecNode}
+     * with an AUTO_CONSUME consumer config and records the expected count in {@code topicMsgCntMap}.
+     */
+    private <T> void generateDataByDifferentSchema(String ns, String baseTopic, PulsarClient pulsarClient,
+            Schema<T> schema, T data, int messageCnt, ObjectNode inputSpecNode,
+            Map<String, AtomicInteger> topicMsgCntMap) throws PulsarClientException {
+        String topic = ns + "/" + baseTopic;
+        // @Cleanup closes the producer even if a send fails part-way through.
+        @Cleanup
+        Producer<T> producer = pulsarClient.newProducer(schema)
+                .topic(topic).create();
+        for (int i = 0; i < messageCnt; i++) {
+            producer.newMessage().value(data).property("baseTopic", baseTopic).send();
+        }
+        ObjectNode confNode = objectMapper.createObjectNode();
+        confNode.put("schemaType", SchemaType.AUTO_CONSUME.name().toUpperCase());
+        inputSpecNode.put(topic, confNode.toString());
+        topicMsgCntMap.put(baseTopic, new AtomicInteger(messageCnt));
+        log.info("[merge-fn] generate {} messages for schema {}", messageCnt, schema.getSchemaInfo());
+    }
+
+    /** Asserts the merged message has a reader schema and the payload generated for {@code baseTopic}. */
+    private void checkSchemaForAutoSchema(Message<GenericRecord> message, String baseTopic) {
+        assertTrue(message.getReaderSchema().isPresent(),
+                "Failed to get reader schema for auto consume multiple schema topic.");
+        Object nativeObject = message.getValue().getNativeObject();
+        JsonNode jsonNode;
+        KeyValue<?, ?> kv;
+        switch (baseTopic) {
+            case "merge-schema-bytes":
+                assertEquals(new String((byte[]) nativeObject, java.nio.charset.StandardCharsets.UTF_8),
+                        "bytes schema test");
+                break;
+            case "merge-schema-string":
+                assertEquals((String) nativeObject, "string schema test");
+                break;
+            case "merge-schema-json-userv1":
+                jsonNode = (JsonNode) nativeObject;
+                assertEquals(jsonNode.get("name").textValue(), "ran");
+                assertEquals(jsonNode.get("age").intValue(), 33);
+                break;
+            case "merge-schema-json-userv2":
+                jsonNode = (JsonNode) nativeObject;
+                assertEquals(jsonNode.get("name").textValue(), "tang");
+                assertEquals(jsonNode.get("age").intValue(), 18);
+                assertEquals(jsonNode.get("phone").textValue(), "123123123");
+                break;
+            case "merge-schema-avro-userv2":
+                org.apache.avro.generic.GenericRecord avro = (org.apache.avro.generic.GenericRecord) nativeObject;
+                assertEquals(avro.get("name").toString(), "tang");
+                assertEquals(avro.get("age"), 20);
+                assertEquals(avro.get("phone").toString(), "456456456");
+                break;
+            case "merge-schema-k-int-v-json-userv1-separate":
+                kv = (KeyValue<?, ?>) nativeObject;
+                assertEquals(kv.getKey(), 100);
+                jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+                assertEquals(jsonNode.get("name").textValue(), "ran");
+                assertEquals(jsonNode.get("age").intValue(), 40);
+                break;
+            case "merge-schema-k-json-userv2-v-json-userv1-inline":
+                kv = (KeyValue<?, ?>) nativeObject;
+                jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+                assertEquals(jsonNode.get("name").textValue(), "tang");
+                assertEquals(jsonNode.get("age").intValue(), 20);
+                assertEquals(jsonNode.get("phone").textValue(), "789789789");
+                jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+                assertEquals(jsonNode.get("name").textValue(), "ran");
+                assertEquals(jsonNode.get("age").intValue(), 40);
+                break;
+            default:
+                fail("Unexpected baseTopic property on merged message: " + baseTopic);
+        }
+    }
+
+    private PulsarClient getPulsarClient() throws PulsarClientException { // new client per call; caller must close
+        return PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
+    }
+
+    private PulsarAdmin getPulsarAdmin() throws PulsarClientException { // new admin per call; caller must close
+        return PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+    }
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
index 13134fa..466942d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/java/PulsarFunctionsJavaTest.java
@@ -156,4 +156,9 @@ public class PulsarFunctionsJavaTest extends PulsarFunctionsTest {
        testWindowFunction("sliding", EXPECTED_RESULTS);
    }
 
+    @Test(groups = {"java_function", "function"})
+    public void testMergeFunctionTest() throws Exception {
+        testMergeFunction(); // merges topics with different schemas via --schema-type AUTO_PUBLISH
+    }
+
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
index 65b54ee..4edf462 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/utils/CommandGenerator.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
 @Getter
@@ -57,6 +59,9 @@ public class CommandGenerator {
     private Long windowLengthDurationMs;
     private Integer slidingIntervalCount;
     private Long slidingIntervalDurationMs;
+    private String customSchemaInputs; // passed single-quoted to --custom-schema-inputs
+    private String schemaType; // passed to --schema-type, e.g. AUTO_PUBLISH
+    private SubscriptionInitialPosition subscriptionInitialPosition; // passed by name to --subs-position
 
     private Map<String, String> userConfig = new HashMap<>();
     public static final String JAVAJAR = "/pulsar/examples/java-test-functions.jar";
@@ -97,12 +102,21 @@ public class CommandGenerator {
         if(runtime != Runtime.GO){
             commandBuilder.append(" --className " + functionClassName);
         }
-        if (sourceTopic != null) {
+        if (StringUtils.isNotEmpty(sourceTopic)) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
         if (sinkTopic != null) {
             commandBuilder.append(" --output " + sinkTopic);
         }
+        if (customSchemaInputs != null) {
+            commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+        }
+        if (schemaType != null) {
+            commandBuilder.append(" --schema-type " + schemaType);
+        }
+        if (subscriptionInitialPosition != null) {
+            commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+        }
         switch (runtime){
             case JAVA:
                 commandBuilder.append(" --jar " + JAVAJAR);
@@ -148,7 +162,7 @@ public class CommandGenerator {
         if (runtime != Runtime.GO){
             commandBuilder.append(" --className " + functionClassName);
         }
-        if (sourceTopic != null) {
+        if (StringUtils.isNotEmpty(sourceTopic)) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
         if (sourceTopicPattern != null) {
@@ -190,6 +204,15 @@ public class CommandGenerator {
         if (slidingIntervalDurationMs != null)  {
             commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
         }
+        if (customSchemaInputs != null) {
+            commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+        }
+        if (schemaType != null) {
+            commandBuilder.append(" --schema-type " + schemaType);
+        }
+        if (subscriptionInitialPosition != null) {
+            commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+        }
 
         switch (runtime){
             case JAVA:
@@ -240,7 +263,7 @@ public class CommandGenerator {
         if (functionClassName != null) {
             commandBuilder.append(" --className " + functionClassName);
         }
-        if (sourceTopic != null) {
+        if (StringUtils.isNotEmpty(sourceTopic)) {
             commandBuilder.append(" --inputs " + sourceTopic);
         }
         if (customSereSourceTopics != null && !customSereSourceTopics.isEmpty()) {
@@ -279,6 +302,15 @@ public class CommandGenerator {
         if (slidingIntervalDurationMs != null)  {
             commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
         }
+        if (customSchemaInputs != null) {
+            commandBuilder.append(" --custom-schema-inputs \'" + customSchemaInputs + "\'");
+        }
+        if (schemaType != null) {
+            commandBuilder.append(" --schema-type " + schemaType);
+        }
+        if (subscriptionInitialPosition != null) {
+            commandBuilder.append(" --subs-position " + subscriptionInitialPosition.name());
+        }
 
         if (codeFile != null) {
             switch (runtime) {