You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/16 02:46:48 UTC
[pulsar] branch master updated: Fix StructSchema reader cache
loading logic
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0367f5f Fix StructSchema reader cache loading logic
0367f5f is described below
commit 0367f5f0a236b26cfec74b272ca44a4c77eceda1
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Fri Aug 16 10:46:42 2019 +0800
Fix StructSchema reader cache loading logic
### Motivation
StructSchema LoadingCache for cache reader, but key is byte[], it will compare with the address's hashcode. So every decode will generate a new reader so we should change the type of the key for LoadingCache
---
.../java/org/apache/pulsar/client/impl/schema/AvroSchema.java | 9 +++++----
.../java/org/apache/pulsar/client/impl/schema/JSONSchema.java | 3 ++-
.../org/apache/pulsar/client/impl/schema/ProtobufSchema.java | 3 ++-
.../org/apache/pulsar/client/impl/schema/StructSchema.java | 11 ++++++-----
.../pulsar/client/impl/schema/generic/GenericAvroSchema.java | 11 ++++++-----
.../pulsar/client/impl/schema/generic/GenericJsonSchema.java | 11 ++++++-----
6 files changed, 27 insertions(+), 21 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 4f4ae5a..9eddee5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
@@ -99,16 +100,16 @@ public class AvroSchema<T> extends StructSchema<T> {
}
@Override
- protected SchemaReader<T> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo.getSchemaDefinition());
return reader;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 04b249c..90a288e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -58,7 +59,7 @@ public class JSONSchema<T> extends StructSchema<T> {
}
@Override
- protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
throw new RuntimeException("JSONSchema don't support schema versioning");
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index b40f2ef..23dde95 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -94,7 +95,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
}
@Override
- protected SchemaReader<T> loadReader(byte[] schemaVersion) {
+ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
throw new RuntimeException("ProtobufSchema don't support schema versioning");
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 4ec9fc8..a6608f4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.slf4j.Logger;
@@ -60,10 +61,10 @@ public abstract class StructSchema<T> implements Schema<T> {
protected SchemaReader<T> reader;
protected SchemaWriter<T> writer;
protected SchemaInfoProvider schemaInfoProvider;
- private final LoadingCache<byte[], SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaReader<T>>() {
+ private final LoadingCache<BytesSchemaVersion, SchemaReader<T>> readerCache = CacheBuilder.newBuilder().maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<BytesSchemaVersion, SchemaReader<T>>() {
@Override
- public SchemaReader<T> load(byte[] schemaVersion) {
+ public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
return loadReader(schemaVersion);
}
});
@@ -90,7 +91,7 @@ public abstract class StructSchema<T> implements Schema<T> {
@Override
public T decode(byte[] bytes, byte[] schemaVersion) {
try {
- return readerCache.get(schemaVersion).read(bytes);
+ return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
@@ -138,7 +139,7 @@ public abstract class StructSchema<T> implements Schema<T> {
* @param schemaVersion the provided schema version
* @return the schema reader for decoding messages encoded by the provided schema version.
*/
- protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
+ protected abstract SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion);
/**
* TODO: think about how to make this async
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index fab70d4..91220b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -54,21 +55,21 @@ public class GenericAvroSchema extends GenericSchemaImpl {
}
@Override
- protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo);
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader(
writerSchema,
readerSchema,
- schemaVersion);
+ schemaVersion.get());
} else {
log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo);
return reader;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 307edd5..7bde670 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -47,11 +48,11 @@ class GenericJsonSchema extends GenericSchemaImpl {
}
@Override
- protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
+ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersion) {
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get());
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition());
Schema readerSchema;
if (useProvidedSchemaAsReaderSchema) {
@@ -59,14 +60,14 @@ class GenericJsonSchema extends GenericSchemaImpl {
} else {
readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
}
- return new GenericJsonReader(schemaVersion,
+ return new GenericJsonReader(schemaVersion.get(),
readerSchema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList()));
} else {
log.warn("No schema found for version({}), use latest schema : {}",
- SchemaUtils.getStringSchemaVersion(schemaVersion),
+ SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
this.schemaInfo.getSchemaDefinition());
return reader;
}