You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/04/28 04:04:50 UTC
[pulsar] branch master updated: Add the multi version schema
support (#3876)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d5ff082 Add the multi version schema support (#3876)
d5ff082 is described below
commit d5ff0828d222775d0562a1b5d589e3f231a1716f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sun Apr 28 12:04:45 2019 +0800
Add the multi version schema support (#3876)
### Motivation
Fix #3742
In order to decode a message correctly with an AVRO schema, we need to know which schema version the message was written with.
### Modification
- Introduced Schema Reader and Schema Writer for StructSchema.
- Reader is used to decode message
- Writer is used to encode message
- The implementations of StructSchema provide their own schema reader and writer implementations.
- Introduced a schema reader cache for caching the readers for different schema versions.
---
.../java/org/apache/pulsar/client/api/Schema.java | 18 +++-
.../pulsar/client/api/schema/GenericSchema.java | 2 +-
.../pulsar/client/api/schema/SchemaDefinition.java | 7 ++
.../client/api/schema/SchemaDefinitionBuilder.java | 15 ++-
...{GenericSchema.java => SchemaInfoProvider.java} | 27 ++++--
.../pulsar/client/api/schema/SchemaReader.java | 18 ++--
.../pulsar/client/api/schema/SchemaWriter.java | 18 ++--
.../client/internal/DefaultImplementation.java | 8 +-
.../apache/pulsar/common/schema/SchemaInfo.java | 2 +
.../org/apache/pulsar/client/impl/MessageImpl.java | 7 +-
.../client/impl/MultiTopicsConsumerImpl.java | 7 ++
.../pulsar/client/impl/PulsarClientImpl.java | 94 ++++++++++++-------
.../client/impl/schema/AutoConsumeSchema.java | 11 +++
.../pulsar/client/impl/schema/AvroSchema.java | 85 ++++++-----------
.../pulsar/client/impl/schema/JSONSchema.java | 46 +++------
.../pulsar/client/impl/schema/ProtobufSchema.java | 87 +++++++++--------
.../impl/schema/SchemaDefinitionBuilderImpl.java | 12 ++-
.../client/impl/schema/SchemaDefinitionImpl.java | 19 ++--
.../pulsar/client/impl/schema/StructSchema.java | 104 +++++++++++++++++----
...nericAvroSchema.java => GenericAvroReader.java} | 75 +++++++--------
.../impl/schema/generic/GenericAvroSchema.java | 64 ++++---------
...nericAvroSchema.java => GenericAvroWriter.java} | 51 ++--------
...nericJsonSchema.java => GenericJsonReader.java} | 46 ++++-----
.../impl/schema/generic/GenericJsonSchema.java | 43 ++++-----
...{SchemaProvider.java => GenericJsonWriter.java} | 32 ++++---
.../impl/schema/generic/GenericSchemaImpl.java | 32 ++-----
.../schema/generic/MultiVersionGenericSchema.java | 62 ------------
...er.java => MultiVersionSchemaInfoProvider.java} | 45 +++++----
.../client/impl/schema/reader/AvroReader.java | 58 ++++++++++++
.../client/impl/schema/reader/JsonReader.java | 49 ++++------
.../client/impl/schema/reader/ProtobufReader.java | 47 ++++------
.../client/impl/schema/writer/AvroWriter.java | 53 +++++++++++
.../client/impl/schema/writer/JsonWriter.java | 37 ++++----
.../ProtobufWriter.java} | 21 ++---
.../client/impl/schema/ProtobufSchemaTest.java | 21 ++---
.../client/impl/schema/SchemaBuilderTest.java | 2 +-
.../pulsar/client/impl/schema/SchemaTestUtils.java | 12 +++
.../schema/SupportVersioningAvroSchemaTest.java | 66 +++++++++++++
.../impl/schema/generic/GenericAvroReaderTest.java | 86 +++++++++++++++++
.../impl/schema/generic/GenericSchemaImplTest.java | 64 +++++++++++--
.../generic/MultiVersionGenericSchemaTest.java | 90 ------------------
...ava => MultiVersionSchemaInfoProviderTest.java} | 13 ++-
.../io/hbase/sink/HbaseGenericRecordSinkTest.java | 18 +++-
.../io/influxdb/InfluxDBGenericRecordSinkTest.java | 14 ++-
.../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 21 +++--
.../pulsar/io/solr/SolrGenericRecordSinkTest.java | 17 +++-
.../tests/integration/schema/SchemaTest.java | 56 ++++++++++-
.../pulsar/tests/integration/schema/Schemas.java | 18 ++++
48 files changed, 1031 insertions(+), 769 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 18e02bf..72de4fe 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -26,6 +26,7 @@ import java.util.Date;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -80,6 +81,9 @@ public interface Schema<T> {
return false;
}
+ default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ }
+
/**
* Decode a byte array into an object using the schema definition and deserializer implementation
*
@@ -183,7 +187,17 @@ public interface Schema<T> {
* @return a Schema instance
*/
static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(Class<T> clazz) {
- return DefaultImplementation.newProtobufSchema(clazz);
+ return DefaultImplementation.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
+ }
+
+ /**
+ * Create a Protobuf schema type with schema definition.
+ *
+ * @param schemaDefinition the definition of the schema
+ * @return a Schema instance
+ */
+ static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> PROTOBUF(SchemaDefinition<T> schemaDefinition) {
+ return DefaultImplementation.newProtobufSchema(schemaDefinition);
}
/**
@@ -300,7 +314,7 @@ public interface Schema<T> {
* @param schemaInfo schema info
* @return a generic schema instance
*/
- static GenericSchema generic(SchemaInfo schemaInfo) {
+ static GenericSchema<GenericRecord> generic(SchemaInfo schemaInfo) {
return DefaultImplementation.getGenericSchema(schemaInfo);
}
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
index 8b5137d..95e6939 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.Schema;
/**
* A schema that serializes and deserializes between {@link GenericRecord} and bytes.
*/
-public interface GenericSchema extends Schema<GenericRecord> {
+public interface GenericSchema<T extends GenericRecord> extends Schema<T> {
/**
* Returns the list of fields.
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
index daf90df..32fddec 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -61,4 +61,11 @@ public interface SchemaDefinition<T> {
* @return pojo schema
*/
public Class<T> getPojo();
+
+ /**
+ * Get supportSchemaVersioning schema definition
+ *
+ * @return the flag of supportSchemaVersioning
+ */
+ public boolean getSupportSchemaVersioning();
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
index 77bb363..8305f02 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -49,7 +49,7 @@ public interface SchemaDefinitionBuilder<T> {
* @param key property key
* @param value property value
*
- * @return record schema definition
+ * @return schema definition builder
*/
SchemaDefinitionBuilder<T> addProperty(String key, String value);
@@ -58,7 +58,7 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param pojo pojo schema definition
*
- * @return record schema definition
+ * @return schema definition builder
*/
SchemaDefinitionBuilder<T> withPojo(Class pojo);
@@ -67,11 +67,20 @@ public interface SchemaDefinitionBuilder<T> {
*
* @param jsonDefinition json schema definition
*
- * @return record schema definition
+ * @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);
/**
+ * Set whether the schema decodes messages by schema version.
+ *
+ * @param supportSchemaVersioning decode by version
+ *
+ * @return schema definition builder
+ */
+ SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning);
+
+ /**
* Build the schema definition.
*
* @return the schema definition.
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
similarity index 61%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
index 8b5137d..81e4f44 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
@@ -18,26 +18,33 @@
*/
package org.apache.pulsar.client.api.schema;
-import java.util.List;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
/**
- * A schema that serializes and deserializes between {@link GenericRecord} and bytes.
+ * Schema Provider.
*/
-public interface GenericSchema extends Schema<GenericRecord> {
+public interface SchemaInfoProvider {
/**
- * Returns the list of fields.
+ * Retrieve the schema info of a given <tt>schemaVersion</tt>.
*
- * @return the list of fields of generic record.
+ * @param schemaVersion schema version
+ * @return schema info of the provided <tt>schemaVersion</tt>
*/
- List<Field> getFields();
+ SchemaInfo getSchemaByVersion(byte[] schemaVersion);
/**
- * Create a builder to build {@link GenericRecord}.
+ * Retrieve the latest schema info.
*
- * @return generic record builder
+ * @return the latest schema
*/
- GenericRecordBuilder newRecordBuilder();
+ SchemaInfo getLatestSchema();
+
+ /**
+ * Retrieve the topic name.
+ *
+ * @return the topic name
+ */
+ public String getTopicName();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index 18778e1..0f33935 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -16,21 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.api.schema;
-import org.apache.pulsar.client.api.Schema;
-
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+public interface SchemaReader<T> {
/**
- * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
+ * Deserialize bytes into a pojo.
*
- * @param schemaVersion schema version
- * @return schema instance of the provided <tt>schemaVersion</tt>
+ * @param bytes the data
+ * @return the serialized object
*/
- Schema<T> getSchema(byte[] schemaVersion);
-
+ T read(byte[] bytes);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
similarity index 67%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
index 18778e1..a93739a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
@@ -16,21 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.api.schema;
-import org.apache.pulsar.client.api.Schema;
-
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+public interface SchemaWriter<T> {
/**
- * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
+ * serialize the message into bytes
*
- * @param schemaVersion schema version
- * @return schema instance of the provided <tt>schemaVersion</tt>
+ * @param message the message to encode
+ * @return the serialized bytes
*/
- Schema<T> getSchema(byte[] schemaVersion);
-
+ byte[] write(T message);
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index ccc0a15..da48774 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -214,10 +214,10 @@ public class DefaultImplementation {
.invoke(null,schemaDefinition));
}
- public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
+ public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(SchemaDefinition schemaDefinition) {
return catchExceptions(
- () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
- .invoke(null, clazz));
+ () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", SchemaDefinition.class)
+ .invoke(null, schemaDefinition));
}
public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
@@ -262,7 +262,7 @@ public class DefaultImplementation {
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}
- public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) {
+ public static GenericSchema<GenericRecord> getGenericSchema(SchemaInfo schemaInfo) {
return catchExceptions(
() -> (GenericSchema) getStaticMethod("org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl",
"of", SchemaInfo.class).invoke(null, schemaInfo));
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index abd9dea..87d0e15 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.Map;
import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
@@ -31,6 +32,7 @@ import lombok.experimental.Accessors;
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
+@Builder
public class SchemaInfo {
@EqualsAndHashCode.Exclude
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 6a38abb..6b9731b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -230,7 +230,7 @@ public class MessageImpl<T> implements Message<T> {
@Override
public byte[] getSchemaVersion() {
- if (msgMetadataBuilder.hasSchemaVersion()) {
+ if (msgMetadataBuilder != null && msgMetadataBuilder.hasSchemaVersion()) {
return msgMetadataBuilder.getSchemaVersion().toByteArray();
} else {
return null;
@@ -241,8 +241,9 @@ public class MessageImpl<T> implements Message<T> {
public T getValue() {
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
- if (schema.supportSchemaVersioning()) {
- return schema.decode(getData(), getSchemaVersion());
+ byte [] schemaVersion = getSchemaVersion();
+ if (schema.supportSchemaVersioning() && schemaVersion != null) {
+ return schema.decode(getData(), schemaVersion);
} else {
return schema.decode(getData());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 8ebf1b9..312c1cd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -742,6 +742,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
List<CompletableFuture<Consumer<T>>> futureList;
+ try {
+ client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
+ } catch (Throwable t) {
+ subscribeResult.completeExceptionally(t);
+ return;
+ }
+
if (numPartitions > 1) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bdd3345..3da7e14 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -22,6 +22,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.StringUtils.isBlank;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -52,6 +55,8 @@ import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -60,6 +65,7 @@ import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
@@ -98,6 +104,15 @@ public class PulsarClientImpl implements PulsarClient {
private final EventLoopGroup eventLoopGroup;
+ private final LoadingCache<String, SchemaInfoProvider> schemaProviderLoadingCache = CacheBuilder.newBuilder().maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, SchemaInfoProvider>() {
+
+ @Override
+ public SchemaInfoProvider load(String topicName) {
+ return newSchemaProvider(topicName);
+ }
+ });
+
public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, getEventLoopGroup(conf));
}
@@ -291,24 +306,12 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- if (schema instanceof AutoConsumeSchema) {
- AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
- return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
- .thenCompose(schemaInfoOptional -> {
- if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
- GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
- log.info("Auto detected schema for topic {} : {}",
- conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoConsumeSchema.setSchema(genericSchema);
- return doSingleTopicSubscribeAsync(conf, schema, interceptors);
- } else {
- return FutureUtil.failedFuture(
- new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
- }
- });
- } else {
- return doSingleTopicSubscribeAsync(conf, schema, interceptors);
+ try {
+ preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic());
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
}
+ return doSingleTopicSubscribeAsync(conf, schema, interceptors);
}
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -423,24 +426,12 @@ public class PulsarClientImpl implements PulsarClient {
}
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
- if (schema instanceof AutoConsumeSchema) {
- AutoConsumeSchema autoConsumeSchema = (AutoConsumeSchema) schema;
- return lookup.getSchema(TopicName.get(conf.getTopicName()))
- .thenCompose(schemaInfoOptional -> {
- if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
- GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
- log.info("Auto detected schema for topic {} : {}",
- conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoConsumeSchema.setSchema(genericSchema);
- return doCreateReaderAsync(conf, schema);
- } else {
- return FutureUtil.failedFuture(
- new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
- }
- });
- } else {
- return doCreateReaderAsync(conf, schema);
+ try {
+ preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName());
+ } catch (Throwable t) {
+ return FutureUtil.failedFuture(t);
}
+ return doCreateReaderAsync(conf, schema);
}
<T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
if (state.get() != State.Open) {
@@ -737,4 +728,39 @@ public class PulsarClientImpl implements PulsarClient {
return null;
}
}
+
+ private SchemaInfoProvider newSchemaProvider(String topicName) {
+ return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
+ }
+
+ private LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
+ return schemaProviderLoadingCache;
+ }
+
+ protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable {
+ if (schema != null && schema.supportSchemaVersioning()) {
+ SchemaInfoProvider schemaInfoProvider = null;
+ try {
+ schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+ } catch (ExecutionException e) {
+ log.error("Failed to load schema info provider for topic {}", topicName, e);
+ throw e.getCause();
+ }
+
+ if (schema instanceof AutoConsumeSchema) {
+ SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
+ if (schemaInfo.getType() != SchemaType.AVRO){
+ throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
+
+ }
+ GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());
+ log.info("Auto detected schema for topic {} : {}",
+ topicName, new String(schemaInfo.getSchema(), UTF_8));
+ ((AutoConsumeSchema) schema).setSchema(genericSchema);
+ }
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ }
+
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 6df185b..050716a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -49,6 +50,11 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
@Override
+ public boolean supportSchemaVersioning() {
+ return true;
+ }
+
+ @Override
public byte[] encode(GenericRecord message) {
ensureSchemaInitialized();
@@ -63,6 +69,11 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
@Override
+ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ }
+
+ @Override
public SchemaInfo getSchemaInfo() {
ensureSchemaInitialized();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index a2ce0d1..efe08d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -21,20 +21,16 @@ package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.reader.AvroReader;
+import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.util.Map;
/**
@@ -42,15 +38,9 @@ import java.util.Map;
*/
@Slf4j
public class AvroSchema<T> extends StructSchema<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);
- private ReflectDatumWriter<T> datumWriter;
- private ReflectDatumReader<T> reader;
- private BinaryEncoder encoder;
- private ByteArrayOutputStream byteArrayOutputStream;
-
- private static final ThreadLocal<BinaryDecoder> decoders =
- new ThreadLocal<>();
-// the aim to fix avro's bug
+ // the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
@@ -77,53 +67,19 @@ public class AvroSchema<T> extends StructSchema<T> {
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
}
- private AvroSchema(org.apache.avro.Schema schema,
- SchemaDefinition schemaDefinition) {
- super(
- SchemaType.AVRO,
- schema,
- schemaDefinition.getProperties());
- this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
- this.datumWriter = new ReflectDatumWriter<>(this.schema);
- this.reader = new ReflectDatumReader<>(this.schema);
- }
-
- @Override
- public synchronized byte[] encode(T message) {
- try {
- datumWriter.write(message, this.encoder);
- this.encoder.flush();
- return this.byteArrayOutputStream.toByteArray();
- } catch (Exception e) {
- throw new SchemaSerializationException(e);
- } finally {
- this.byteArrayOutputStream.reset();
- }
- }
-
- @Override
- public T decode(byte[] bytes) {
- try {
- BinaryDecoder decoderFromCache = decoders.get();
- BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
- if (decoderFromCache == null) {
- decoders.set(decoder);
- }
- return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
- } catch (IOException e) {
- throw new SchemaSerializationException(e);
- }
+ private AvroSchema(SchemaInfo schemaInfo) {
+ super(schemaInfo);
+ setReader(new AvroReader<>(schema));
+ setWriter(new AvroWriter<>(schema));
}
@Override
- public SchemaInfo getSchemaInfo() {
- return this.schemaInfo;
+ public boolean supportSchemaVersioning() {
+ return true;
}
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
- return schemaDefinition.getJsonDef() == null ?
- new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) : new AvroSchema<>(parseAvroSchema(schemaDefinition.getJsonDef()), schemaDefinition);
+ return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
}
public static <T> AvroSchema<T> of(Class<T> pojo) {
@@ -132,7 +88,18 @@ public class AvroSchema<T> extends StructSchema<T> {
public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
- return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
+ return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
}
+ @Override
+ protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+ SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ if (schemaInfo != null) {
+ return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema())), schema);
+ } else {
+ return reader;
+ }
+ }
+
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 629b769..4646465 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -25,12 +25,14 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.JsonReader;
+import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import java.io.IOException;
import java.util.Map;
/**
@@ -48,39 +50,17 @@ public class JSONSchema<T> extends StructSchema<T> {
});
private final Class<T> pojo;
- private final ObjectMapper objectMapper;
- private JSONSchema(org.apache.avro.Schema schema,
- SchemaDefinition<T> schemaDefinition) {
- super(
- SchemaType.JSON,
- schema,
- schemaDefinition.getProperties());
- this.pojo = schemaDefinition.getPojo();
- this.objectMapper = JSON_MAPPER.get();
+ private JSONSchema(SchemaInfo schemaInfo, Class<T> pojo) {
+ super(schemaInfo);
+ this.pojo = pojo;
+ setWriter(new JsonWriter<>(JSON_MAPPER.get()));
+ setReader(new JsonReader<>(JSON_MAPPER.get(), pojo));
}
@Override
- public byte[] encode(T message) throws SchemaSerializationException {
- try {
- return objectMapper.writeValueAsBytes(message);
- } catch (JsonProcessingException e) {
- throw new SchemaSerializationException(e);
- }
- }
-
- @Override
- public T decode(byte[] bytes) {
- try {
- return objectMapper.readValue(bytes, this.pojo);
- } catch (IOException e) {
- throw new SchemaSerializationException(e);
- }
- }
-
- @Override
- public SchemaInfo getSchemaInfo() {
- return this.schemaInfo;
+ protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+ throw new RuntimeException("JSONSchema don't support schema versioning");
}
/**
@@ -108,9 +88,7 @@ public class JSONSchema<T> extends StructSchema<T> {
}
public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
- String jsonDef = schemaDefinition.getJsonDef();
- return jsonDef == null ? new JSONSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) :
- new JSONSchema<>(parseAvroSchema(jsonDef), schemaDefinition);
+ return new JSONSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.JSON), schemaDefinition.getPojo());
}
public static <T> JSONSchema<T> of(Class<T> pojo) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index de61dca..9328908 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -21,28 +21,32 @@ package org.apache.pulsar.client.impl.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.Parser;
+import com.google.protobuf.GeneratedMessageV3;
import lombok.AllArgsConstructor;
import lombok.Getter;
-import org.apache.avro.protobuf.ProtobufDatumReader;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.avro.protobuf.ProtobufData;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
+import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* A schema implementation to deal with protobuf generated messages.
*/
public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {
- private Parser<T> tParser;
public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
@Getter
@@ -57,29 +61,19 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
}
private static <T> org.apache.avro.Schema createProtobufAvroSchema(Class<T> pojo) {
- ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
- return datumReader.getSchema();
+ return ProtobufData.get().getSchema(pojo);
}
- private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
- super(
- SchemaType.PROTOBUF,
- createProtobufAvroSchema(pojo),
- properties);
+ private ProtobufSchema(SchemaInfo schemaInfo, T protoMessageInstance) {
+ super(schemaInfo);
+ setReader(new ProtobufReader<>(protoMessageInstance));
+ setWriter(new ProtobufWriter<>());
// update properties with protobuf related properties
- try {
- T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
- tParser = (Parser<T>) protoMessageInstance.getParserForType();
-
- Map<String, String> allProperties = new HashMap<>();
- allProperties.putAll(schemaInfo.getProperties());
- // set protobuf parsing info
- allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-
- schemaInfo.setProperties(allProperties);
- } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
- throw new IllegalArgumentException(e);
- }
+ Map<String, String> allProperties = new HashMap<>();
+ allProperties.putAll(schemaInfo.getProperties());
+ // set protobuf parsing info
+ allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
+ schemaInfo.setProperties(allProperties);
}
private String getParsingInfo(T protoMessageInstance) {
@@ -101,38 +95,43 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
}
@Override
- public byte[] encode(T message) {
- return message.toByteArray();
- }
+ protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+ throw new RuntimeException("ProtobufSchema don't support schema versioning"); }
- @Override
- public T decode(byte[] bytes) {
- try {
- return this.tParser.parseFrom(bytes);
- } catch (Exception e) {
- throw new RuntimeException(new SchemaSerializationException(e));
- }
+ public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
+ return of(pojo, new HashMap<>());
}
- @Override
- public SchemaInfo getSchemaInfo() {
- return schemaInfo;
+ public static <T> ProtobufSchema ofGenericClass(Class<T> pojo, Map<String, String> properties) {
+ SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
+ return ProtobufSchema.of(schemaDefinition);
}
- public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
- return of(pojo, Collections.emptyMap());
- }
+ public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
+ Class<T> pojo = schemaDefinition.getPojo();
- public static ProtobufSchema ofGenericClass(Class pojo, Map<String, String> properties) {
if (!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
throw new IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+ " is not assignable from " + pojo.getName());
}
- return new ProtobufSchema<>(properties, pojo);
+
+ SchemaInfo schemaInfo = SchemaInfo.builder()
+ .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
+ .type(SchemaType.PROTOBUF)
+ .name("")
+ .properties(schemaDefinition.getProperties())
+ .build();
+
+ try {
+ return new ProtobufSchema(schemaInfo,
+ (GeneratedMessageV3) pojo.getMethod("getDefaultInstance").invoke(null));
+ }catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+ throw new IllegalArgumentException(e);
+ }
}
public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(
- Class<T> pojo, Map<String, String> properties){
+ Class pojo, Map<String, String> properties){
return ofGenericClass(pojo, properties);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
index 2db6cf4..ea9ede5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -55,6 +55,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private String jsonDef;
+ /**
+ * Flag indicating whether messages are decoded according to their schema version.
+ */
+ private boolean supportSchemaVersioning = false;
+
@Override
public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
this.alwaysAllowNull = alwaysAllowNull;
@@ -79,6 +84,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
return this;
}
+ @Override
+ public SchemaDefinitionBuilder<T> withSupportSchemaVersioning(boolean supportSchemaVersioning) {
+ this.supportSchemaVersioning = supportSchemaVersioning;
+ return this;
+ }
@Override
public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
@@ -89,7 +99,7 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
@Override
public SchemaDefinition<T> build() {
properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
- return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties);
+ return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
index 04f1a24..8ace246 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Map;
/**
@@ -48,11 +48,14 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
private String jsonDef;
- public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
+ private boolean supportSchemaVersioning;
+
+ public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
+ this.supportSchemaVersioning = supportSchemaVersioning;
}
/**
* get schema whether always allow null or not
@@ -60,7 +63,6 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
* @return schema always null or not
*/
public boolean getAlwaysAllowNull() {
-
return alwaysAllowNull;
}
@@ -70,7 +72,6 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
* @return schema class
*/
public String getJsonDef() {
-
return jsonDef;
}
/**
@@ -83,16 +84,18 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
return pojo;
}
+ @Override
+ public boolean getSupportSchemaVersioning() {
+ return supportSchemaVersioning;
+ }
+
/**
* Get schema class
*
* @return schema class
*/
public Map<String, String> getProperties() {
-
- return properties;
+ return Collections.unmodifiableMap(properties);
}
-
-
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 31156d4..89427d4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -20,13 +20,25 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a base schema implementation for `Struct` types.
@@ -38,39 +50,99 @@ import org.apache.pulsar.common.schema.SchemaType;
* {@link org.apache.pulsar.common.schema.SchemaType#JSON},
* and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
*/
-abstract class StructSchema<T> implements Schema<T> {
+public abstract class StructSchema<T> implements Schema<T> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(StructSchema.class);
protected final org.apache.avro.Schema schema;
protected final SchemaInfo schemaInfo;
+ protected SchemaReader<T> reader;
+ protected SchemaWriter<T> writer;
+ protected SchemaInfoProvider schemaInfoProvider;
+ private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() {
+ @Override
+ public SchemaReader<T> load(byte[] schemaVersion) {
+ return loadReader(schemaVersion);
+ }
+ });
- protected StructSchema(SchemaType schemaType,
- org.apache.avro.Schema schema,
- Map<String, String> properties) {
- this.schema = schema;
- this.schemaInfo = new SchemaInfo();
- this.schemaInfo.setName("");
- this.schemaInfo.setType(schemaType);
- this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
- this.schemaInfo.setProperties(properties);
+ protected StructSchema(SchemaInfo schemaInfo) {
+ this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
+ this.schemaInfo = schemaInfo;
}
- protected org.apache.avro.Schema getAvroSchema() {
+ public org.apache.avro.Schema getAvroSchema() {
return schema;
}
@Override
+ public byte[] encode(T message) {
+ return writer.write(message);
+ }
+
+ @Override
+ public T decode(byte[] bytes) {
+ return reader.read(bytes);
+ }
+
+ @Override
+ public T decode(byte[] bytes, byte[] schemaVersion) {
+ try {
+ return readerCache.get(schemaVersion).read(bytes);
+ } catch (ExecutionException e) {
+ LOG.error("Can't get generic schema for topic {} schema version {}",
+ schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
+ throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName(), e); // chain the cause so the loader failure is not lost
+ }
+ }
+
+ @Override
public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}
protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) {
Class pojo = schemaDefinition.getPojo();
- return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+
+ if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
+ return parseAvroSchema(schemaDefinition.getJsonDef());
+ } else if (pojo != null) {
+ return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
+ } else {
+ throw new RuntimeException("Schema definition must specify pojo class or schema json definition");
+ }
+ }
+
+ protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
+ final Parser parser = new Parser();
+ return parser.parse(schemaJson);
+ }
+
+ protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
+ return SchemaInfo.builder()
+ .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
+ .properties(schemaDefinition.getProperties())
+ .name("")
+ .type(schemaType).build();
+ }
+
+ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ this.schemaInfoProvider = schemaInfoProvider;
+ }
+
+ protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
+
+ protected void setWriter(SchemaWriter<T> writer) {
+ this.writer = writer;
+ }
+
+ protected void setReader(SchemaReader<T> reader) {
+ this.reader = reader;
}
- protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
- Parser parser = new Parser();
- return parser.parse(jsonDef);
+ protected SchemaReader<T> getReader() {
+ return reader;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
similarity index 54%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index 7cca2ac..bc4f65e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -18,69 +18,62 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
-/**
- * A generic avro schema.
- */
-class GenericAvroSchema extends GenericSchemaImpl {
- private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+public class GenericAvroReader implements SchemaReader<GenericRecord> {
+
+ private final GenericDatumReader<GenericAvroRecord> reader;
private BinaryEncoder encoder;
private final ByteArrayOutputStream byteArrayOutputStream;
- private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
-
- public GenericAvroSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
- this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
- this.datumWriter = new GenericDatumWriter(schema);
- this.datumReader = new GenericDatumReader(schema);
+ private final List<Field> fields;
+ private final Schema schema;
+ private final byte[] schemaVersion;
+ public GenericAvroReader(Schema schema) {
+ this(null, schema, null);
}
- @Override
- public synchronized byte[] encode(GenericRecord message) {
- checkArgument(message instanceof GenericAvroRecord);
- GenericAvroRecord gar = (GenericAvroRecord) message;
- try {
- datumWriter.write(gar.getAvroRecord(), this.encoder);
- this.encoder.flush();
- return this.byteArrayOutputStream.toByteArray();
- } catch (Exception e) {
- throw new SchemaSerializationException(e);
- } finally {
- this.byteArrayOutputStream.reset();
+ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schemaVersion) {
+ this.schema = readerSchema;
+ this.fields = schema.getFields()
+ .stream()
+ .map(f -> new Field(f.name(), f.pos()))
+ .collect(Collectors.toList());
+ this.schemaVersion = schemaVersion;
+ if (writerSchema == null) {
+ this.reader = new GenericDatumReader<>(readerSchema);
+ } else {
+ this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
}
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
}
@Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
+ public GenericAvroRecord read(byte[] bytes) {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
- org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
- null,
- decoder);
+ org.apache.avro.generic.GenericRecord avroRecord =
+ (org.apache.avro.generic.GenericRecord)reader.read(
+ null,
+ decoder);
return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
}
-
- @Override
- public GenericRecordBuilder newRecordBuilder() {
- return new AvroRecordBuilderImpl(this);
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 7cca2ac..42344d0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -18,69 +18,39 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
* A generic avro schema.
*/
-class GenericAvroSchema extends GenericSchemaImpl {
-
- private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
- private BinaryEncoder encoder;
- private final ByteArrayOutputStream byteArrayOutputStream;
- private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
+public class GenericAvroSchema extends GenericSchemaImpl {
public GenericAvroSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
- this.byteArrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
- this.datumWriter = new GenericDatumWriter(schema);
- this.datumReader = new GenericDatumReader(schema);
+ setReader(new GenericAvroReader(schema));
+ setWriter(new GenericAvroWriter(schema));
}
@Override
- public synchronized byte[] encode(GenericRecord message) {
- checkArgument(message instanceof GenericAvroRecord);
- GenericAvroRecord gar = (GenericAvroRecord) message;
- try {
- datumWriter.write(gar.getAvroRecord(), this.encoder);
- this.encoder.flush();
- return this.byteArrayOutputStream.toByteArray();
- } catch (Exception e) {
- throw new SchemaSerializationException(e);
- } finally {
- this.byteArrayOutputStream.reset();
- }
+ public GenericRecordBuilder newRecordBuilder() {
+ return new AvroRecordBuilderImpl(this);
}
- @Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- try {
- Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
- org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
- null,
- decoder);
- return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
- } catch (IOException e) {
- throw new SchemaSerializationException(e);
- }
- }
@Override
- public GenericRecordBuilder newRecordBuilder() {
- return new AvroRecordBuilderImpl(this);
+ protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
+ SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ if (schemaInfo != null) {
+ // Decode the stored schema as UTF-8 explicitly instead of relying on the platform default charset.
+ return new GenericAvroReader(
+ parseAvroSchema(new String(schemaInfo.getSchema(), java.nio.charset.StandardCharsets.UTF_8)),
+ schema, schemaVersion);
+ } else {
+ return reader;
+ }
}
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
similarity index 52%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
index 7cca2ac..28ad116 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroWriter.java
@@ -18,45 +18,32 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
-/**
- * A generic avro schema.
- */
-class GenericAvroSchema extends GenericSchemaImpl {
+import java.io.ByteArrayOutputStream;
+
+public class GenericAvroWriter implements SchemaWriter<GenericRecord> {
- private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
+ private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> writer;
private BinaryEncoder encoder;
private final ByteArrayOutputStream byteArrayOutputStream;
- private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader;
- public GenericAvroSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
+ public GenericAvroWriter(Schema schema) {
+ this.writer = new GenericDatumWriter<>(schema);
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
- this.datumWriter = new GenericDatumWriter(schema);
- this.datumReader = new GenericDatumReader(schema);
}
@Override
- public synchronized byte[] encode(GenericRecord message) {
- checkArgument(message instanceof GenericAvroRecord);
- GenericAvroRecord gar = (GenericAvroRecord) message;
+ public synchronized byte[] write(GenericRecord message) {
try {
- datumWriter.write(gar.getAvroRecord(), this.encoder);
+ writer.write(((GenericAvroRecord)message).getAvroRecord(), this.encoder);
this.encoder.flush();
return this.byteArrayOutputStream.toByteArray();
} catch (Exception e) {
@@ -65,22 +52,4 @@ class GenericAvroSchema extends GenericSchemaImpl {
this.byteArrayOutputStream.reset();
}
}
-
- @Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- try {
- Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
- org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
- null,
- decoder);
- return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
- } catch (IOException e) {
- throw new SchemaSerializationException(e);
- }
- }
-
- @Override
- public GenericRecordBuilder newRecordBuilder() {
- return new AvroRecordBuilderImpl(this);
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
similarity index 63%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index d15d563..1d2fced 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -18,42 +18,37 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.common.schema.SchemaInfo;
-/**
- * A generic json schema.
- */
-class GenericJsonSchema extends GenericSchemaImpl {
+import java.io.IOException;
+import java.util.List;
- private final ObjectMapper objectMapper;
+import static java.nio.charset.StandardCharsets.UTF_8;
- public GenericJsonSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
+public class GenericJsonReader implements SchemaReader<GenericRecord> {
+
+ private final ObjectMapper objectMapper;
+ private final byte[] schemaVersion;
+ private final List<Field> fields;
+ public GenericJsonReader(List<Field> fields){
+ this.fields = fields;
+ this.schemaVersion = null;
this.objectMapper = new ObjectMapper();
}
- @Override
- public byte[] encode(GenericRecord message) {
- checkArgument(message instanceof GenericAvroRecord);
- GenericJsonRecord gjr = (GenericJsonRecord) message;
- try {
- return objectMapper.writeValueAsBytes(gjr.getJsonNode().toString());
- } catch (IOException ioe) {
- throw new SchemaSerializationException(ioe);
- }
+ public GenericJsonReader(byte[] schemaVersion, List<Field> fields){
+ this.objectMapper = new ObjectMapper();
+ this.fields = fields;
+ this.schemaVersion = schemaVersion;
}
-
@Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
+ public GenericJsonRecord read(byte[] bytes) {
try {
JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
return new GenericJsonRecord(schemaVersion, fields, jn);
@@ -61,9 +56,4 @@ class GenericJsonSchema extends GenericSchemaImpl {
throw new SchemaSerializationException(ioe);
}
}
-
- @Override
- public GenericRecordBuilder newRecordBuilder() {
- throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index d15d563..14d939f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -18,47 +18,38 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.stream.Collectors;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.common.schema.SchemaInfo;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
/**
* A generic json schema.
*/
class GenericJsonSchema extends GenericSchemaImpl {
- private final ObjectMapper objectMapper;
-
public GenericJsonSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
- this.objectMapper = new ObjectMapper();
- }
-
- @Override
- public byte[] encode(GenericRecord message) {
- checkArgument(message instanceof GenericAvroRecord);
- GenericJsonRecord gjr = (GenericJsonRecord) message;
- try {
- return objectMapper.writeValueAsBytes(gjr.getJsonNode().toString());
- } catch (IOException ioe) {
- throw new SchemaSerializationException(ioe);
- }
+ setWriter(new GenericJsonWriter());
+ setReader(new GenericJsonReader(fields));
}
@Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- try {
- JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
- return new GenericJsonRecord(schemaVersion, fields, jn);
- } catch (IOException ioe) {
- throw new SchemaSerializationException(ioe);
+ protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
+ SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ if (schemaInfo != null) {
+ return new GenericJsonReader(schemaVersion,
+ (parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)).getFields()
+ .stream()
+ .map(f -> new Field(f.name(), f.pos()))
+ .collect(Collectors.toList())));
+ } else {
+ return reader;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
similarity index 53%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
index 18778e1..52b23e8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonWriter.java
@@ -18,19 +18,27 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import org.apache.pulsar.client.api.Schema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
+import java.io.IOException;
+
+public class GenericJsonWriter implements SchemaWriter<GenericRecord> {
+
+ private final ObjectMapper objectMapper;
- /**
- * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
- *
- * @param schemaVersion schema version
- * @return schema instance of the provided <tt>schemaVersion</tt>
- */
- Schema<T> getSchema(byte[] schemaVersion);
+ public GenericJsonWriter() {
+ this.objectMapper = new ObjectMapper();
+ }
+ @Override
+ public byte[] write(GenericRecord message) {
+ try {
+ return objectMapper.writeValueAsBytes(((GenericJsonRecord)message).getJsonNode().toString());
+ } catch (IOException ioe) {
+ throw new SchemaSerializationException(ioe);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index 69b3e15..a8aa283 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -20,36 +20,27 @@ package org.apache.pulsar.client.impl.schema.generic;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.pulsar.client.api.Schema;
+
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.schema.StructSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
/**
* A generic schema representation.
*/
-public abstract class GenericSchemaImpl implements GenericSchema {
+public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
- protected final org.apache.avro.Schema schema;
protected final List<Field> fields;
- protected final SchemaInfo schemaInfo;
protected GenericSchemaImpl(SchemaInfo schemaInfo) {
- this.schemaInfo = schemaInfo;
- this.schema = new org.apache.avro.Schema.Parser().parse(
- new String(schemaInfo.getSchema(), UTF_8)
- );
- this.fields = schema.getFields()
- .stream()
- .map(f -> new Field(f.name(), f.pos()))
- .collect(Collectors.toList());
- }
+ super(schemaInfo);
- public org.apache.avro.Schema getAvroSchema() {
- return schema;
+ this.fields = schema.getFields()
+ .stream()
+ .map(f -> new Field(f.name(), f.pos()))
+ .collect(Collectors.toList());
}
@Override
@@ -57,11 +48,6 @@ public abstract class GenericSchemaImpl implements GenericSchema {
return fields;
}
- @Override
- public SchemaInfo getSchemaInfo() {
- return schemaInfo;
- }
-
/**
* Create a generic schema out of a <tt>SchemaInfo</tt>.
*
@@ -75,7 +61,7 @@ public abstract class GenericSchemaImpl implements GenericSchema {
case JSON:
return new GenericJsonSchema(schemaInfo);
default:
- throw new UnsupportedOperationException("Generic schema is not supported on schema type '"
+ throw new UnsupportedOperationException("Generic schema is not supported on schema type "
+ schemaInfo.getType() + "'");
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java
deleted file mode 100644
index 457c281..0000000
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchema.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.schema.generic;
-
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.BytesSchema;
-import org.apache.pulsar.common.schema.SchemaInfo;
-
-/**
- * A schema implementation that handles schema versioning.
- */
-public class MultiVersionGenericSchema implements Schema<GenericRecord> {
-
- private final SchemaProvider<GenericRecord> provider;
-
- MultiVersionGenericSchema(SchemaProvider<GenericRecord> provider) {
- this.provider = provider;
- }
-
- @Override
- public byte[] encode(GenericRecord message) {
- throw new UnsupportedOperationException("This schema implementation is only used for AUTO_CONSUME");
- }
-
- @Override
- public boolean supportSchemaVersioning() {
- return true;
- }
-
- @Override
- public GenericRecord decode(byte[] bytes) {
- return provider.getSchema(null).decode(bytes);
- }
-
- @Override
- public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- return provider.getSchema(schemaVersion).decode(bytes, schemaVersion);
- }
-
- @Override
- public SchemaInfo getSchemaInfo() {
- // simulate it is a bytes schema
- return BytesSchema.of().getSchemaInfo();
- }
-}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
similarity index 61%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index fbf6c19..12e8956 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -21,8 +21,7 @@ package org.apache.pulsar.client.impl.schema.generic;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -37,28 +36,28 @@ import java.util.concurrent.TimeUnit;
/**
* Multi version generic schema provider by guava cache.
*/
-public class MultiVersionGenericSchemaProvider implements SchemaProvider<GenericRecord> {
+public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
- private static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericSchemaProvider.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MultiVersionSchemaInfoProvider.class);
private final TopicName topicName;
private final PulsarClientImpl pulsarClient;
- private final LoadingCache<byte[], GenericSchema> cache = CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], GenericSchema>() {
+ private final LoadingCache<byte[], SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaInfo>() {
@Override
- public GenericSchema load(byte[] schemaVersion) throws Exception {
+ public SchemaInfo load(byte[] schemaVersion) throws Exception {
return loadSchema(schemaVersion);
}
});
- public MultiVersionGenericSchemaProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
+ public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
this.topicName = topicName;
this.pulsarClient = pulsarClient;
}
@Override
- public GenericSchema getSchema(byte[] schemaVersion) {
+ public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
try {
if (null == schemaVersion) {
return null;
@@ -67,18 +66,32 @@ public class MultiVersionGenericSchemaProvider implements SchemaProvider<Generic
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
- return null;
+ throw new RuntimeException("Can't get generic schema for topic " + topicName.toString());
}
}
- private GenericSchema loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
- Optional<SchemaInfo> schemaInfo = pulsarClient.getLookup()
- .getSchema(topicName, schemaVersion).get();
- return schemaInfo.map(GenericSchemaImpl::of).orElse(null);
+ @Override
+ public SchemaInfo getLatestSchema() {
+ try {
+ Optional<SchemaInfo> optional = pulsarClient.getLookup()
+ .getSchema(topicName).get();
+ return optional.orElse(null);
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Can't get current schema for topic {}",
+ topicName.toString(), e);
+ throw new RuntimeException("Can't get current schema for topic " + topicName.toString());
+ }
}
- public TopicName getTopic() {
- return topicName;
+ @Override
+ public String getTopicName() {
+ return topicName.getLocalName();
+ }
+
+ private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
+ Optional<SchemaInfo> optional = pulsarClient.getLookup()
+ .getSchema(topicName, schemaVersion).get();
+ return optional.orElse(null);
}
public PulsarClientImpl getPulsarClient() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
new file mode 100644
index 0000000..c502df5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+import java.io.IOException;
+
+public class AvroReader<T> implements SchemaReader<T> {
+
+ private ReflectDatumReader<T> reader;
+ private static final ThreadLocal<BinaryDecoder> decoders =
+ new ThreadLocal<>();
+
+ public AvroReader(Schema schema) {
+ this.reader = new ReflectDatumReader<>(schema);
+ }
+
+ public AvroReader(Schema writerSchema, Schema readerSchema) {
+ this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
+ }
+
+ @Override
+ public T read(byte[] bytes) {
+ try {
+ BinaryDecoder decoderFromCache = decoders.get();
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, decoderFromCache);
+ if (decoderFromCache == null) {
+ decoders.set(decoder);
+ }
+ return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, decoder));
+ } catch (IOException e) {
+ throw new SchemaSerializationException(e);
+ }
+ }
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
similarity index 51%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
index abd9dea..05ef7cb 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/JsonReader.java
@@ -16,38 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.schema;
+package org.apache.pulsar.client.impl.schema.reader;
-import java.util.Collections;
-import java.util.Map;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
+import java.io.IOException;
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-public class SchemaInfo {
+public class JsonReader<T> implements SchemaReader<T> {
+ private final Class<T> pojo;
+ private final ObjectMapper objectMapper;
- @EqualsAndHashCode.Exclude
- private String name;
+ public JsonReader(ObjectMapper objectMapper, Class<T> pojo) {
+ this.pojo = pojo;
+ this.objectMapper = objectMapper;
+ }
- /**
- * The schema data in AVRO JSON format
- */
- private byte[] schema;
-
- /**
- * The type of schema (AVRO, JSON, PROTOBUF, etc..)
- */
- private SchemaType type;
-
- /**
- * Additional properties of the schema definition (implementation defined)
- */
- private Map<String, String> properties = Collections.emptyMap();
+ @Override
+ public T read(byte[] bytes) {
+ try {
+ return objectMapper.readValue(bytes, this.pojo);
+ } catch (IOException e) {
+ throw new SchemaSerializationException(e);
+ }
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
similarity index 51%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
index abd9dea..bca952e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
@@ -16,38 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.schema;
+package org.apache.pulsar.client.impl.schema.reader;
-import java.util.Collections;
-import java.util.Map;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Parser;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaReader;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
+public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaReader<T> {
+ private Parser<T> tParser;
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-public class SchemaInfo {
+ public ProtobufReader(T protoMessageInstance) {
+ tParser = (Parser<T>) (protoMessageInstance).getParserForType();
+ }
- @EqualsAndHashCode.Exclude
- private String name;
+ @Override
+ public T read(byte[] bytes) {
+ try {
+ return this.tParser.parseFrom(bytes);
+ } catch (InvalidProtocolBufferException e) {
+ throw new SchemaSerializationException(e);
+ }
+ }
- /**
- * The schema data in AVRO JSON format
- */
- private byte[] schema;
-
- /**
- * The type of schema (AVRO, JSON, PROTOBUF, etc..)
- */
- private SchemaType type;
-
- /**
- * Additional properties of the schema definition (implementation defined)
- */
- private Map<String, String> properties = Collections.emptyMap();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
new file mode 100644
index 0000000..912bca7
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/AvroWriter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.writer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+
+import java.io.ByteArrayOutputStream;
+
+public class AvroWriter<T> implements SchemaWriter<T> {
+ private final ReflectDatumWriter<T> writer;
+ private BinaryEncoder encoder;
+ private ByteArrayOutputStream byteArrayOutputStream;
+
+ public AvroWriter(Schema schema) {
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
+ this.writer = new ReflectDatumWriter<>(schema);
+ }
+
+ @Override
+ public synchronized byte[] write(T message) {
+ try {
+ writer.write(message, this.encoder);
+ this.encoder.flush();
+ return this.byteArrayOutputStream.toByteArray();
+ } catch (Exception e) {
+ throw new SchemaSerializationException(e);
+ } finally {
+ this.byteArrayOutputStream.reset();
+ }
+ }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
similarity index 52%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
index 8b5137d..d828c3f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/GenericSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/JsonWriter.java
@@ -16,28 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.api.schema;
+package org.apache.pulsar.client.impl.schema.writer;
-import java.util.List;
-import org.apache.pulsar.client.api.Schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
-/**
- * A schema that serializes and deserializes between {@link GenericRecord} and bytes.
- */
-public interface GenericSchema extends Schema<GenericRecord> {
+public class JsonWriter<T> implements SchemaWriter<T> {
- /**
- * Returns the list of fields.
- *
- * @return the list of fields of generic record.
- */
- List<Field> getFields();
+ private final ObjectMapper objectMapper;
- /**
- * Create a builder to build {@link GenericRecord}.
- *
- * @return generic record builder
- */
- GenericRecordBuilder newRecordBuilder();
+ public JsonWriter(ObjectMapper objectMapper) {
+ this.objectMapper = objectMapper;
+ }
+ @Override
+ public byte[] write(T message) {
+ try {
+ return objectMapper.writeValueAsBytes(message);
+ } catch (JsonProcessingException e) {
+ throw new SchemaSerializationException(e);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
similarity index 66%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
index 18778e1..8c9e8f8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/SchemaProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
@@ -16,21 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema.generic;
+package org.apache.pulsar.client.impl.schema.writer;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
-/**
- * Schema Provider.
- */
-public interface SchemaProvider<T> {
-
- /**
- * Retrieve the schema instance of a given <tt>schemaVersion</tt>.
- *
- * @param schemaVersion schema version
- * @return schema instance of the provided <tt>schemaVersion</tt>
- */
- Schema<T> getSchema(byte[] schemaVersion);
+public class ProtobufWriter<T extends com.google.protobuf.GeneratedMessageV3> implements SchemaWriter<T> {
+ @Override
+ public byte[] write(T message) {
+ return message.toByteArray();
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index 5a9716e..57ccd56 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -22,13 +22,13 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
-import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.proto.Function;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Collections;
+import java.util.HashMap;
@Slf4j
public class ProtobufSchemaTest {
@@ -46,15 +46,14 @@ public class ProtobufSchemaTest {
"\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\"," +
"\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
- private static final String EXPECTED_PARSING_INFO = "{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
- "\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
- "\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\"," +
- "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6," +
- "\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
- "\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\"," +
- "\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5," +
- "\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
- "\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
+ private static final String EXPECTED_PARSING_INFO = "{\"__alwaysAllowNull\":\"true\",\"__PARSING_INFO__\":" +
+ "\"[{\\\"number\\\":1,\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"" +
+ "LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\"" +
+ ":\\\"DOUBLE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,\\\"name\\\"" +
+ ":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null}," +
+ "{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\"," +
+ "\\\"definition\\\":null},{\\\"number\\\":5,\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"" +
+ "label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\"," +
"\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
@Test
@@ -89,7 +88,7 @@ public class ProtobufSchemaTest {
try {
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
= ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
- Collections.emptyMap());
+ new HashMap<>());
} catch (Exception e) {
Assert.fail("Should not construct a ProtobufShema over a non-protobuf-generated class");
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 456416f..207fd98 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -179,7 +179,7 @@ public class SchemaBuilderTest {
SchemaInfo schemaInfo = recordSchemaBuilder.build(
SchemaType.AVRO
);
- GenericSchema schema = Schema.generic(schemaInfo);
+ GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
GenericRecord record = schema.newRecordBuilder()
.set(schema.getFields().get(0), 32)
.set(schema.getFields().get(1), 1234L)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
index 98d5367..a7abc74 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
@@ -47,6 +47,14 @@ public class SchemaTestUtils {
@AvroDefault("\"defaultValue\"")
private String fieldUnableNull;
}
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class FooV2 {
+ @Nullable
+ private String field1;
+ private int field3;
+ }
@Data
@ToString
@@ -126,4 +134,8 @@ public class SchemaTestUtils {
"fieldUnableNull"
};
+ public static String TEST_MULTI_VERSION_SCHEMA_STRING = "TEST";
+
+ public static String TEST_MULTI_VERSION_SCHEMA_DEFAULT_STRING = "defaultValue";
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
new file mode 100644
index 0000000..47268a1
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test for decoding through an {@link AvroSchema} that has schema versioning enabled:
+ * bytes written with the {@code FooV2} schema are decoded by a {@code Foo} schema whose
+ * schema-info provider is mocked to report the {@code FooV2} schema for any version.
+ */
+public class SupportVersioningAvroSchemaTest {
+    private MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider;
+    // Parameterized (not raw) so that decode() returns Foo without an unchecked cast.
+    private AvroSchema<SchemaTestUtils.Foo> schema;
+    private GenericAvroSchema genericAvroSchema;
+    private AvroSchema<SchemaTestUtils.FooV2> avroFooV2Schema;
+
+    /**
+     * Builds the {@code FooV2} schema used for encoding, the {@code Foo} schema (with schema
+     * versioning enabled) used for decoding, and attaches the mocked provider to the latter.
+     */
+    @BeforeMethod
+    public void setup() {
+        this.multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
+        avroFooV2Schema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.FooV2>builder()
+                .withAlwaysAllowNull(false).withPojo(SchemaTestUtils.FooV2.class).build());
+        this.schema = AvroSchema.of(SchemaDefinition.<SchemaTestUtils.Foo>builder()
+                .withPojo(SchemaTestUtils.Foo.class)
+                .withAlwaysAllowNull(false)
+                .withSupportSchemaVersioning(true)
+                .build());
+        schema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
+        SchemaInfo schemaInfo = avroFooV2Schema.schemaInfo;
+        genericAvroSchema = new GenericAvroSchema(schemaInfo);
+    }
+
+    /**
+     * Encodes a {@code FooV2} and decodes it as a {@code Foo}: {@code field1} must survive the
+     * round trip and {@code fieldUnableNull}, which {@code FooV2} does not have, must come back
+     * as the expected default string.
+     */
+    @Test
+    public void testDecode() {
+        // Whatever version bytes are requested, report the FooV2 schema info.
+        when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+                .thenReturn(genericAvroSchema.getSchemaInfo());
+        SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2();
+        fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
+        SchemaTestUtils.Foo foo = schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]);
+        // TestNG's assertEquals takes (actual, expected); keep that order so failure
+        // messages report the two values the right way round.
+        assertEquals(foo.getField1(), SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
+        assertEquals(foo.getFieldUnableNull(), SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_DEFAULT_STRING);
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
new file mode 100644
index 0000000..d77d0f2
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReaderTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static org.testng.Assert.assertEquals;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.FooV2;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link GenericAvroReader}, covering reads with a single schema and reads
+ * that are given both the schema the bytes were encoded with and a different schema to
+ * read them as.
+ */
+@Slf4j
+public class GenericAvroReaderTest {
+
+    private Foo foo;
+    private FooV2 fooV2;
+    // Parameterized (not raw) so that encode(...) is a checked call.
+    private AvroSchema<Foo> fooSchemaNotNull;
+    private AvroSchema<Foo> fooSchema;
+    private AvroSchema<FooV2> fooV2Schema;
+
+    /**
+     * Creates the {@code Foo} / {@code FooV2} schemas and one populated instance of each.
+     */
+    @BeforeMethod
+    public void setup() {
+        fooSchema = AvroSchema.of(Foo.class);
+        fooV2Schema = AvroSchema.of(FooV2.class);
+        fooSchemaNotNull = AvroSchema.of(SchemaDefinition.<Foo>builder()
+                .withAlwaysAllowNull(false)
+                .withPojo(Foo.class)
+                .build());
+
+        foo = new Foo();
+        foo.setField1("foo1");
+        foo.setField2("bar1");
+        foo.setField4(new SchemaTestUtils.Bar());
+        foo.setFieldUnableNull("notNull");
+
+        fooV2 = new FooV2();
+        fooV2.setField1("foo1");
+        fooV2.setField3(10);
+    }
+
+    /**
+     * Reads {@code Foo} bytes with a reader built from the same {@code Foo} schema and checks
+     * that the string fields come back unchanged.
+     */
+    @Test
+    public void testGenericAvroReaderByWriterSchema() {
+        byte[] fooBytes = fooSchema.encode(foo);
+
+        GenericAvroReader genericAvroSchemaByWriterSchema = new GenericAvroReader(fooSchema.getAvroSchema());
+        GenericRecord genericRecordByWriterSchema = genericAvroSchemaByWriterSchema.read(fooBytes);
+        assertEquals(genericRecordByWriterSchema.getField("field1"), "foo1");
+        assertEquals(genericRecordByWriterSchema.getField("field2"), "bar1");
+        assertEquals(genericRecordByWriterSchema.getField("fieldUnableNull"), "notNull");
+    }
+
+    /**
+     * Reads {@code FooV2} bytes with a reader given the {@code FooV2} schema and the
+     * non-nullable {@code Foo} schema: the fields set on {@code FooV2} are preserved and
+     * {@code fieldUnableNull}, which {@code FooV2} lacks, is expected to be "defaultValue".
+     */
+    @Test
+    public void testGenericAvroReaderByReaderSchema() {
+        byte[] fooV2Bytes = fooV2Schema.encode(fooV2);
+
+        GenericAvroReader genericAvroSchemaByReaderSchema = new GenericAvroReader(
+                fooV2Schema.getAvroSchema(), fooSchemaNotNull.getAvroSchema(), new byte[10]);
+        GenericRecord genericRecordByReaderSchema = genericAvroSchemaByReaderSchema.read(fooV2Bytes);
+        assertEquals(genericRecordByReaderSchema.getField("fieldUnableNull"), "defaultValue");
+        assertEquals(genericRecordByReaderSchema.getField("field1"), "foo1");
+        assertEquals(genericRecordByReaderSchema.getField("field3"), 10);
+    }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index ed554cc..79c2486 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -19,12 +19,16 @@
package org.apache.pulsar.client.impl.schema.generic;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -39,34 +43,46 @@ public class GenericSchemaImplTest {
@Test
public void testGenericAvroSchema() {
Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
- GenericSchemaImpl decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+ GenericSchema decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@Test
public void testGenericJsonSchema() {
Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
- GenericSchemaImpl decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+ GenericSchema decodeSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@Test
public void testAutoAvroSchema() {
- Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+ MultiVersionSchemaInfoProvider multiVersionGenericSchemaProvider = mock(MultiVersionSchemaInfoProvider.class);
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
- decodeSchema.setSchema(GenericSchemaImpl.of(encodeSchema.getSchemaInfo()));
- testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+ Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+ GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+ genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
+ decodeSchema.setSchema(genericSchema);
+ when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(genericSchema.getSchemaInfo());
+
+ testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@Test
public void testAutoJsonSchema() {
+ MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+ GenericSchema genericSchema = GenericSchemaImpl.of(encodeSchema.getSchemaInfo());
+ genericSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
AutoConsumeSchema decodeSchema = new AutoConsumeSchema();
- decodeSchema.setSchema(GenericSchemaImpl.of(encodeSchema.getSchemaInfo()));
- testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+ decodeSchema.setSchema(genericSchema);
+ GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(genericAvroSchema.getSchemaInfo());
+ testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
- public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+ public void testAUTOEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
Schema<GenericRecord> decodeSchema) {
int numRecords = 10;
for (int i = 0; i < numRecords; i++) {
@@ -82,6 +98,38 @@ public class GenericSchemaImplTest {
log.info("Decoding : {}", new String(data, UTF_8));
+ GenericRecord record = decodeSchema.decode(data, new byte[10]);
+ Object field1 = record.getField("field1");
+ assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
+ Object field2 = record.getField("field2");
+ assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass());
+ Object field3 = record.getField("field3");
+ assertEquals(i, field3, "Field 3 is " + field3.getClass());
+ Object field4 = record.getField("field4");
+ assertTrue(field4 instanceof GenericRecord);
+ GenericRecord field4Record = (GenericRecord) field4;
+ assertEquals(i % 2 == 0, field4Record.getField("field1"));
+ Object fieldUnableNull = record.getField("fieldUnableNull");
+ assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
+ }
+ }
+
+ public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+ Schema<GenericRecord> decodeSchema) {
+ int numRecords = 10;
+ for (int i = 0; i < numRecords; i++) {
+ Foo foo = new Foo();
+ foo.setField1("field-1-" + i);
+ foo.setField2("field-2-" + i);
+ foo.setField3(i);
+ Bar bar = new Bar();
+ bar.setField1(i % 2 == 0);
+ foo.setField4(bar);
+ foo.setFieldUnableNull("fieldUnableNull-1-" + i);
+ byte[] data = encodeSchema.encode(foo);
+
+ log.info("Decoding : {}", new String(data, UTF_8));
+
GenericRecord record = decodeSchema.decode(data);
Object field1 = record.getField("field1");
assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass());
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java
deleted file mode 100644
index d2bce9d..0000000
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl.schema.generic;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
-
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test {@link MultiVersionGenericSchema}.
- */
-public class MultiVersionGenericSchemaTest {
-
- private SchemaProvider<GenericRecord> schemaProvider;
- private MultiVersionGenericSchema schema;
-
- @BeforeMethod
- public void setup() {
- this.schemaProvider = mock(SchemaProvider.class);
- this.schema = new MultiVersionGenericSchema(schemaProvider);
- }
-
- @Test(expectedExceptions = UnsupportedOperationException.class)
- public void testEncode() {
- this.schema.encode(mock(GenericRecord.class));
- }
-
- @Test
- public void testSupportSchemaVersioning() {
- assertTrue(schema.supportSchemaVersioning());
- }
-
- @Test
- public void testGetSchemaInfo() {
- assertEquals(new byte[0], schema.getSchemaInfo().getSchema());
- }
-
- @Test
- public void testDecode() {
- Schema<GenericRecord> mockSchema = mock(Schema.class);
- when(schemaProvider.getSchema(any(byte[].class)))
- .thenReturn(mockSchema);
- when(schemaProvider.getSchema(eq(null)))
- .thenReturn(mockSchema);
-
- GenericRecord mockRecord = mock(GenericRecord.class);
- when(mockSchema.decode(any(byte[].class), any(byte[].class)))
- .thenReturn(mockRecord);
- when(mockSchema.decode(any(byte[].class)))
- .thenReturn(mockRecord);
-
- assertSame(
- mockRecord, schema.decode(new byte[0]));
- verify(mockSchema, times(1))
- .decode(any(byte[].class));
-
- assertSame(
- mockRecord, schema.decode(new byte[0], new byte[0]));
- verify(mockSchema, times(1))
- .decode(any(byte[].class), any(byte[].class));
- }
-
-}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
similarity index 85%
rename from pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
rename to pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index aedc0bd..ed14580 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -29,7 +29,6 @@ import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
-import org.apache.pulsar.client.tutorial.JsonPojo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.testng.annotations.BeforeMethod;
@@ -39,17 +38,17 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
/**
- * Unit test for {@link MultiVersionGenericSchemaProvider}.
+ * Unit test for {@link MultiVersionSchemaInfoProvider}.
*/
-public class MultiVersionGenericSchemaProviderTest {
+public class MultiVersionSchemaInfoProviderTest {
- private MultiVersionGenericSchemaProvider schemaProvider;
+ private MultiVersionSchemaInfoProvider schemaProvider;
@BeforeMethod
public void setup() {
PulsarClientImpl client = mock(PulsarClientImpl.class);
when(client.getLookup()).thenReturn(mock(LookupService.class));
- schemaProvider = new MultiVersionGenericSchemaProvider(
+ schemaProvider = new MultiVersionSchemaInfoProvider(
TopicName.get("persistent://public/default/my-topic"), client);
}
@@ -63,7 +62,7 @@ public class MultiVersionGenericSchemaProviderTest {
any(TopicName.class),
any(byte[].class)))
.thenReturn(completableFuture);
- GenericSchema schema = schemaProvider.getSchema(new byte[0]);
- assertEquals(schema.getSchemaInfo(), schemaInfo);
+ SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]);
+ assertEquals(schemaInfoByVersion, schemaInfo);
}
}
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 984f658..58388ef 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.io.hbase.sink;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -31,10 +29,12 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Record;
@@ -52,12 +52,15 @@ import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* hbase Sink test
*/
@Slf4j
public class HbaseGenericRecordSinkTest {
+ private Message<GenericRecord> message;
+
/**
* A Simple class to test hbase class
@@ -84,6 +87,9 @@ public class HbaseGenericRecordSinkTest {
@Test(enabled = false)
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
+
Map<String, Object> map = new HashMap<>();
map.put("zookeeperQuorum", "localhost");
map.put("zookeeperClientPort", "2181");
@@ -112,13 +118,12 @@ public class HbaseGenericRecordSinkTest {
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
Consumer consumer = mock(Consumer.class);
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "11:111", map, payload, autoConsumeSchema);
+
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
@@ -136,6 +141,11 @@ public class HbaseGenericRecordSinkTest {
})
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
index c95a9ed..fe8941e 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/InfluxDBGenericRecordSinkTest.java
@@ -27,9 +27,11 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
@@ -57,6 +59,8 @@ import static org.mockito.Mockito.when;
@Slf4j
public class InfluxDBGenericRecordSinkTest {
+ private Message<GenericRecord> message;
+
/**
* A Simple class to test InfluxDB class
*/
@@ -107,6 +111,8 @@ public class InfluxDBGenericRecordSinkTest {
@Test
public void testOpenAndWrite() throws Exception {
+ message = mock(MessageImpl.class);
+ GenericSchema<GenericRecord> genericAvroSchema;
// prepare a cpu Record
Cpu cpu = new Cpu();
cpu.setMeasurement("cpu");
@@ -121,17 +127,19 @@ public class InfluxDBGenericRecordSinkTest {
AvroSchema<Cpu> schema = AvroSchema.of(Cpu.class);
byte[] bytes = schema.encode(cpu);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("influx_cpu", "77:777",
- configMap, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("influx_cpu")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("cpu:{}, Message.getValue: {}, record.getValue: {}",
cpu.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 9ecc91a..aec9d7e 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -20,8 +20,7 @@
package org.apache.pulsar.io.jdbc;
import com.google.common.collect.Maps;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+
import java.util.Map;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -29,11 +28,11 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.testng.Assert;
@@ -41,12 +40,16 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Jdbc Sink test
*/
@Slf4j
public class JdbcSinkTest {
private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName());
+ private Message<GenericRecord> message;
/**
* A Simple class to test jdbc class
@@ -72,9 +75,11 @@ public class JdbcSinkTest {
@Test
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
JdbcAutoSchemaSink jdbcSink;
Map<String, Object> conf;
String tableName = "TestOpenAndWriteSink";
+ GenericSchema<GenericRecord> genericAvroSchema;
String jdbcUrl = sqliteUtils.sqliteUri();
conf = Maps.newHashMap();
@@ -99,16 +104,16 @@ public class JdbcSinkTest {
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
- AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
- autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", conf, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
diff --git a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
index 72462e7..92d1f4f 100644
--- a/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
+++ b/pulsar-io/solr/src/test/java/org/apache/pulsar/io/solr/SolrGenericRecordSinkTest.java
@@ -18,17 +18,17 @@
*/
package org.apache.pulsar.io.solr;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.source.PulsarRecord;
@@ -39,6 +39,9 @@ import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* solr Sink test
*/
@@ -46,6 +49,7 @@ import java.util.Map;
public class SolrGenericRecordSinkTest {
private SolrServerUtil solrServerUtil;
+ private Message<GenericRecord> message;
/**
* A Simple class to test solr class
@@ -71,6 +75,7 @@ public class SolrGenericRecordSinkTest {
@Test
public void TestOpenAndWriteSink() throws Exception {
+ message = mock(MessageImpl.class);
Map<String, Object> configs = new HashMap<>();
configs.put("solrUrl", "http://localhost:8983/solr");
configs.put("solrMode", "Standalone");
@@ -78,6 +83,7 @@ public class SolrGenericRecordSinkTest {
configs.put("solrCommitWithinMs", "100");
configs.put("username", "");
configs.put("password", "");
+ GenericSchema<GenericRecord> genericAvroSchema;
SolrGenericRecordSink sink = new SolrGenericRecordSink();
@@ -88,16 +94,19 @@ public class SolrGenericRecordSinkTest {
AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
byte[] bytes = schema.encode(obj);
- ByteBuf payload = Unpooled.copiedBuffer(bytes);
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(GenericSchemaImpl.of(schema.getSchemaInfo()));
- Message<GenericRecord> message = new MessageImpl("fake_topic_name", "77:777", configs, payload, autoConsumeSchema);
Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
.message(message)
.topicName("fake_topic_name")
.build();
+ genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
+
+ when(message.getValue())
+ .thenReturn(genericAvroSchema.decode(bytes));
+
log.info("foo:{}, Message.getValue: {}, record.getValue: {}",
obj.toString(),
message.getValue().toString(),
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
index fab2542..12b8122 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/SchemaTest.java
@@ -19,16 +19,17 @@
package org.apache.pulsar.tests.integration.schema;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.tests.integration.schema.Schemas.Person;
+import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
import org.apache.pulsar.tests.integration.schema.Schemas.Student;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.testng.annotations.BeforeMethod;
@@ -106,5 +107,54 @@ public class SchemaTest extends PulsarTestSuite {
}
}
+    /**
+     * Publishes a {@code Person} using an Avro schema with schema versioning enabled, then
+     * consumes it with the {@code PersonConsumeSchema} schema, which declares an additional
+     * {@code gender} field. The consumed value must keep the published name and age and
+     * carry the Avro default for {@code gender}.
+     *
+     * @throws Exception if namespace creation, producing or consuming fails
+     */
+    @Test
+    public void testMultiVersionSchema() throws Exception {
+        final String tenant = PUBLIC_TENANT;
+        final String namespace = "test-namespace-" + randomName(16);
+        final String topic = "test-multi-version-schema";
+        final String fqtn = TopicName.get(
+                TopicDomain.persistent.value(),
+                tenant,
+                namespace,
+                topic
+        ).toString();
+
+        admin.namespaces().createNamespace(
+                tenant + "/" + namespace,
+                Sets.newHashSet(pulsarCluster.getClusterName())
+        );
+
+        // Create a topic with `Person`
+        try (Producer<Person> producer = client.newProducer(Schema.AVRO(
+                SchemaDefinition.<Person>builder()
+                        .withAlwaysAllowNull(false)
+                        .withSupportSchemaVersioning(true)
+                        .withPojo(Person.class)
+                        .build()))
+                .topic(fqtn)
+                .create()
+        ) {
+            Person person = new Person();
+            person.setName("Tom Hanks");
+            person.setAge(60);
+
+            producer.send(person);
+
+            log.info("Successfully published person : {}", person);
+        }
+
+        // Create a consumer for MultiVersionSchema
+        try (Consumer<PersonConsumeSchema> consumer = client.newConsumer(Schema.AVRO(
+                SchemaDefinition.<PersonConsumeSchema>builder()
+                        .withAlwaysAllowNull(false)
+                        .withSupportSchemaVersioning(true)
+                        .withPojo(PersonConsumeSchema.class)
+                        .build()))
+                .topic(fqtn)
+                .subscribe()
+        ) {
+            PersonConsumeSchema personConsumeSchema = consumer.receive().getValue();
+            // TestNG's assertEquals takes (actual, expected).
+            assertEquals(personConsumeSchema.getName(), "Tom Hanks");
+            assertEquals(personConsumeSchema.getAge(), 60);
+            // The producer side never sets a gender (assuming Person declares no such field),
+            // so the value must come from the @AvroDefault on PersonConsumeSchema.gender,
+            // which is "male" -- not "man".
+            assertEquals(personConsumeSchema.getGender(), "male");
+            log.info("Successfully consumer personConsumeSchema : {}", personConsumeSchema);
+        }
+    }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
index ebe798d..2c4af95 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/schema/Schemas.java
@@ -36,6 +36,7 @@ import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.avro.reflect.AvroDefault;
/**
* Keep a list of schemas for testing.
@@ -58,6 +59,23 @@ public final class Schemas {
}
/**
+ * A Person struct used on the consumer side: name and age plus a {@code gender} field whose Avro default is "male".
+ */
+ @Data
+ @Getter
+ @Setter
+ @ToString
+ @EqualsAndHashCode
+ public static class PersonConsumeSchema {
+
+ private String name;
+ private int age;
+ @AvroDefault("\"male\"")
+ private String gender;
+
+ }
+
+ /**
* A Student Struct.
*/
@Data