You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/29 02:24:39 UTC

[pulsar] branch master updated: Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 91c4254  Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)
91c4254 is described below

commit 91c4254cae3be91774220c9fea145d3e433b6a01
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jul 29 10:24:32 2019 +0800

    Improve SchemaInfoProvider to fetch schema info asynchronously (#4836)
    
    *Motivation*
    
    Currently, fetching schema information is done synchronously.
    Because the fetch is invoked on Netty callback threads, it can
    potentially block other async operations.
    
    *Modifications*
    
    Make most of the operations asynchronous in SchemaInfoProvider.
---
 .../client/api/schema/SchemaInfoProvider.java      |   5 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  18 +--
 .../pulsar/client/impl/PulsarClientImpl.java       |  58 ++++----
 .../pulsar/client/impl/schema/AvroSchema.java      |   2 +-
 .../pulsar/client/impl/schema/KeyValueSchema.java  |  31 +++--
 .../pulsar/client/impl/schema/StructSchema.java    |  21 +++
 .../impl/schema/generic/GenericAvroSchema.java     |   2 +-
 .../impl/schema/generic/GenericJsonSchema.java     |   2 +-
 .../generic/MultiVersionSchemaInfoProvider.java    |  58 ++++----
 .../apache/pulsar/client/impl/MessageImplTest.java |  13 +-
 .../schema/SupportVersioningAvroSchemaTest.java    |   3 +-
 .../SupportVersioningKeyValueSchemaTest.java       |   5 +-
 .../impl/schema/generic/GenericAvroSchemaTest.java |   3 +-
 .../impl/schema/generic/GenericSchemaImplTest.java |   5 +-
 .../MultiVersionSchemaInfoProviderTest.java        |   4 +-
 .../common/protocol/schema/BytesSchemaVersion.java | 147 ++++++++++++++++++++-
 16 files changed, 279 insertions(+), 98 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
index 81e4f44..ac8f5da 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaInfoProvider.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api.schema;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -31,14 +32,14 @@ public interface SchemaInfoProvider {
      * @param schemaVersion schema version
      * @return schema info of the provided <tt>schemaVersion</tt>
      */
-    SchemaInfo getSchemaByVersion(byte[] schemaVersion);
+    CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion);
 
     /**
      * Retrieve the latest schema info.
      *
      * @return the latest schema
      */
-    SchemaInfo getLatestSchema();
+    CompletableFuture<SchemaInfo> getLatestSchema();
 
     /**
      * Retrieve the topic name.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index e50e8bc..3521eb4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -730,19 +730,21 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+        client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
+            if (null == cause) {
+                doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
+            } else {
+                subscribeResult.completeExceptionally(cause);
+            }
+        });
+    }
+
+    private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
         if (log.isDebugEnabled()) {
             log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
         }
 
         List<CompletableFuture<Consumer<T>>> futureList;
-
-        try {
-            client.preProcessSchemaBeforeSubscribe(client, schema, topicName);
-        } catch (Throwable t) {
-            subscribeResult.completeExceptionally(t);
-            return;
-        }
-
         if (numPartitions > 1) {
             this.topics.putIfAbsent(topicName, numPartitions);
             allTopicPartitionsNumber.addAndGet(numPartitions);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index bceaf2c..134bb77 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -328,12 +328,8 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
-        try {
-            preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic());
-        } catch (Throwable t) {
-            return FutureUtil.failedFuture(t);
-        }
-        return doSingleTopicSubscribeAsync(conf, schema, interceptors);
+        return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
+            .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors));
     }
 
     private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
@@ -444,13 +440,10 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
-        try {
-            preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName());
-        } catch (Throwable t) {
-            return FutureUtil.failedFuture(t);
-        }
-        return doCreateReaderAsync(conf, schema);
+        return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName())
+            .thenCompose(ignored -> doCreateReaderAsync(conf, schema));
     }
+
     <T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
@@ -734,33 +727,40 @@ public class PulsarClientImpl implements PulsarClient {
         return schemaProviderLoadingCache;
     }
 
-    protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, Schema schema, String topicName) throws Throwable {
+    protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl,
+                                                                      Schema schema,
+                                                                      String topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
-            SchemaInfoProvider schemaInfoProvider = null;
+            final SchemaInfoProvider schemaInfoProvider;
             try {
                 schemaInfoProvider = pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
             } catch (ExecutionException e) {
                 log.error("Failed to load schema info provider for topic {}", topicName, e);
-                throw e.getCause();
+                return FutureUtil.failedFuture(e.getCause());
             }
 
             if (schema instanceof AutoConsumeSchema) {
-                SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
-                if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
-                    throw new RuntimeException("Currently schema detection only works for topics with avro schemas");
-                }
-
-                // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
-                // to decode the messages.
-                GenericSchema genericSchema = GenericSchemaImpl.of(
-                    schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
-                log.info("Auto detected schema for topic {} : {}",
-                        topicName, schemaInfo.getSchemaDefinition());
-                ((AutoConsumeSchema) schema).setSchema(genericSchema);
+                return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> {
+                    if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
+                        return FutureUtil.failedFuture(
+                            new RuntimeException("Currently schema detection only works"
+                                + " for topics with avro or json schemas"));
+                    }
+
+                    // when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
+                    // to decode the messages.
+                    GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfo, false /*useProvidedSchemaAsReaderSchema*/);
+                    log.info("Auto detected schema for topic {} : {}",
+                            topicName, schemaInfo.getSchemaDefinition());
+                    ((AutoConsumeSchema) schema).setSchema(genericSchema);
+                    schema.setSchemaInfoProvider(schemaInfoProvider);
+                    return CompletableFuture.completedFuture(null);
+                });
+            } else {
+                schema.setSchemaInfoProvider(schemaInfoProvider);
             }
