You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/29 02:24:39 UTC
[pulsar] branch master updated: Improve SchemaInfoProvider to fetch
schema info asynchronously (#4836)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 91c4254 Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)
91c4254 is described below
commit 91c4254cae3be91774220c9fea145d3e433b6a01
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jul 29 10:24:32 2019 +0800
Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)
*Motivation*
Currently, fetching schema information is done synchronously.
The fetch is called in Netty callback threads and can potentially block
async operations.
*Modifications*
Make most of the operations in SchemaInfoProvider asynchronous.
---
.../client/api/schema/SchemaInfoProvider.java | 5 +-
.../client/impl/MultiTopicsConsumerImpl.java | 18 +--
.../pulsar/client/impl/PulsarClientImpl.java | 58 ++++----
.../pulsar/client/impl/schema/AvroSchema.java | 2 +-
.../pulsar/client/impl/schema/KeyValueSchema.java | 31 +++--
.../pulsar/client/impl/schema/StructSchema.java | 21 +++
.../impl/schema/generic/GenericAvroSchema.java | 2 +-
.../impl/schema/generic/GenericJsonSchema.java | 2 +-
.../generic/MultiVersionSchemaInfoProvider.java | 58 ++++----
.../apache/pulsar/client/impl/MessageImplTest.java | 13 +-
.../schema/SupportVersioningAvroSchemaTest.java | 3 +-
.../SupportVersioningKeyValueSchemaTest.java | 5 +-
.../impl/schema/generic/GenericAvroSchemaTest.java | 3 +-
.../impl/schema/generic/GenericSchemaImplTest.java | 5 +-
.../MultiVersionSchemaInfoProviderTest.java | 4 +-
.../common/protocol/schema/BytesSchemaVersion.java | 147 ++++++++++++++++++++-
16 files changed, 279 insertions(+), 98 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
index 81e4f44..ac8f5da 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api.schema;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
@@ -31,14 +32,14 @@ public interface SchemaInfoProvider {
* @param schemaVersion schema version
* @return schema info of the provided <tt>schemaVersion</tt>
*/
- SchemaInfo getSchemaByVersion(byte[] schemaVersion);
+ CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
/**
* Retrieve the latest schema info.
*
* @return the latest schema
*/
- SchemaInfo getLatestSchema();
+ CompletableFuture<SchemaInfo> getLatestSchema();
/**
* Retrieve the topic name.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e50e8bc..3521eb4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -730,19 +730,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+ client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
+ if (null == cause) {
+ doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
+ } else {
+ subscribeResult.completeExceptionally(cause);
+ }
+ });
+ }
+
+ private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
}
List<CompletableFuture<Consumer<T>>> futureList;
-
- try {
- client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
- } catch (Throwable t) {
- subscribeResult.completeExceptionally(t);
- return;
- }
-
if (numPartitions > 1) {
this.topics.putIfAbsent(topicName, numPartitions);
allTopicPartitionsNumber.addAndGet(numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bceaf2c..134bb77 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -328,12 +328,8 @@ public class PulsarClientImpl implements PulsarClient {
}
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
- try {
- preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic());
- } catch (Throwable t) {
- return FutureUtil.failedFuture(t);
- }
- return doSingleTopicSubscribeAsync(conf, schema, interceptors);
+ return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
+ .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors));
}
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -444,13 +440,10 @@ public class PulsarClientImpl implements PulsarClient {
}
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
- try {
- preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName());
- } catch (Throwable t) {
- return FutureUtil.failedFuture(t);
- }
- return doCreateReaderAsync(conf, schema);
+ return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
+ .thenCompose(ignored -> doCreateReaderAsync(conf, schema));
}
+
<T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
@@ -734,33 +727,40 @@ public class PulsarClientImpl implements PulsarClient {
return schemaProviderLoadingCache;
}
- protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable {
+ protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
+ Schema schema,
+ String topicName) {
if (schema != null && schema.supportSchemaVersioning()) {
- SchemaInfoProvider schemaInfoProvider = null;
+ final SchemaInfoProvider schemaInfoProvider;
try {
schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
} catch (ExecutionException e) {
log.error("Failed to load schema info provider for topic {}", topicName, e);
- throw e.getCause();
+ return FutureUtil.failedFuture(e.getCause());
}
if (schema instanceof AutoConsumeSchema) {
- SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
- if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
- throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
- }
-
- // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
- // to decode the messages.
- GenericSchema genericSchema = GenericSchemaImpl.of(
- schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
- log.info("Auto detected schema for topic {} : {}",
- topicName, schemaInfo.getSchemaDefinition());
- ((AutoConsumeSchema) schema).setSchema(genericSchema);
+ return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
+ if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
+ return FutureUtil.failedFuture(
+ new RuntimeException("Currently schema detection only works"
+ + " for topics with avro or json schemas"));
+ }
+
+ // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
+ // to decode the messages.
+ GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
+ log.info("Auto detected schema for topic {} : {}",
+ topicName, schemaInfo.getSchemaDefinition());
+ ((AutoConsumeSchema) schema).setSchema(genericSchema);
+ schema.setSchemaInfoProvider(schemaInfoProvider);
+ return CompletableFuture.completedFuture(null);
+ });
+ } else {
+ schema.setSchemaInfoProvider(schemaInfoProvider);
}
- schema.setSchemaInfoProvider(schemaInfoProvider);
}
-
+ return CompletableFuture.completedFuture(null);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 2944e14..4f4ae5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -100,7 +100,7 @@ public class AvroSchema<T> extends StructSchema<T> {
@Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 7eb4757..fd7dcaa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkArgument;
+import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -103,14 +104,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
if (keySchema instanceof StructSchema) {
keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
- public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
- SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
- return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+ public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+ return schemaInfoProvider.getSchemaByVersion(schemaVersion)
+ .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey());
}
@Override
- public SchemaInfo getLatestSchema() {
- return ((StructSchema<K>) keySchema).schemaInfo;
+ public CompletableFuture<SchemaInfo> getLatestSchema() {
+ return CompletableFuture.completedFuture(
+ ((StructSchema<K>) keySchema).schemaInfo);
}
@Override
@@ -123,14 +125,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
if (valueSchema instanceof StructSchema) {
valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
- public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
- SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
- return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+ public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+ return schemaInfoProvider.getSchemaByVersion(schemaVersion)
+ .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue());
}
@Override
- public SchemaInfo getLatestSchema() {
- return ((StructSchema<V>) valueSchema).schemaInfo;
+ public CompletableFuture<SchemaInfo> getLatestSchema() {
+ return CompletableFuture.completedFuture(
+ ((StructSchema<V>) valueSchema).schemaInfo);
}
@Override
@@ -142,13 +145,13 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.schemaInfoProvider = new SchemaInfoProvider() {
@Override
- public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
- return schemaInfo;
+ public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+ return CompletableFuture.completedFuture(schemaInfo);
}
@Override
- public SchemaInfo getLatestSchema() {
- return schemaInfo;
+ public CompletableFuture<SchemaInfo> getLatestSchema() {
+ return CompletableFuture.completedFuture(schemaInfo);
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index dd618ae..4ec9fc8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -29,6 +29,7 @@ import com.google.common.cache.LoadingCache;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -139,6 +140,26 @@ public abstract class StructSchema<T> implements Schema<T> {
*/
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
+ /**
+ * TODO: think about how to make this async
+ */
+ protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) {
+ try {
+ return schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SerializationException(
+ "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+ e
+ );
+ } catch (ExecutionException e) {
+ throw new SerializationException(
+ "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+ e.getCause()
+ );
+ }
+ }
+
protected void setWriter(SchemaWriter<T> writer) {
this.writer = writer;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 2fa6829..c295a3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -55,7 +55,7 @@ public class GenericAvroSchema extends GenericSchemaImpl {
@Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 3b82591..307edd5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -48,7 +48,7 @@ class GenericJsonSchema extends GenericSchemaImpl {
@Override
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
- SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+ SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index 12e8956..4c430be 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -21,15 +21,17 @@ package org.apache.pulsar.client.impl.schema.generic;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -43,13 +45,21 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
private final TopicName topicName;
private final PulsarClientImpl pulsarClient;
- private final LoadingCache<byte[], SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
- .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaInfo>() {
- @Override
- public SchemaInfo load(byte[] schemaVersion) throws Exception {
- return loadSchema(schemaVersion);
- }
- });
+ private final LoadingCache<BytesSchemaVersion, CompletableFuture<SchemaInfo>> cache = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .expireAfterAccess(30, TimeUnit.MINUTES)
+ .build(new CacheLoader<BytesSchemaVersion, CompletableFuture<SchemaInfo>>() {
+ @Override
+ public CompletableFuture<SchemaInfo> load(BytesSchemaVersion schemaVersion) {
+ CompletableFuture<SchemaInfo> siFuture = loadSchema(schemaVersion.get());
+ siFuture.whenComplete((si, cause) -> {
+ if (null != cause) {
+ cache.asMap().remove(schemaVersion, siFuture);
+ }
+ });
+ return siFuture;
+ }
+ });
public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
this.topicName = topicName;
@@ -57,30 +67,24 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
}
@Override
- public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+ public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
try {
if (null == schemaVersion) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
- return cache.get(schemaVersion);
+ return cache.get(BytesSchemaVersion.of(schemaVersion));
} catch (ExecutionException e) {
- LOG.error("Can't get generic schema for topic {} schema version {}",
+ LOG.error("Can't get schema for topic {} schema version {}",
topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
- throw new RuntimeException("Can't get generic schema for topic " + topicName.toString());
+ return FutureUtil.failedFuture(e.getCause());
}
}
@Override
- public SchemaInfo getLatestSchema() {
- try {
- Optional<SchemaInfo> optional = pulsarClient.getLookup()
- .getSchema(topicName).get();
- return optional.orElse(null);
- } catch (ExecutionException | InterruptedException e) {
- LOG.error("Can't get current schema for topic {}",
- topicName.toString(), e);
- throw new RuntimeException("Can't get current schema for topic " + topicName.toString());
- }
+ public CompletableFuture<SchemaInfo> getLatestSchema() {
+ return pulsarClient.getLookup()
+ .getSchema(topicName)
+ .thenApply(o -> o.orElse(null));
}
@Override
@@ -88,10 +92,10 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
return topicName.getLocalName();
}
- private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
- Optional<SchemaInfo> optional = pulsarClient.getLookup()
- .getSchema(topicName, schemaVersion).get();
- return optional.orElse(null);
+ private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
+ return pulsarClient.getLookup()
+ .getSchema(topicName, schemaVersion)
+ .thenApply(o -> o.orElse(null));
}
public PulsarClientImpl getPulsarClient() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index e253ace..dd49445 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
import java.nio.ByteBuffer;
import java.util.Base64;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -184,7 +185,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
@@ -221,7 +222,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
@@ -259,7 +260,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
@@ -296,7 +297,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
@@ -334,7 +335,7 @@ public class MessageImplTest {
Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
@@ -371,7 +372,7 @@ public class MessageImplTest {
fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
foo.setField1("field1");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
index 1b3ef52..21c8a0b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
@@ -56,7 +57,7 @@ public class SupportVersioningAvroSchemaTest {
@Test
public void testDecode() {
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(genericAvroSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2();
fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
SchemaTestUtils.Foo foo = (SchemaTestUtils.Foo)schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index ebcc00d..40f879e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
@@ -44,7 +45,7 @@ public class SupportVersioningKeyValueSchemaTest {
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
@@ -82,7 +83,7 @@ public class SupportVersioningKeyValueSchemaTest {
keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(keyValueSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
bar.setField1(true);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
index 814975d..0752291 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema.generic;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -67,7 +68,7 @@ public class GenericAvroSchemaTest {
MultiVersionSchemaInfoProvider provider = mock(MultiVersionSchemaInfoProvider.class);
readerSchema.setSchemaInfoProvider(provider);
when(provider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(writerSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(writerSchema.getSchemaInfo()));
GenericRecord dataForWriter = writerSchema.newRecordBuilder()
.set("field1", SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING)
.set("field3", 0)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 74133f4..24acbea 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -63,7 +64,7 @@ public class GenericSchemaImplTest {
genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
decodeSchema.setSchema(genericSchema);
when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(genericSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
@@ -78,7 +79,7 @@ public class GenericSchemaImplTest {
decodeSchema.setSchema(genericSchema);
GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(genericAvroSchema.getSchemaInfo());
+ .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index 2e101b1..c81ff14 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -52,7 +52,7 @@ public class MultiVersionSchemaInfoProviderTest {
}
@Test
- public void testGetSchema() {
+ public void testGetSchema() throws Exception {
CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.<SchemaTestUtils>builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo();
completableFuture.complete(Optional.of(schemaInfo));
@@ -61,7 +61,7 @@ public class MultiVersionSchemaInfoProviderTest {
any(TopicName.class),
any(byte[].class)))
.thenReturn(completableFuture);
- SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]);
+ SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]).get();
assertEquals(schemaInfoByVersion, schemaInfo);
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
index 5ceacbd..4b3ba78 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
@@ -18,12 +18,22 @@
*/
package org.apache.pulsar.common.protocol.schema;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Comparator;
+
/**
* Bytes schema version
*/
-public class BytesSchemaVersion implements SchemaVersion {
+public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchemaVersion> {
+
+ private static final char[] HEX_CHARS_UPPER = {
+ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+ };
private final byte[] bytes;
+ // cache the hash code of the byte array; 0 means it has not been computed yet
+ private int hashCode;
private BytesSchemaVersion(byte[] bytes) {
this.bytes = bytes;
@@ -37,4 +47,139 @@ public class BytesSchemaVersion implements SchemaVersion {
public static BytesSchemaVersion of(byte[] bytes) {
return bytes != null ? new BytesSchemaVersion(bytes) : null;
}
+
+ /**
+ * Get the raw bytes of this schema version (the internal array, not a copy).
+ * @return The underlying byte array
+ */
+ public byte[] get() {
+ return this.bytes;
+ }
+
+ /**
+ * The hashcode is cached except for the case where it is computed as 0, in which
+ * case we compute the hashcode on every call.
+ *
+ * @return the hashcode
+ */
+ @Override
+ public int hashCode() {
+ if (hashCode == 0) {
+ hashCode = Arrays.hashCode(bytes);
+ }
+ return hashCode;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null) {
+ return false;
+ }
+
+ // intentionally call hashCode() instead of reading the cached field, which may still be 0
+ if (this.hashCode() != other.hashCode()) {
+ return false;
+ }
+
+ if (other instanceof BytesSchemaVersion) {
+ return Arrays.equals(this.bytes, ((BytesSchemaVersion) other).get());
+ }
+
+ return false;
+ }
+
+ @Override
+ public int compareTo(BytesSchemaVersion that) {
+ return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+ }
+
+ @Override
+ public String toString() {
+ return BytesSchemaVersion.toString(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Write a printable representation of a byte array. Non-printable
+ * bytes and the backslash are hex escaped in the format \\x%02X, e.g.
+ * \x00 \x05 \x5C etc.
+ *
+ * This function is brought from org.apache.hadoop.hbase.util.Bytes
+ *
+ * @param b array to write out
+ * @param off offset to start at
+ * @param len length to write
+ * @return string output
+ */
+ private static String toString(final byte[] b, int off, int len) {
+ StringBuilder result = new StringBuilder();
+
+ if (b == null)
+ return result.toString();
+
+ // just in case we are passed a 'len' that is > buffer length...
+ if (off >= b.length)
+ return result.toString();
+
+ if (off + len > b.length)
+ len = b.length - off;
+
+ for (int i = off; i < off + len; ++i) {
+ int ch = b[i] & 0xFF;
+ if (ch >= ' ' && ch <= '~' && ch != '\\') {
+ result.append((char) ch);
+ } else {
+ result.append("\\x");
+ result.append(HEX_CHARS_UPPER[ch / 0x10]);
+ result.append(HEX_CHARS_UPPER[ch % 0x10]);
+ }
+ }
+ return result.toString();
+ }
+
+ /**
+ * A byte array comparator based on lexicographic ordering.
+ */
+ public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+
+ public interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
+
+ int compare(final byte[] buffer1, int offset1, int length1,
+ final byte[] buffer2, int offset2, int length2);
+ }
+
+ private static class LexicographicByteArrayComparator implements ByteArrayComparator {
+
+ private static final long serialVersionUID = -1915703761143534937L;
+
+ @Override
+ public int compare(byte[] buffer1, byte[] buffer2) {
+ return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+ }
+
+ public int compare(final byte[] buffer1, int offset1, int length1,
+ final byte[] buffer2, int offset2, int length2) {
+
+ // short circuit equal case
+ if (buffer1 == buffer2 &&
+ offset1 == offset2 &&
+ length1 == length2) {
+ return 0;
+ }
+
+ // like Arrays.compareUnsigned(): bytes are compared as unsigned values within offset/length
+ int end1 = offset1 + length1;
+ int end2 = offset2 + length2;
+ for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+ int a = buffer1[i] & 0xff;
+ int b = buffer2[j] & 0xff;
+ if (a != b) {
+ return a - b;
+ }
+ }
+ return length1 - length2;
+ }
+ }
}