You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/06 06:28:39 UTC
[pulsar] branch master updated: Independent schema is set for each
consumer generated by topic (#6356)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8003d08 Independent schema is set for each consumer generated by topic (#6356)
8003d08 is described below
commit 8003d08e5ca325867d2e825921f18ddda8d4e1d4
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Mar 6 14:28:30 2020 +0800
Independent schema is set for each consumer generated by topic (#6356)
### Motivation
Master Issue: #5454
When one consumer subscribes to multiple topics, they all share a single schema instance, so the schema info provider set via setSchemaInfoProvider() is overwritten by the consumer generated for the last topic.
### Modification
Clone the schema for each consumer that is generated per topic, so that each consumer gets its own independent schema instance.
### Verifying this change
Added SchemaTest to cover this change.
---
.../schema/JsonSchemaCompatibilityCheckTest.java | 5 +
.../java/org/apache/pulsar/schema/SchemaTest.java | 136 +++++++++++++++++++++
.../pulsar/schema/{compatibility => }/Schemas.java | 2 +-
.../SchemaCompatibilityCheckTest.java | 22 ++--
.../java/org/apache/pulsar/client/api/Schema.java | 9 +-
.../client/kafka/compat/PulsarKafkaSchema.java | 5 +
.../client/kafka/compat/PulsarKafkaSchema.java | 5 +
.../client/impl/MultiTopicsConsumerImpl.java | 7 +-
.../pulsar/client/impl/PulsarClientImpl.java | 21 ++--
.../pulsar/client/impl/schema/AbstractSchema.java | 5 +
.../client/impl/schema/AutoConsumeSchema.java | 14 +++
.../client/impl/schema/AutoProduceBytesSchema.java | 5 +
.../pulsar/client/impl/schema/AvroSchema.java | 10 ++
.../pulsar/client/impl/schema/KeyValueSchema.java | 5 +
.../client/impl/schema/KeyValueSchemaInfo.java | 5 +
.../impl/schema/generic/GenericAvroSchema.java | 10 ++
.../impl/schema/generic/GenericSchemaImpl.java | 1 -
.../pulsar/functions/source/SerDeSchema.java | 4 +
18 files changed, 244 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 09a0c44..7befab0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -124,5 +124,10 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
info.setSchema(mapper.writeValueAsBytes(schema));
return new OldJSONSchema<>(info, pojo, mapper);
}
+
+ @Override
+ public Schema<T> clone() {
+ return this;
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
new file mode 100644
index 0000000..eba8ec0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+
+import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
+import static org.junit.Assert.assertEquals;
+
+public class SchemaTest extends MockedPulsarServiceBaseTest {
+
+ private final static String CLUSTER_NAME = "test";
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ super.internalSetup();
+
+ // Setup namespaces
+ admin.clusters().createCluster(CLUSTER_NAME, new ClusterData(pulsar.getBrokerServiceUrl()));
+ TenantInfo tenantInfo = new TenantInfo();
+ tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME));
+ admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo);
+ }
+
+ @AfterMethod
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testMultiTopicSetSchemaProvider() throws Exception {
+ final String tenant = PUBLIC_TENANT;
+ final String namespace = "test-namespace-" + randomName(16);
+ final String topicOne = "test-multi-version-schema-one";
+ final String topicTwo = "test-multi-version-schema-two";
+ final String fqtnOne = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topicOne
+ ).toString();
+
+ final String fqtnTwo = TopicName.get(
+ TopicDomain.persistent.value(),
+ tenant,
+ namespace,
+ topicTwo
+ ).toString();
+
+
+ admin.namespaces().createNamespace(
+ tenant + "/" + namespace,
+ Sets.newHashSet(CLUSTER_NAME)
+ );
+
+ admin.topics().createPartitionedTopic(fqtnOne, 3);
+ admin.topics().createPartitionedTopic(fqtnTwo, 3);
+
+ admin.schemas().createSchema(fqtnOne, Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonOne.class).build()).getSchemaInfo());
+
+ admin.schemas().createSchema(fqtnOne, Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
+
+ admin.schemas().createSchema(fqtnTwo, Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
+
+ Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .topic(fqtnOne)
+ .create();
+
+ Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
+ personTwo.setId(1);
+ personTwo.setName("Tom");
+
+
+ Consumer<Schemas.PersonTwo> consumer = pulsarClient.newConsumer(Schema.AVRO(
+ SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
+ (false).withSupportSchemaVersioning(true).
+ withPojo(Schemas.PersonTwo.class).build()))
+ .subscriptionName("test")
+ .topic(fqtnOne, fqtnTwo)
+ .subscribe();
+
+ producer.send(personTwo);
+
+ Schemas.PersonTwo personConsume = consumer.receive().getValue();
+ assertEquals("Tom", personConsume.getName());
+ assertEquals(1, personConsume.getId());
+
+ producer.close();
+ consumer.close();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
similarity index 96%
rename from pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
index 0978547..df02574 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.schema.compatibility;
+package org.apache.pulsar.schema;
import lombok.AllArgsConstructor;
import lombok.Data;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 4903a68..f94d28b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -46,7 +47,6 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
@Slf4j
@@ -141,7 +141,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
Schemas.PersonOne personOne = new Schemas.PersonOne();
- personOne.id = 1;
+ personOne.setId(1);
producerOne.send(personOne);
Message<Schemas.PersonThree> message = null;
@@ -162,16 +162,16 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
.create();
Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
- personTwo.id = 1;
- personTwo.name = "Jerry";
+ personTwo.setId(1);
+ personTwo.setName("Jerry");
producerTwo.send(personTwo);
message = consumerThree.receive();
Schemas.PersonThree personThree = message.getValue();
consumerThree.acknowledge(message);
- assertEquals(personThree.id, 1);
- assertEquals(personThree.name, "Jerry");
+ assertEquals(personThree.getId(), 1);
+ assertEquals(personThree.getName(), "Jerry");
consumerThree.close();
producerOne.close();
@@ -270,8 +270,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
Schemas.PersonTwo personTwo = message.getValue();
consumerTwo.acknowledge(message);
- assertEquals(personTwo.id, 2);
- assertEquals(personTwo.name, "Lucy");
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
producer.close();
consumerTwo.close();
@@ -287,8 +287,8 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
personTwo = message.getValue();
consumerTwo.acknowledge(message);
- assertEquals(personTwo.id, 2);
- assertEquals(personTwo.name, "Lucy");
+ assertEquals(personTwo.getId(), 2);
+ assertEquals(personTwo.getName(), "Lucy");
consumerTwo.close();
producer.close();
@@ -338,7 +338,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
Message<Schemas.PersonOne> message = consumerOne.receive();
personOne = message.getValue();
- assertEquals(10, personOne.id);
+ assertEquals(10, personOne.getId());
consumerOne.close();
producerOne.close();
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index bb0e85a..0c871b3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.schema.SchemaType;
/**
* Message schema definition.
*/
-public interface Schema<T> {
+public interface Schema<T> extends Cloneable{
/**
* Check if the message is a valid object for this schema.
@@ -137,6 +137,13 @@ public interface Schema<T> {
}
/**
+ * Duplicates the schema.
+ *
+ * @return The duplicated schema.
+ */
+ Schema<T> clone();
+
+ /**
* Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through.
*/
Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
index 807f482..aef6dd1 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public class PulsarKafkaSchema<T> implements Schema<T> {
public SchemaInfo getSchemaInfo() {
return Schema.BYTES.getSchemaInfo();
}
+
+ @Override
+ public Schema<T> clone() {
+ return this;
+ }
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
index 807f482..aef6dd1 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -74,4 +74,9 @@ public class PulsarKafkaSchema<T> implements Schema<T> {
public SchemaInfo getSchemaInfo() {
return Schema.BYTES.getSchemaInfo();
}
+
+ @Override
+ public Schema<T> clone() {
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 5c26095..970e134 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -800,16 +800,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
- client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
+ client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> {
if (null == cause) {
- doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist);
+ doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
} else {
subscribeResult.completeExceptionally(cause);
}
});
}
- private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+ private void doSubscribeTopicPartitions(Schema<T> schema,
+ CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 2fe8201..27a158d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -335,7 +335,7 @@ public class PulsarClientImpl implements PulsarClient {
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
- .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors));
+ .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
}
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -448,7 +448,7 @@ public class PulsarClientImpl implements PulsarClient {
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
- .thenCompose(ignored -> doCreateReaderAsync(conf, schema));
+ .thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone));
}
<T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
@@ -768,8 +768,8 @@ public class PulsarClientImpl implements PulsarClient {
}
@SuppressWarnings("unchecked")
- protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
- Schema schema,
+ protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
+ Schema<T> schema,
String topicName) {
if (schema != null && schema.supportSchemaVersioning()) {
final SchemaInfoProvider schemaInfoProvider;
@@ -779,11 +779,12 @@ public class PulsarClientImpl implements PulsarClient {
log.error("Failed to load schema info provider for topic {}", topicName, e);
return FutureUtil.failedFuture(e.getCause());
}
-
+ schema = schema.clone();
if (schema.requireFetchingSchemaInfo()) {
+ Schema finalSchema = schema;
return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
if (null == schemaInfo) {
- if (!(schema instanceof AutoConsumeSchema)) {
+ if (!(finalSchema instanceof AutoConsumeSchema)) {
// no schema info is found
return FutureUtil.failedFuture(
new PulsarClientException.NotFoundException(
@@ -792,18 +793,18 @@ public class PulsarClientImpl implements PulsarClient {
}
try {
log.info("Configuring schema for topic {} : {}", topicName, schemaInfo);
- schema.configureSchemaInfo(topicName, "topic", schemaInfo);
+ finalSchema.configureSchemaInfo(topicName, "topic", schemaInfo);
} catch (RuntimeException re) {
return FutureUtil.failedFuture(re);
}
- schema.setSchemaInfoProvider(schemaInfoProvider);
- return CompletableFuture.completedFuture(null);
+ finalSchema.setSchemaInfoProvider(schemaInfoProvider);
+ return CompletableFuture.completedFuture(finalSchema);
});
} else {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
}
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(schema);
}
//
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
index f459d5c..1084328 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java
@@ -61,4 +61,9 @@ abstract class AbstractSchema<T> implements Schema<T> {
// ignore version by default (most of the primitive schema implementations ignore schema version)
return decode(byteBuf);
}
+
+ @Override
+ public Schema<T> clone() {
+ return this;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 27e8e6e..049b0f5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -132,6 +132,20 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
}
+ @Override
+ public Schema<GenericRecord> clone() {
+ Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
+ if (this.schema != null) {
+ schema.configureSchemaInfo(topicName, componentName, this.schema.getSchemaInfo());
+ } else {
+ schema.configureSchemaInfo(topicName, componentName, null);
+ }
+ if (schemaInfoProvider != null) {
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ }
+ return schema;
+ }
+
private GenericSchema generateSchema(SchemaInfo schemaInfo) {
if (schemaInfo.getType() != SchemaType.AVRO
&& schemaInfo.getType() != SchemaType.JSON) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index dd5193f..7578ffa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -95,4 +95,9 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
return schema.getSchemaInfo();
}
+
+ @Override
+ public Schema<byte[]> clone() {
+ return new AutoProduceBytesSchema<>(schema.clone());
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index f4d130f..fe801dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
@@ -75,6 +76,15 @@ public class AvroSchema<T> extends StructSchema<T> {
return true;
}
+ @Override
+ public Schema<T> clone() {
+ Schema<T> schema = new AvroSchema<>(schemaInfo);
+ if (schemaInfoProvider != null) {
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ }
+ return schema;
+ }
+
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 0e173d2..b81a947 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -193,6 +193,11 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
}
}
+ @Override
+ public Schema<KeyValue<K, V>> clone() {
+ return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType);
+ }
+
private void configureKeyValueSchemaInfo() {
this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
keySchema, valueSchema, keyValueEncodingType
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 120f5a7..9573526 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -48,6 +48,11 @@ public final class KeyValueSchemaInfo {
public SchemaInfo getSchemaInfo() {
return BytesSchema.BYTES.getSchemaInfo();
}
+
+ @Override
+ public Schema<SchemaInfo> clone() {
+ return this;
+ }
};
private static final String KEY_SCHEMA_NAME = "key.schema.name";
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 91220b4..98e646e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -55,6 +55,16 @@ public class GenericAvroSchema extends GenericSchemaImpl {
}
@Override
+ public org.apache.pulsar.client.api.Schema<GenericRecord> clone() {
+ org.apache.pulsar.client.api.Schema<GenericRecord> schema =
+ GenericAvroSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema);
+ if (schemaInfoProvider != null) {
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ }
+ return schema;
+ }
+
+ @Override
protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index f22c449..7d18e52 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -76,5 +76,4 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
+ schemaInfo.getType() + "'");
}
}
-
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
index 8d4bf1f..c1adfcc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java
@@ -49,4 +49,8 @@ public class SerDeSchema<T> implements Schema<T> {
return null;
}
+ @Override
+ public Schema<T> clone() {
+ return this;
+ }
}