-            schema.setSchemaInfoProvider(schemaInfoProvider);
         }
-
+        return CompletableFuture.completedFuture(null);
     }
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index 2944e14..4f4ae5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -100,7 +100,7 @@ public class AvroSchema<T> extends StructSchema<T> {
 
     @Override
     protected SchemaReader<T> loadReader(byte[] schemaVersion) {
-        SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
         if (schemaInfo != null) {
             log.info("Load schema reader for version({}), schema is : {}",
                 SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 7eb4757..fd7dcaa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.schema;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.concurrent.CompletableFuture;
 import lombok.Getter;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -103,14 +104,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         if (keySchema instanceof StructSchema) {
             keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
                 @Override
-                public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
-                    SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
-                    return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+                public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+                    return schemaInfoProvider.getSchemaByVersion(schemaVersion)
+                        .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getKey());
                 }
 
                 @Override
-                public SchemaInfo getLatestSchema() {
-                    return ((StructSchema<K>) keySchema).schemaInfo;
+                public CompletableFuture<SchemaInfo> getLatestSchema() {
+                    return CompletableFuture.completedFuture(
+                        ((StructSchema<K>) keySchema).schemaInfo);
                 }
 
                 @Override
@@ -123,14 +125,15 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
         if (valueSchema instanceof StructSchema) {
             valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
                 @Override
-                public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
-                    SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
-                    return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+                public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+                    return schemaInfoProvider.getSchemaByVersion(schemaVersion)
+                        .thenApply(si -> KeyValueSchemaInfo.decodeKeyValueSchemaInfo(si).getValue());
                 }
 
                 @Override
-                public SchemaInfo getLatestSchema() {
-                    return ((StructSchema<V>) valueSchema).schemaInfo;
+                public CompletableFuture<SchemaInfo> getLatestSchema() {
+                    return CompletableFuture.completedFuture(
+                        ((StructSchema<V>) valueSchema).schemaInfo);
                 }
 
                 @Override
@@ -142,13 +145,13 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
 
         this.schemaInfoProvider = new SchemaInfoProvider() {
             @Override
-            public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
-                return schemaInfo;
+            public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
+                return CompletableFuture.completedFuture(schemaInfo);
             }
 
             @Override
-            public SchemaInfo getLatestSchema() {
-                return schemaInfo;
+            public CompletableFuture<SchemaInfo> getLatestSchema() {
+                return CompletableFuture.completedFuture(schemaInfo);
             }
 
             @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index dd618ae..4ec9fc8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -29,6 +29,7 @@ import com.google.common.cache.LoadingCache;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -139,6 +140,26 @@ public abstract class StructSchema<T> implements Schema<T> {
      */
     protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);
 
+    /**
+     * TODO: think about how to make this async
+     */
+    protected SchemaInfo getSchemaInfoByVersion(byte[] schemaVersion) {
+        try {
+            return schemaInfoProvider.getSchemaByVersion(schemaVersion).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new SerializationException(
+                "Interrupted at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+                e
+            );
+        } catch (ExecutionException e) {
+            throw new SerializationException(
+                "Failed at fetching schema info for " + SchemaUtils.getStringSchemaVersion(schemaVersion),
+                e.getCause()
+            );
+        }
+    }
+
     protected void setWriter(SchemaWriter<T> writer) {
         this.writer = writer;
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 2fa6829..c295a3b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -55,7 +55,7 @@ public class GenericAvroSchema extends GenericSchemaImpl {
 
     @Override
     protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
-         SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+         SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
          if (schemaInfo != null) {
              log.info("Load schema reader for version({}), schema is : {}",
                  SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
index 3b82591..307edd5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -48,7 +48,7 @@ class GenericJsonSchema extends GenericSchemaImpl {
 
     @Override
     protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
-        SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
+        SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion);
         if (schemaInfo != null) {
             log.info("Load schema reader for version({}), schema is : {}",
                 SchemaUtils.getStringSchemaVersion(schemaVersion),
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index 12e8956..4c430be 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -21,15 +21,17 @@ package org.apache.pulsar.client.impl.schema.generic;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
-import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -43,13 +45,21 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
     private final TopicName topicName;
     private final PulsarClientImpl pulsarClient;
 
-    private final LoadingCache<byte[], SchemaInfo> cache = CacheBuilder.newBuilder().maximumSize(100000)
-            .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], SchemaInfo>() {
-                @Override
-                public SchemaInfo load(byte[] schemaVersion) throws Exception {
-                    return loadSchema(schemaVersion);
-                }
-            });
+    private final LoadingCache<BytesSchemaVersion, CompletableFuture<SchemaInfo>> cache = CacheBuilder.newBuilder()
+        .maximumSize(100000)
+        .expireAfterAccess(30, TimeUnit.MINUTES)
+        .build(new CacheLoader<BytesSchemaVersion, CompletableFuture<SchemaInfo>>() {
+            @Override
+            public CompletableFuture<SchemaInfo> load(BytesSchemaVersion schemaVersion) {
+                CompletableFuture<SchemaInfo> siFuture = loadSchema(schemaVersion.get());
+                siFuture.whenComplete((si, cause) -> {
+                    if (null != cause) {
+                        cache.asMap().remove(schemaVersion, siFuture);
+                    }
+                });
+                return siFuture;
+            }
+        });
 
     public MultiVersionSchemaInfoProvider(TopicName topicName, PulsarClientImpl pulsarClient) {
         this.topicName = topicName;
@@ -57,30 +67,24 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
     }
 
     @Override
-    public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
+    public CompletableFuture<SchemaInfo> getSchemaByVersion(byte[] schemaVersion) {
         try {
             if (null == schemaVersion) {
-                return null;
+                return CompletableFuture.completedFuture(null);
             }
-            return cache.get(schemaVersion);
+            return cache.get(BytesSchemaVersion.of(schemaVersion));
         } catch (ExecutionException e) {
-            LOG.error("Can't get generic schema for topic {} schema version {}",
+            LOG.error("Can't get schema for topic {} schema version {}",
                     topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e);
-            throw new RuntimeException("Can't get generic schema for topic " + topicName.toString());
+            return FutureUtil.failedFuture(e.getCause());
         }
     }
 
     @Override
-    public SchemaInfo getLatestSchema() {
-        try {
-            Optional<SchemaInfo> optional = pulsarClient.getLookup()
-                    .getSchema(topicName).get();
-            return optional.orElse(null);
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.error("Can't get current schema for topic {}",
-                    topicName.toString(), e);
-            throw new RuntimeException("Can't get current schema for topic " + topicName.toString());
-        }
+    public CompletableFuture<SchemaInfo> getLatestSchema() {
+        return pulsarClient.getLookup()
+            .getSchema(topicName)
+            .thenApply(o -> o.orElse(null));
     }
 
     @Override
@@ -88,10 +92,10 @@ public class MultiVersionSchemaInfoProvider implements SchemaInfoProvider {
         return topicName.getLocalName();
     }
 
-    private SchemaInfo loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException {
-        Optional<SchemaInfo> optional = pulsarClient.getLookup()
-                .getSchema(topicName, schemaVersion).get();
-        return optional.orElse(null);
+    private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
+         return pulsarClient.getLookup()
+                .getSchema(topicName, schemaVersion)
+                .thenApply(o -> o.orElse(null));
     }
 
     public PulsarClientImpl getPulsarClient() {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index e253ace..dd49445 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl;
 import java.nio.ByteBuffer;
 import java.util.Base64;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -184,7 +185,7 @@ public class MessageImplTest {
         Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
@@ -221,7 +222,7 @@ public class MessageImplTest {
                 fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
@@ -259,7 +260,7 @@ public class MessageImplTest {
         Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
@@ -296,7 +297,7 @@ public class MessageImplTest {
                 fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
@@ -334,7 +335,7 @@ public class MessageImplTest {
         Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = Schema.KeyValue(fooSchema, barSchema);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
@@ -371,7 +372,7 @@ public class MessageImplTest {
                 fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Foo foo = new SchemaTestUtils.Foo();
         foo.setField1("field1");
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
index 1b3ef52..21c8a0b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningAvroSchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
@@ -56,7 +57,7 @@ public class SupportVersioningAvroSchemaTest {
     @Test
     public void testDecode() {
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(genericAvroSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
         SchemaTestUtils.FooV2 fooV2 = new SchemaTestUtils.FooV2();
         fooV2.setField1(SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING);
         SchemaTestUtils.Foo foo = (SchemaTestUtils.Foo)schema.decode(avroFooV2Schema.encode(fooV2), new byte[10]);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index ebcc00d..40f879e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider;
@@ -44,7 +45,7 @@ public class SupportVersioningKeyValueSchemaTest {
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
         bar.setField1(true);
@@ -82,7 +83,7 @@ public class SupportVersioningKeyValueSchemaTest {
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(keyValueSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(keyValueSchema.getSchemaInfo()));
 
         SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
         bar.setField1(true);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
index 814975d..0752291 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema.generic;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -67,7 +68,7 @@ public class GenericAvroSchemaTest {
         MultiVersionSchemaInfoProvider provider = mock(MultiVersionSchemaInfoProvider.class);
         readerSchema.setSchemaInfoProvider(provider);
         when(provider.getSchemaByVersion(any(byte[].class)))
-            .thenReturn(writerSchema.getSchemaInfo());
+            .thenReturn(CompletableFuture.completedFuture(writerSchema.getSchemaInfo()));
         GenericRecord dataForWriter = writerSchema.newRecordBuilder()
             .set("field1", SchemaTestUtils.TEST_MULTI_VERSION_SCHEMA_STRING)
             .set("field3", 0)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 74133f4..24acbea 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
+import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -63,7 +64,7 @@ public class GenericSchemaImplTest {
         genericSchema.setSchemaInfoProvider(multiVersionGenericSchemaProvider);
         decodeSchema.setSchema(genericSchema);
         when(multiVersionGenericSchemaProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(genericSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(genericSchema.getSchemaInfo()));
 
         testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
@@ -78,7 +79,7 @@ public class GenericSchemaImplTest {
         decodeSchema.setSchema(genericSchema);
         GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
         when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
-                .thenReturn(genericAvroSchema.getSchemaInfo());
+                .thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
         testAUTOEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
index 2e101b1..c81ff14 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProviderTest.java
@@ -52,7 +52,7 @@ public class MultiVersionSchemaInfoProviderTest {
     }
 
     @Test
-    public void testGetSchema() {
+    public void testGetSchema() throws Exception {
         CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
         SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.<SchemaTestUtils>builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo();
         completableFuture.complete(Optional.of(schemaInfo));
@@ -61,7 +61,7 @@ public class MultiVersionSchemaInfoProviderTest {
                         any(TopicName.class),
                         any(byte[].class)))
                 .thenReturn(completableFuture);
-        SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]);
+        SchemaInfo schemaInfoByVersion = schemaProvider.getSchemaByVersion(new byte[0]).get();
         assertEquals(schemaInfoByVersion, schemaInfo);
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
index 5ceacbd..4b3ba78 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/BytesSchemaVersion.java
@@ -18,12 +18,22 @@
  */
 package org.apache.pulsar.common.protocol.schema;
 
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Comparator;
+
 /**
  * Bytes schema version
  */
-public class BytesSchemaVersion implements SchemaVersion {
+public class BytesSchemaVersion implements SchemaVersion, Comparable<BytesSchemaVersion> {
+
+    private static final char[] HEX_CHARS_UPPER = {
+        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
+    };
 
     private final byte[] bytes;
+    // cached hash code of the bytes; 0 means not computed yet (or a genuine hash of 0)
+    private int hashCode;
 
     private BytesSchemaVersion(byte[] bytes) {
         this.bytes = bytes;
@@ -37,4 +47,139 @@ public class BytesSchemaVersion implements SchemaVersion {
     public static BytesSchemaVersion of(byte[] bytes) {
         return bytes != null ? new BytesSchemaVersion(bytes) : null;
     }
+
+    /**
+     * Get the raw bytes of this schema version.
+     * @return The underlying byte array
+     */
+    public byte[] get() {
+        return this.bytes;
+    }
+
+    /**
+     * The hashcode is cached except for the case where it is computed as 0, in which
+     * case we compute the hashcode on every call.
+     *
+     * @return the hashcode
+     */
+    @Override
+    public int hashCode() {
+        if (hashCode == 0) {
+            hashCode = Arrays.hashCode(bytes);
+        }
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null) {
+            return false;
+        }
+
+        // we intentionally use the function to compute hashcode here
+        if (this.hashCode() != other.hashCode()) {
+            return false;
+        }
+
+        if (other instanceof BytesSchemaVersion) {
+            return Arrays.equals(this.bytes, ((BytesSchemaVersion) other).get());
+        }
+
+        return false;
+    }
+
+    @Override
+    public int compareTo(BytesSchemaVersion that) {
+        return BYTES_LEXICO_COMPARATOR.compare(this.bytes, that.bytes);
+    }
+
+    @Override
+    public String toString() {
+        return BytesSchemaVersion.toString(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Write a printable representation of a byte array. Non-printable
+     * characters are hex escaped in the format \\x%02X, e.g.
+     * \x00, \x05, etc.
+     *
+     * This function is adapted from org.apache.hadoop.hbase.util.Bytes
+     *
+     * @param b array to write out
+     * @param off offset to start at
+     * @param len length to write
+     * @return string output
+     */
+    private static String toString(final byte[] b, int off, int len) {
+        StringBuilder result = new StringBuilder();
+
+        if (b == null)
+            return result.toString();
+
+        // just in case we are passed a 'len' that is > buffer length...
+        if (off >= b.length)
+            return result.toString();
+
+        if (off + len > b.length)
+            len = b.length - off;
+
+        for (int i = off; i < off + len; ++i) {
+            int ch = b[i] & 0xFF;
+            if (ch >= ' ' && ch <= '~' && ch != '\\') {
+                result.append((char) ch);
+            } else {
+                result.append("\\x");
+                result.append(HEX_CHARS_UPPER[ch / 0x10]);
+                result.append(HEX_CHARS_UPPER[ch % 0x10]);
+            }
+        }
+        return result.toString();
+    }
+
+    /**
+     * A byte array comparator based on lexicographic ordering.
+     */
+    public final static ByteArrayComparator BYTES_LEXICO_COMPARATOR = new LexicographicByteArrayComparator();
+
+    public interface ByteArrayComparator extends Comparator<byte[]>, Serializable {
+
+        int compare(final byte[] buffer1, int offset1, int length1,
+                    final byte[] buffer2, int offset2, int length2);
+    }
+
+    private static class LexicographicByteArrayComparator implements ByteArrayComparator {
+
+        private static final long serialVersionUID = -1915703761143534937L;
+
+        @Override
+        public int compare(byte[] buffer1, byte[] buffer2) {
+            return compare(buffer1, 0, buffer1.length, buffer2, 0, buffer2.length);
+        }
+
+        public int compare(final byte[] buffer1, int offset1, int length1,
+                           final byte[] buffer2, int offset2, int length2) {
+
+            // short circuit equal case
+            if (buffer1 == buffer2 &&
+                    offset1 == offset2 &&
+                    length1 == length2) {
+                return 0;
+            }
+
+            // similar to Arrays.compareUnsigned(): bytes are compared as unsigned values within offset/length
+            int end1 = offset1 + length1;
+            int end2 = offset2 + length2;
+            for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
+                int a = buffer1[i] & 0xff;
+                int b = buffer2[j] & 0xff;
+                if (a != b) {
+                    return a - b;
+                }
+            }
+            return length1 - length2;
+        }
+    }
 }