You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/18 13:36:00 UTC
[pulsar] branch master updated: [Schema] Multi version schema reader implementation. (#8464)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 72d6d28 [Schema] Multi version schema reader implementation. (#8464)
72d6d28 is described below
commit 72d6d28a7ca7a89f24e7397cefcd8a2a67a4252e
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Nov 18 21:35:35 2020 +0800
[Schema] Multi version schema reader implementation. (#8464)
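This PR changes how messages written with different schema versions are decoded: instead of each schema keeping its own cache of per-version readers, decoding is delegated to a multi-version schema reader, and users can supply their own reader to decode multi-version messages.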
---
.../pulsar/client/api/schema/SchemaReader.java | 30 ++++++++
.../client/impl/schema/AbstractStructSchema.java | 73 ++-----------------
.../client/impl/schema/AvroBaseStructSchema.java | 49 +++++++++++++
.../pulsar/client/impl/schema/AvroSchema.java | 47 +++++-------
.../pulsar/client/impl/schema/JSONSchema.java | 11 ++-
.../pulsar/client/impl/schema/ProtobufSchema.java | 11 +--
.../pulsar/client/impl/schema/StructSchema.java | 2 +-
.../generic/AbstractMultiVersionGenericReader.java | 43 +++++++++++
.../impl/schema/generic/GenericAvroSchema.java | 40 +++--------
.../impl/schema/generic/GenericJsonSchema.java | 38 +---------
.../impl/schema/generic/GenericSchemaImpl.java | 18 ++---
.../generic/MultiVersionGenericAvroReader.java | 65 +++++++++++++++++
...ema.java => MultiVersionGenericJsonReader.java} | 49 ++++++-------
.../reader/AbstractMultiVersionAvroBaseReader.java | 35 +++++++++
.../AbstractMultiVersionReader.java} | 83 ++++++++--------------
.../impl/schema/reader/MultiVersionAvroReader.java | 64 +++++++++++++++++
.../{StructSchema.java => util/SchemaUtil.java} | 62 ++++++----------
.../pulsar/client/impl/schema/AvroSchemaTest.java | 27 +++++++
.../pulsar/io/influxdb/v2/InfluxDBSinkTest.java | 4 +-
.../pulsar/sql/presto/TestAvroSchemaHandler.java | 6 +-
20 files changed, 437 insertions(+), 320 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
index d4279dc..dc18c4e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaReader.java
@@ -56,4 +56,34 @@ public interface SchemaReader<T> {
* @return the serialized object
*/
T read(InputStream inputStream);
+
+ /**
+ * Serialize bytes convert pojo.
+ *
+ * @param bytes the data
+ * @param schemaVersion the schema version of message
+ * @return the serialized object
+ */
+ default T read(byte[] bytes, byte[] schemaVersion) {
+ return read(bytes, 0, bytes.length);
+ }
+
+ /**
+ * serialize bytes convert pojo.
+ *
+ * @param inputStream the stream of message
+ * @param schemaVersion the schema version of message
+ * @return the serialized object
+ */
+ default T read(InputStream inputStream, byte[] schemaVersion) {
+ return read(inputStream);
+ }
+
+ /**
+ * Set schema info provider, this method support multi version reader.
+ *
+ * @param schemaInfoProvider the provider used to look up schema info for a given schema version
+ */
+ default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
index 7730474..015ec4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
@@ -18,26 +18,16 @@
*/
package org.apache.pulsar.client.impl.schema;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
-import org.apache.avro.AvroTypeException;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.lang3.SerializationException;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
/**
* minimal abstract StructSchema
*/
@@ -45,19 +35,11 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractStructSchema.class);
- protected SchemaInfo schemaInfo;
+ protected final SchemaInfo schemaInfo;
protected SchemaReader<T> reader;
protected SchemaWriter<T> writer;
protected SchemaInfoProvider schemaInfoProvider;
- LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
- @Override
- public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
- return loadReader(schemaVersion);
- }
- });
-
public AbstractStructSchema(SchemaInfo schemaInfo){
this.schemaInfo = schemaInfo;
}
@@ -75,17 +57,7 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
@Override
public T decode(byte[] bytes, byte[] schemaVersion) {
- try {
- return schemaVersion == null ? decode(bytes) :
- readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
- } catch (ExecutionException | AvroTypeException e) {
- if (e instanceof AvroTypeException) {
- throw new SchemaSerializationException(e);
- }
- LOG.error("Can't get generic schema for topic {} schema version {}",
- schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
- throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
- }
+ return reader.read(bytes, schemaVersion);
}
@Override
@@ -95,14 +67,7 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
@Override
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
- try {
- return schemaVersion == null ? decode(byteBuf) :
- readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
- } catch (ExecutionException e) {
- LOG.error("Can't get generic schema for topic {} schema version {}",
- schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
- throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
- }
+ return reader.read(new ByteBufInputStream(byteBuf), schemaVersion);
}
@Override
@@ -112,34 +77,8 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
- this.schemaInfoProvider = schemaInfoProvider;
- }
-
- /**
- * Load the schema reader for reading messages encoded by the given schema version.
- *
- * @param schemaVersion the provided schema version
- * @return the schema reader for decoding messages encoded by the provided schema version.
- */
- protected abstract SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion);
-
- /**
- * TODO: think about how to make this async
- */
- protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) {
- try {
- return schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new SerializationException(
- "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
- e
- );
- } catch (ExecutionException e) {
- throw new SerializationException(
- "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
- e.getCause()
- );
+ if (reader != null) {
+ this.reader.setSchemaInfoProvider(schemaInfoProvider);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
new file mode 100644
index 0000000..b464731
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * This is a base schema implementation for Avro Based `Struct` types.
+ * A struct type is used for presenting records (objects) which
+ * have multiple fields.
+ *
+ * <p>Currently Pulsar supports 3 `Struct` types -
+ * {@link org.apache.pulsar.common.schema.SchemaType#AVRO},
+ * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
+ * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
+ */
+public abstract class AvroBaseStructSchema<T> extends AbstractStructSchema<T>{
+
+ protected final Schema schema;
+
+ public AvroBaseStructSchema(SchemaInfo schemaInfo) {
+ super(schemaInfo);
+ this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
+ }
+
+ public Schema getAvroSchema(){
+ return schema;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 3e35cb2..46227b8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.avro.Conversions;
import org.apache.avro.data.JodaTimeConversions;
import org.apache.avro.data.TimeConversions;
@@ -26,9 +27,9 @@ import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.reader.AvroReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
@@ -36,11 +37,14 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
+
/**
* An AVRO schema implementation.
*/
@Slf4j
-public class AvroSchema<T> extends StructSchema<T> {
+public class AvroSchema<T> extends AvroBaseStructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);
private ClassLoader pojoClassLoader;
@@ -49,10 +53,16 @@ public class AvroSchema<T> extends StructSchema<T> {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
- setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
+ setReader(new MultiVersionAvroReader<>(schema, pojoClassLoader, getJsr310ConversionEnabledFromSchemaInfo(schemaInfo)));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}
+ private AvroSchema(SchemaReader<T> reader, SchemaWriter<T> writer, SchemaInfo schemaInfo) {
+ super(schemaInfo);
+ setReader(reader);
+ setWriter(writer);
+ }
+
@Override
public boolean supportSchemaVersioning() {
return true;
@@ -68,6 +78,10 @@ public class AvroSchema<T> extends StructSchema<T> {
}
public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+ if (schemaDefinition.getSchemaReaderOpt().isPresent() && schemaDefinition.getSchemaWriterOpt().isPresent()) {
+ return new AvroSchema<>(schemaDefinition.getSchemaReaderOpt().get(),
+ schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO));
+ }
ClassLoader pojoClassLoader = null;
if (schemaDefinition.getPojo() != null) {
pojoClassLoader = schemaDefinition.getPojo().getClassLoader();
@@ -88,31 +102,6 @@ public class AvroSchema<T> extends StructSchema<T> {
return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader);
}
- @Override
- protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
- if (schemaInfo != null) {
- log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- schemaInfo.getSchemaDefinition(), schemaInfo.toString());
- boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
- return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
- } else {
- log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- this.schemaInfo.getSchemaDefinition());
- return reader;
- }
- }
-
- private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
- if (schemaInfo != null) {
- return Boolean.parseBoolean(schemaInfo.getProperties()
- .getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
- }
- return false;
- }
-
public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 45651f3..4e3b874 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -25,22 +25,24 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.Map;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
+
/**
* A schema implementation to deal with json data.
*/
@Slf4j
-public class JSONSchema<T> extends StructSchema<T> {
+public class JSONSchema<T> extends AvroBaseStructSchema<T> {
// Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
// return shaded version of object mapper
private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
@@ -59,11 +61,6 @@ public class JSONSchema<T> extends StructSchema<T> {
setReader(reader);
}
- @Override
- protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
- throw new RuntimeException("JSONSchema don't support schema versioning");
- }
-
/**
* Implemented for backwards compatibility reasons
* since the original schema generated by JSONSchema was based off the json schema standard
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 23dde95..f7971eb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -22,14 +22,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors;
import com.google.protobuf.GeneratedMessageV3;
+
import lombok.AllArgsConstructor;
import lombok.Getter;
+
import org.apache.avro.protobuf.ProtobufData;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -45,7 +45,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
/**
* A schema implementation to deal with protobuf generated messages.
*/
-public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {
+public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends AvroBaseStructSchema<T> {
public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
@@ -94,11 +94,6 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
}
}
- @Override
- protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
- throw new RuntimeException("ProtobufSchema don't support schema versioning");
- }
-
public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 986e186..8cc1868 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -40,6 +40,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* {@link org.apache.pulsar.common.schema.SchemaType#JSON},
* and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
*/
+@Deprecated
public abstract class StructSchema<T> extends AbstractStructSchema<T> {
protected final Schema schema;
@@ -47,7 +48,6 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
protected StructSchema(SchemaInfo schemaInfo) {
super(schemaInfo);
this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
- this.schemaInfo = schemaInfo;
if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java
new file mode 100644
index 0000000..58396a1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/AbstractMultiVersionGenericReader.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionAvroBaseReader;
+
+/**
+ * The abstract class of multi version generic reader.
+ */
+public abstract class AbstractMultiVersionGenericReader extends AbstractMultiVersionAvroBaseReader<GenericRecord> {
+
+ // the flag controls whether to use the provided schema as reader schema
+ // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
+ // allows decoding the messages using the schema associated with the messages.
+ protected final boolean useProvidedSchemaAsReaderSchema;
+
+ protected AbstractMultiVersionGenericReader(boolean useProvidedSchemaAsReaderSchema,
+ SchemaReader<GenericRecord> providerSchemaReader,
+ Schema readerSchema) {
+ super(providerSchemaReader, readerSchema);
+ this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
+ }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 94c5ba1..8f63328 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -19,12 +19,9 @@
package org.apache.pulsar.client.impl.schema.generic;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
+
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.SchemaUtils;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -41,9 +38,14 @@ public class GenericAvroSchema extends GenericSchemaImpl {
GenericAvroSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
- super(schemaInfo, useProvidedSchemaAsReaderSchema);
- setReader(new GenericAvroReader(schema));
+ super(schemaInfo);
+ setReader(new MultiVersionGenericAvroReader(useProvidedSchemaAsReaderSchema, schema));
setWriter(new GenericAvroWriter(schema));
+
+ if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
+ this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
+ schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
+ }
}
@Override
@@ -59,34 +61,10 @@ public class GenericAvroSchema extends GenericSchemaImpl {
@Override
public org.apache.pulsar.client.api.Schema<GenericRecord> clone() {
org.apache.pulsar.client.api.Schema<GenericRecord> schema =
- GenericAvroSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema);
+ GenericAvroSchema.of(schemaInfo, ((AbstractMultiVersionGenericReader) reader).useProvidedSchemaAsReaderSchema);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
return schema;
}
-
- @Override
- protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
- if (schemaInfo != null) {
- log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- schemaInfo);
- Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
- Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
- readerSchema.addProp(OFFSET_PROP, schemaInfo.getProperties().getOrDefault(OFFSET_PROP, "0"));
-
- return new GenericAvroReader(
- writerSchema,
- readerSchema,
- schemaVersion.get());
- } else {
- log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- this.schemaInfo);
- return reader;
- }
- }
-
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 8edae25..87a1790 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -18,16 +18,8 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import java.util.stream.Collectors;
-
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
-import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.impl.schema.SchemaUtils;
-import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -42,35 +34,9 @@ public class GenericJsonSchema extends GenericSchemaImpl {
GenericJsonSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
- super(schemaInfo, useProvidedSchemaAsReaderSchema);
+ super(schemaInfo);
setWriter(new GenericJsonWriter());
- setReader(new GenericJsonReader(fields, schemaInfo));
- }
-
- @Override
- protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
- if (schemaInfo != null) {
- log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- schemaInfo.getSchemaDefinition());
- Schema readerSchema;
- if (useProvidedSchemaAsReaderSchema) {
- readerSchema = schema;
- } else {
- readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
- }
- return new GenericJsonReader(schemaVersion.get(),
- readerSchema.getFields()
- .stream()
- .map(f -> new Field(f.name(), f.pos()))
- .collect(Collectors.toList()), schemaInfo);
- } else {
- log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- this.schemaInfo.getSchemaDefinition());
- return reader;
- }
+ setReader(new MultiVersionGenericJsonReader(useProvidedSchemaAsReaderSchema, schema, schemaInfo, fields));
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
index 426bb7a..59e748c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema.generic;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.impl.schema.StructSchema;
+import org.apache.pulsar.client.impl.schema.AvroBaseStructSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import java.util.List;
@@ -32,23 +32,17 @@ import java.util.stream.Collectors;
* warning :
* we suggest migrate GenericSchemaImpl.of() to <GenericSchema Implementor>.of() method (e.g. GenericJsonSchema 、GenericAvroSchema )
*/
-public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
+public abstract class GenericSchemaImpl extends AvroBaseStructSchema<GenericRecord> implements GenericSchema<GenericRecord> {
protected final List<Field> fields;
- // the flag controls whether to use the provided schema as reader schema
- // to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
- // allows decoding the messages using the schema associated with the messages.
- protected final boolean useProvidedSchemaAsReaderSchema;
- protected GenericSchemaImpl(SchemaInfo schemaInfo,
- boolean useProvidedSchemaAsReaderSchema) {
+ protected GenericSchemaImpl(SchemaInfo schemaInfo) {
super(schemaInfo);
this.fields = schema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
- this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
}
@Override
@@ -69,9 +63,9 @@ public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> impl
/**
* warning :
* we suggest migrate GenericSchemaImpl.of() to <GenericSchema Implementor>.of() method (e.g. GenericJsonSchema 、GenericAvroSchema )
- * @param schemaInfo
- * @param useProvidedSchemaAsReaderSchema
- * @return
+ * @param schemaInfo {@link SchemaInfo}
+ * @param useProvidedSchemaAsReaderSchema {@link Boolean}
+ * @return generic schema implementation
*/
public static GenericSchemaImpl of(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java
new file mode 100644
index 0000000..0c2f4d2
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericAvroReader.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * Multi version generic Avro reader: builds a {@link GenericAvroReader} from the writer schema of each message schema version.
+ */
+public class MultiVersionGenericAvroReader extends AbstractMultiVersionGenericReader {
+
+ public MultiVersionGenericAvroReader(boolean useProvidedSchemaAsReaderSchema, Schema readerSchema) { // flag set: decode every version with readerSchema instead of its writer schema
+ super(useProvidedSchemaAsReaderSchema, new GenericAvroReader(readerSchema), readerSchema);
+ }
+
+ @Override
+ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) { // reader for one schema version; provided-schema reader if the version is unknown
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
+ if (schemaInfo != null) {
+ LOG.info("Load schema reader for version({}), schema is : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ schemaInfo);
+ Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
+ Schema readerSchema = useProvidedSchemaAsReaderSchema ? this.readerSchema : writerSchema; // NOTE(review): with the flag set this is the shared this.readerSchema, which addProp below mutates
+ readerSchema.addProp(GenericAvroSchema.OFFSET_PROP, // NOTE(review): Avro rejects re-adding a prop with a different value; confirm offsets agree across versions
+ schemaInfo.getProperties().getOrDefault(GenericAvroSchema.OFFSET_PROP, "0"));
+
+ return new GenericAvroReader(
+ writerSchema,
+ readerSchema,
+ schemaVersion.get());
+ } else {
+ LOG.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ this.readerSchema);
+ return providerSchemaReader; // reader for the provided schema
+ }
+ }
+ protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericAvroReader.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
similarity index 63%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
index 8edae25..daab882 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericJsonReader.java
@@ -18,45 +18,41 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
-import java.util.stream.Collectors;
-
-import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
/**
- * A generic json schema.
+ * Multi version generic JSON reader: loads a reader for each message schema version from that version's schema info.
*/
-@Slf4j
-public class GenericJsonSchema extends GenericSchemaImpl {
+public class MultiVersionGenericJsonReader extends AbstractMultiVersionGenericReader {
- public GenericJsonSchema(SchemaInfo schemaInfo) {
- this(schemaInfo, true);
- }
-
- GenericJsonSchema(SchemaInfo schemaInfo,
- boolean useProvidedSchemaAsReaderSchema) {
- super(schemaInfo, useProvidedSchemaAsReaderSchema);
- setWriter(new GenericJsonWriter());
- setReader(new GenericJsonReader(fields, schemaInfo));
+ public MultiVersionGenericJsonReader(boolean useProvidedSchemaAsReaderSchema, Schema readerSchema,
+ SchemaInfo schemaInfo, List<Field> fields) {
+ super(useProvidedSchemaAsReaderSchema, new GenericJsonReader(fields, schemaInfo), readerSchema); // reader for the provided schema info and fields
}
@Override
protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
- log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- schemaInfo.getSchemaDefinition());
+ LOG.info("Load schema reader for version({}), schema is : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ schemaInfo.getSchemaDefinition());
Schema readerSchema;
if (useProvidedSchemaAsReaderSchema) {
- readerSchema = schema;
+ readerSchema = this.readerSchema; // decode with the provided schema instead of this version's
} else {
readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
}
@@ -66,15 +62,12 @@ public class GenericJsonSchema extends GenericSchemaImpl {
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList()), schemaInfo);
} else {
- log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
- this.schemaInfo.getSchemaDefinition());
- return reader;
+ LOG.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ this.readerSchema);
+ return providerSchemaReader;
}
}
- @Override
- public GenericRecordBuilder newRecordBuilder() {
- throw new UnsupportedOperationException("Json Schema doesn't support record builder yet");
- }
+ protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericJsonReader.class); // own class name, so JSON reader logs are not attributed to the Avro reader
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java
new file mode 100644
index 0000000..b657e42
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionAvroBaseReader.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+
+/**
+ * Base class for multi version readers that also keep the provided Avro {@link Schema} as {@code readerSchema}.
+ */
+public abstract class AbstractMultiVersionAvroBaseReader<T> extends AbstractMultiVersionReader<T> {
+
+ protected Schema readerSchema; // the provided schema; subclasses may use it as the Avro reader schema
+
+ public AbstractMultiVersionAvroBaseReader(SchemaReader<T> providerSchemaReader, Schema readerSchema) {
+ super(providerSchemaReader); // reader for the provided schema, used for reads without a schema version
+ this.readerSchema = readerSchema;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
similarity index 71%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
index 7730474..75a6a5d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AbstractMultiVersionReader.java
@@ -16,38 +16,33 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.reader;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
import org.apache.avro.AvroTypeException;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.SerializationException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
- * minimal abstract StructSchema
+ * Base {@link SchemaReader} for messages written with several schema versions; subclasses supply a reader per version via {@code loadReader}, cached in {@code readerCache}.
*/
-public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
+public abstract class AbstractMultiVersionReader<T> implements SchemaReader<T> {
- protected static final Logger LOG = LoggerFactory.getLogger(AbstractStructSchema.class);
-
- protected SchemaInfo schemaInfo;
- protected SchemaReader<T> reader;
- protected SchemaWriter<T> writer;
+ protected final SchemaReader<T> providerSchemaReader; // used when a message carries no schema version
protected SchemaInfoProvider schemaInfoProvider;
LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
@@ -58,30 +53,26 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
}
});
- public AbstractStructSchema(SchemaInfo schemaInfo){
- this.schemaInfo = schemaInfo;
+ AbstractMultiVersionReader(SchemaReader<T> providerSchemaReader) { // package-private: subclassed within this package
+ this.providerSchemaReader = providerSchemaReader;
}
-
@Override
- public byte[] encode(T message) {
- return writer.write(message);
+ public T read(byte[] bytes, int offset, int length) {
+ return providerSchemaReader.read(bytes, offset, length); // honor the requested slice instead of decoding the whole array
}
@Override
- public T decode(byte[] bytes) {
- return reader.read(bytes);
+ public T read(InputStream inputStream) {
+ return providerSchemaReader.read(inputStream);
}
@Override
- public T decode(byte[] bytes, byte[] schemaVersion) {
+ public T read(InputStream inputStream, byte[] schemaVersion) {
try {
- return schemaVersion == null ? decode(bytes) :
- readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
- } catch (ExecutionException | AvroTypeException e) {
- if (e instanceof AvroTypeException) {
- throw new SchemaSerializationException(e);
- }
+ return schemaVersion == null ? read(inputStream) :
+ readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(inputStream);
+ } catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
@@ -89,16 +80,14 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
}
@Override
- public T decode(ByteBuf byteBuf) {
- return reader.read(new ByteBufInputStream(byteBuf));
- }
-
- @Override
- public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
+ public T read(byte[] bytes, byte[] schemaVersion) {
try {
- return schemaVersion == null ? decode(byteBuf) :
- readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
- } catch (ExecutionException e) {
+ return schemaVersion == null ? read(bytes) :
+ readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
+ } catch (ExecutionException | AvroTypeException e) {
+ if (e instanceof AvroTypeException) {
+ throw new SchemaSerializationException(e);
+ }
LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
throw new RuntimeException("Can't get generic schema for topic " + schemaInfoProvider.getTopicName());
@@ -106,11 +95,6 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
}
@Override
- public SchemaInfo getSchemaInfo() {
- return this.schemaInfo;
- }
-
- @Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
}
@@ -132,27 +116,16 @@ public abstract class AbstractStructSchema<T> extends AbstractSchema<T> {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SerializationException(
- "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
- e
+ "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+ e
);
} catch (ExecutionException e) {
throw new SerializationException(
- "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
- e.getCause()
+ "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+ e.getCause()
);
}
}
- protected void setWriter(SchemaWriter<T> writer) {
- this.writer = writer;
- }
-
- protected void setReader(SchemaReader<T> reader) {
- this.reader = reader;
- }
-
- protected SchemaReader<T> getReader() {
- return reader;
- }
-
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractMultiVersionReader.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
new file mode 100644
index 0000000..8f3e5cc
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/MultiVersionAvroReader.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.reader;
+
+import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.getJsr310ConversionEnabledFromSchemaInfo;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
+
+/**
+ * Multi version Avro reader: decodes each schema version with that version's writer schema against the provided reader schema.
+ */
+public class MultiVersionAvroReader<T> extends AbstractMultiVersionAvroBaseReader<T> {
+
+ private final ClassLoader pojoClassLoader; // passed to every per-version AvroReader
+
+ public MultiVersionAvroReader(Schema readerSchema, ClassLoader pojoClassLoader, boolean jsr310ConversionEnabled) {
+ super(new AvroReader<>(readerSchema, pojoClassLoader, jsr310ConversionEnabled), readerSchema);
+ this.pojoClassLoader = pojoClassLoader;
+ }
+
+ @Override
+ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
+ if (schemaInfo != null) {
+ LOG.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ schemaInfo.getSchemaDefinition(), schemaInfo.toString());
+ boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo); // taken from this version's schema info, not the constructor flag
+ return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()),
+ readerSchema, pojoClassLoader, jsr310ConversionEnabled);
+ } else {
+ LOG.warn("No schema found for version({}), use latest schema : {}",
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
+ this.readerSchema);
+ return providerSchemaReader; // the reader built from the provided schema in the constructor
+ }
+ }
+
+ protected static final Logger LOG = LoggerFactory.getLogger(MultiVersionAvroReader.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
similarity index 70%
copy from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 986e186..7e1e1c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.util;
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -30,36 +30,31 @@ import java.lang.reflect.Field;
import static java.nio.charset.StandardCharsets.UTF_8;
-/**
- * This is a base schema implementation for Avro Based `Struct` types.
- * A struct type is used for presenting records (objects) which
- * have multiple fields.
- *
- * <p>Currently Pulsar supports 3 `Struct` types -
- * {@link org.apache.pulsar.common.schema.SchemaType#AVRO},
- * {@link org.apache.pulsar.common.schema.SchemaType#JSON},
- * and {@link org.apache.pulsar.common.schema.SchemaType#PROTOBUF}.
- */
-public abstract class StructSchema<T> extends AbstractStructSchema<T> {
-
- protected final Schema schema;
+public class SchemaUtil { // static helpers for building and parsing Avro schemas and SchemaInfo
- protected StructSchema(SchemaInfo schemaInfo) {
- super(schemaInfo);
- this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
- this.schemaInfo = schemaInfo;
- if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
- this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
- schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
+ public static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) { // false when schemaInfo is null or the property is absent
+ if (schemaInfo != null) {
+ return Boolean.parseBoolean(schemaInfo.getProperties()
+ .getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
}
+ return false;
}
- public Schema getAvroSchema() {
- return schema;
+ public static Schema parseAvroSchema(String schemaJson) {
+ final Schema.Parser parser = new Schema.Parser();
+ parser.setValidateDefaults(false); // do not validate field default values while parsing
+ return parser.parse(schemaJson);
}
+ public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) { // unnamed SchemaInfo holding the definition's Avro schema JSON
+ return SchemaInfo.builder()
+ .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
+ .properties(schemaDefinition.getProperties())
+ .name("")
+ .type(schemaType).build();
+ }
- protected static Schema createAvroSchema(SchemaDefinition schemaDefinition) {
+ public static Schema createAvroSchema(SchemaDefinition schemaDefinition) {
Class pojo = schemaDefinition.getPojo();
if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
@@ -89,7 +84,7 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
}
}
- protected static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
+ public static Schema extractAvroSchema(SchemaDefinition schemaDefinition, Class pojo) {
try {
return parseAvroSchema(pojo.getDeclaredField("SCHEMA$").get(null).toString());
} catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException ignored) {
@@ -97,19 +92,4 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
: ReflectData.get().getSchema(pojo);
}
}
-
- protected static Schema parseAvroSchema(String schemaJson) {
- final Schema.Parser parser = new Schema.Parser();
- parser.setValidateDefaults(false);
- return parser.parse(schemaJson);
- }
-
- public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
- return SchemaInfo.builder()
- .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
- .properties(schemaDefinition.getProperties())
- .name("")
- .type(schemaType).build();
- }
-
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 8fd8154..9116e28 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -32,6 +32,7 @@ import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.Data;
@@ -50,10 +51,15 @@ import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
+import org.apache.pulsar.client.api.schema.SchemaReader;
+import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.avro.generated.NasaMission;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.impl.schema.reader.JacksonJsonReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.client.impl.schema.writer.JacksonJsonWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.joda.time.DateTime;
@@ -392,4 +398,25 @@ public class AvroSchemaTest {
Assert.assertEquals(((BufferedBinaryEncoder)encoder).bytesBuffered(), 0);
}
+ @Test
+ public void testAvroSchemaUserDefinedReadAndWriter() { // user-supplied reader/writer must be used instead of Avro encoding
+ SchemaReader<Foo> reader = new JacksonJsonReader<>(new ObjectMapper(), Foo.class);
+ SchemaWriter<Foo> writer = new JacksonJsonWriter<>(new ObjectMapper());
+ SchemaDefinition<Foo> schemaDefinition = SchemaDefinition.<Foo>builder()
+ .withPojo(Bar.class) // NOTE(review): schema info comes from Bar while Foo is (de)serialized — confirm intended
+ .withSchemaReader(reader)
+ .withSchemaWriter(writer)
+ .build();
+
+ AvroSchema<Foo> schema = AvroSchema.of(schemaDefinition);
+ Foo foo = new Foo();
+ foo.setColor(SchemaTestUtils.Color.RED);
+ String field1 = "test";
+ foo.setField1(field1);
+ assertEquals(schema.encode(foo), writer.write(foo)); // bytes must come from the user-defined JSON writer, not Avro
+ foo = schema.decode(schema.encode(foo));
+ assertEquals(foo.getColor(), SchemaTestUtils.Color.RED);
+ assertEquals(foo.getField1(), field1);
+ }
+
}
diff --git a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
index a2e77cd..ef71e00 100644
--- a/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
+++ b/pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v2/InfluxDBSinkTest.java
@@ -28,10 +28,10 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AbstractStructSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.StructSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.functions.api.Record;
@@ -144,7 +144,7 @@ public class InfluxDBSinkTest {
openWriteClose(jsonSchema);
}
- private void openWriteClose(StructSchema<Cpu> schema) throws Exception {
+ private void openWriteClose(AbstractStructSchema<Cpu> schema) throws Exception {
// test open
Map<String, Object> map = new HashMap();
map.put("influxdbUrl", "http://localhost:9999");
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
index 7b150ea..4dbda98 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestAvroSchemaHandler.java
@@ -28,7 +28,6 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.StructSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.schema.SchemaType;
@@ -41,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseSchemaInfo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
@@ -77,7 +77,7 @@ public class TestAvroSchemaHandler {
Schema schema1 = ReflectData.AllowNull.get().getSchema(Foo1.class);
PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider = mock(PulsarSqlSchemaInfoProvider.class);
AvroSchemaHandler avroSchemaHandler = new AvroSchemaHandler(pulsarSqlSchemaInfoProvider,
- StructSchema.parseSchemaInfo(SchemaDefinition.builder().withPojo(Foo2.class).build(), SchemaType.AVRO), columnHandles);
+ parseSchemaInfo(SchemaDefinition.builder().withPojo(Foo2.class).build(), SchemaType.AVRO), columnHandles);
byte[] schemaVersion = new byte[8];
for (int i = 0 ; i<8; i++) {
schemaVersion[i] = 0;
@@ -100,7 +100,7 @@ public class TestAvroSchemaHandler {
when(message.getData()).thenReturn(ByteBufAllocator.DEFAULT
.buffer(bytes.length, bytes.length).writeBytes(byteArrayOutputStream.toByteArray()));
when(pulsarSqlSchemaInfoProvider.getSchemaByVersion(any()))
- .thenReturn(completedFuture(StructSchema.parseSchemaInfo(SchemaDefinition.builder()
+ .thenReturn(completedFuture(parseSchemaInfo(SchemaDefinition.builder()
.withPojo(Foo1.class).build(), SchemaType.AVRO)));
Object object = ((GenericAvroRecord)avroSchemaHandler.deserialize(message.getData(),