You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 06:28:39 UTC

[pulsar] branch master updated: Independent schema is set for each consumer generated by topic (#6356)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8003d08  Independent schema is set for each consumer generated by topic (#6356)
8003d08 is described below

commit 8003d08e5ca325867d2e825921f18ddda8d4e1d4
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 6 14:28:30 2020 +0800

    Independent schema is set for each consumer generated by topic (#6356)
    
    ### Motivation
    
    Master Issue: #5454
    
    When one consumer subscribes to multiple topics, the SchemaInfoProvider set via setSchemaInfoProvider() on the shared schema is overwritten by the one for the consumer generated for the last topic.
    
    ### Modification
    Clone the schema for each consumer generated per topic.
    ### Verifying this change
    Add SchemaTest to cover it.
---
 .../schema/JsonSchemaCompatibilityCheckTest.java   |   5 +
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 136 +++++++++++++++++++++
 .../pulsar/schema/{compatibility => }/Schemas.java |   2 +-
 .../SchemaCompatibilityCheckTest.java              |  22 ++--
 .../java/org/apache/pulsar/client/api/Schema.java  |   9 +-
 .../client/kafka/compat/PulsarKafkaSchema.java     |   5 +
 .../client/kafka/compat/PulsarKafkaSchema.java     |   5 +
 .../client/impl/MultiTopicsConsumerImpl.java       |   7 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |  21 ++--
 .../pulsar/client/impl/schema/AbstractSchema.java  |   5 +
 .../client/impl/schema/AutoConsumeSchema.java      |  14 +++
 .../client/impl/schema/AutoProduceBytesSchema.java |   5 +
 .../pulsar/client/impl/schema/AvroSchema.java      |  10 ++
 .../pulsar/client/impl/schema/KeyValueSchema.java  |   5 +
 .../client/impl/schema/KeyValueSchemaInfo.java     |   5 +
 .../impl/schema/generic/GenericAvroSchema.java     |  10 ++
 .../impl/schema/generic/GenericSchemaImpl.java     |   1 -
 .../pulsar/functions/source/SerDeSchema.java       |   4 +
 18 files changed, 244 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 09a0c44..7befab0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -124,5 +124,10 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
             info.setSchema(mapper.writeValueAsBytes(schema));
             return new OldJSONSchema<>(info, pojo, mapper);
         }
+
+        @Override
+        public Schema<T> clone() {
+            return this;
+        }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
new file mode 100644
index 0000000..eba8ec0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.junit.Assert.assertEquals;
+
+public class SchemaTest extends MockedPulsarServiceBaseTest {
+
+    private final static String CLUSTER_NAME = "test";
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster(CLUSTER_NAME, new ClusterData(pulsar.getBrokerServiceUrl()));
+        TenantInfo tenantInfo = new TenantInfo();
+        tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME));
+        admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMultiTopicSetSchemaProvider() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topicOne = "test-multi-version-schema-one";
+        final String topicTwo = "test-multi-version-schema-two";
+        final String fqtnOne = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicOne
+        ).toString();
+
+        final String fqtnTwo = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topicTwo
+        ).toString();
+
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        admin.topics().createPartitionedTopic(fqtnOne, 3);
+        admin.topics().createPartitionedTopic(fqtnTwo, 3);
+
+        admin.schemas().createSchema(fqtnOne, Schema.AVRO(
+                SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonOne.class).build()).getSchemaInfo());
+
+        admin.schemas().createSchema(fqtnOne, Schema.AVRO(
+                SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
+
+        admin.schemas().createSchema(fqtnTwo, Schema.AVRO(
+                SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
+
+        Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO(
+                SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()))
+                .topic(fqtnOne)
+                .create();
+
+        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
+        personTwo.setId(1);
+        personTwo.setName("Tom");
+
+
+        Consumer<Schemas.PersonTwo> consumer = pulsarClient.newConsumer(Schema.AVRO(
+                SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+                        (false).withSupportSchemaVersioning(true).
+                        withPojo(Schemas.PersonTwo.class).build()))
+                .subscriptionName("test")
+                .topic(fqtnOne, fqtnTwo)
+                .subscribe();
+
+        producer.send(personTwo);
+
+        Schemas.PersonTwo personConsume = consumer.receive().getValue();
+        assertEquals("Tom", personConsume.getName());
+        assertEquals(1, personConsume.getId());
+
+        producer.close();
+        consumer.close();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
similarity index 96%
rename from pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
index 0978547..df02574 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.schema.compatibility;
+package org.apache.pulsar.schema;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 4903a68..f94d28b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.schema.Schemas;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -46,7 +47,6 @@ import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 
 @Slf4j
@@ -141,7 +141,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
 
 
             Schemas.PersonOne personOne = new Schemas.PersonOne();
-            personOne.id = 1;
+            personOne.setId(1);
 
             producerOne.send(personOne);
             Message<Schemas.PersonThree> message = null;
@@ -162,16 +162,16 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
                     .create();
 
             Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
-            personTwo.id = 1;
-            personTwo.name = "Jerry";
+            personTwo.setId(1);
+            personTwo.setName("Jerry");
             producerTwo.send(personTwo);
 
             message = consumerThree.receive();
             Schemas.PersonThree personThree = message.getValue();
             consumerThree.acknowledge(message);
 
-            assertEquals(personThree.id, 1);
-            assertEquals(personThree.name, "Jerry");
+            assertEquals(personThree.getId(), 1);
+            assertEquals(personThree.getName(), "Jerry");
 
             consumerThree.close();
             producerOne.close();
@@ -270,8 +270,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         Schemas.PersonTwo personTwo = message.getValue();
         consumerTwo.acknowledge(message);
 
-        assertEquals(personTwo.id, 2);
-        assertEquals(personTwo.name, "Lucy");
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
 
         producer.close();
         consumerTwo.close();
@@ -287,8 +287,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         personTwo = message.getValue();
         consumerTwo.acknowledge(message);
 
-        assertEquals(personTwo.id, 2);
-        assertEquals(personTwo.name, "Lucy");
+        assertEquals(personTwo.getId(), 2);
+        assertEquals(personTwo.getName(), "Lucy");
 
         consumerTwo.close();
         producer.close();
@@ -338,7 +338,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
         Message<Schemas.PersonOne> message = consumerOne.receive();
         personOne = message.getValue();
 
-        assertEquals(10, personOne.id);
+        assertEquals(10, personOne.getId());
 
         consumerOne.close();
         producerOne.close();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index bb0e85a..0c871b3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * Message schema definition.
  */
-public interface Schema<T> {
+public interface Schema<T> extends Cloneable{
 
     /**
      * Check if the message is a valid object for this schema.
@@ -137,6 +137,13 @@ public interface Schema<T> {
     }
 
     /**
+     * Duplicates the schema.
+     *
+     * @return The duplicated schema.
+     */
+    Schema<T> clone();
+
+    /**
      * Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through.
      */
     Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
index 807f482..aef6dd1 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public class PulsarKafkaSchema<T> implements Schema<T> {
     public SchemaInfo getSchemaInfo() {
         return Schema.BYTES.getSchemaInfo();
     }
+
+    @Override
+    public Schema<T> clone() {
+        return this;
+    }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
index 807f482..aef6dd1 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public class PulsarKafkaSchema<T> implements Schema<T> {
     public SchemaInfo getSchemaInfo() {
         return Schema.BYTES.getSchemaInfo();
     }
+
+    @Override
+    public Schema<T> clone() {
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5c26095..970e134 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -800,16 +800,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
             boolean createIfDoesNotExist) {
-        client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
+        client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> {
             if (null == cause) {
-                doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist);
+                doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
             } else {
                 subscribeResult.completeExceptionally(cause);
             }
         });
     }
 
-    private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+    private void doSubscribeTopicPartitions(Schema<T> schema,
+                                            CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
             boolean createIfDoesNotExist) {
         if (log.isDebugEnabled()) {
             log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2fe8201..27a158d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -335,7 +335,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
-            .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors));
+            .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
     }
 
     private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -448,7 +448,7 @@ public class PulsarClientImpl implements PulsarClient {
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
-            .thenCompose(ignored -> doCreateReaderAsync(conf, schema));
+            .thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone));
     }
 
     <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
@@ -768,8 +768,8 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     @SuppressWarnings("unchecked")
-    protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
-                                                                      Schema schema,
+    protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
+                                                                      Schema<T> schema,
                                                                       String topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
             final SchemaInfoProvider schemaInfoProvider;
@@ -779,11 +779,12 @@ public class PulsarClientImpl implements PulsarClient {
                 log.error("Failed to load schema info provider for topic {}", topicName, e);
                 return FutureUtil.failedFuture(e.getCause());
             }
-
+            schema = schema.clone();
             if (schema.requireFetchingSchemaInfo()) {
+                Schema finalSchema = schema;
                 return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
                     if (null == schemaInfo) {
-                        if (!(schema instanceof AutoConsumeSchema)) {
+                        if (!(finalSchema instanceof AutoConsumeSchema)) {
                             // no schema info is found
                             return FutureUtil.failedFuture(
                                     new PulsarClientException.NotFoundException(
@@ -792,18 +793,18 @@ public class PulsarClientImpl implements PulsarClient {
                     }
                     try {
                         log.info("Configuring schema for topic {} : {}", topicName, schemaInfo);
-                        schema.configureSchemaInfo(topicName, "topic", schemaInfo);
+                        finalSchema.configureSchemaInfo(topicName, "topic", schemaInfo);
                     } catch (RuntimeException re) {
                         return FutureUtil.failedFuture(re);
                     }
-                    schema.setSchemaInfoProvider(schemaInfoProvider);
-                    return CompletableFuture.completedFuture(null);
+                    finalSchema.setSchemaInfoProvider(schemaInfoProvider);
+                    return CompletableFuture.completedFuture(finalSchema);
                 });
             } else {
                 schema.setSchemaInfoProvider(schemaInfoProvider);
             }
         }
-        return CompletableFuture.completedFuture(null);
+        return CompletableFuture.completedFuture(schema);
     }
 
     //
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index f459d5c..1084328 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -61,4 +61,9 @@ abstract class AbstractSchema<T> implements Schema<T> {
         // ignore version by default (most of the primitive schema implementations ignore schema version)
         return decode(byteBuf);
     }
+
+    @Override
+    public Schema<T> clone() {
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 27e8e6e..049b0f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -132,6 +132,20 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         }
     }
 
+    @Override
+    public Schema<GenericRecord> clone() {
+        Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+        if (this.schema != null) {
+            schema.configureSchemaInfo(topicName, componentName, this.schema.getSchemaInfo());
+        } else {
+            schema.configureSchemaInfo(topicName, componentName, null);
+        }
+        if (schemaInfoProvider != null) {
+            schema.setSchemaInfoProvider(schemaInfoProvider);
+        }
+        return schema;
+    }
+
     private GenericSchema generateSchema(SchemaInfo schemaInfo) {
         if (schemaInfo.getType() != SchemaType.AVRO
             && schemaInfo.getType() != SchemaType.JSON) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index dd5193f..7578ffa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -95,4 +95,9 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
         return schema.getSchemaInfo();
     }
+
+    @Override
+    public Schema<byte[]> clone() {
+        return new AutoProduceBytesSchema<>(schema.clone());
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index f4d130f..fe801dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Conversions;
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaReader;
 import org.apache.pulsar.client.impl.schema.reader.AvroReader;
@@ -75,6 +76,15 @@ public class AvroSchema<T> extends StructSchema<T> {
         return true;
     }
 
+    @Override
+    public Schema<T> clone() {
+        Schema<T> schema = new AvroSchema<>(schemaInfo);
+        if (schemaInfoProvider != null) {
+            schema.setSchemaInfoProvider(schemaInfoProvider);
+        }
+        return schema;
+    }
+
     public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
         return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 0e173d2..b81a947 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -193,6 +193,11 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         }
     }
 
+    @Override
+    public Schema<KeyValue<K, V>> clone() {
+        return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType);
+    }
+
     private void configureKeyValueSchemaInfo() {
         this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
             keySchema, valueSchema, keyValueEncodingType
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 120f5a7..9573526 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -48,6 +48,11 @@ public final class KeyValueSchemaInfo {
         public SchemaInfo getSchemaInfo() {
             return BytesSchema.BYTES.getSchemaInfo();
         }
+
+        @Override
+        public Schema<SchemaInfo> clone() {
+            return this;
+        }
     };
 
     private static final String KEY_SCHEMA_NAME = "key.schema.name";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 91220b4..98e646e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -55,6 +55,16 @@ public class GenericAvroSchema extends GenericSchemaImpl {
     }
 
     @Override
+    public org.apache.pulsar.client.api.Schema<GenericRecord> clone() {
+        org.apache.pulsar.client.api.Schema<GenericRecord> schema =
+                GenericAvroSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema);
+        if (schemaInfoProvider != null) {
+            schema.setSchemaInfoProvider(schemaInfoProvider);
+        }
+        return schema;
+    }
+
+    @Override
     protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
          SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
          if (schemaInfo != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index f22c449..7d18e52 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -76,5 +76,4 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
                     + schemaInfo.getType() + "'");
         }
     }
-
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
index 8d4bf1f..c1adfcc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
@@ -49,4 +49,8 @@ public class SerDeSchema<T> implements Schema<T> {
         return null;
     }
 
+    @Override
+    public Schema<T> clone() {
+        return this;
+    }
 }