You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/08 22:35:11 UTC

[GitHub] merlimat closed pull request #1319: Schema registry (2/N)

merlimat closed pull request #1319: Schema registry (2/N)
URL: https://github.com/apache/incubator-pulsar/pull/1319
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index 081fcdb32..71371957e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -402,6 +402,20 @@ flexible messaging model and an intuitive client API.</description>
         <version>${log4j2.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-api</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-core</artifactId>
+        <type>test-jar</type>
+        <version>${log4j2.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>
         <artifactId>log4j-api</artifactId>
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bd4797023..cc2a40927 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -415,6 +415,8 @@
     @FieldContext(dynamic = true)
     private boolean preferLaterVersions = false;
 
+    private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
+
     /**** --- WebSocket --- ****/
     // Number of IO threads in Pulsar Client used in WebSocket proxy
     private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
@@ -1449,7 +1451,15 @@ public boolean exposeTopicLevelMetricsInPrometheus() {
     public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
         this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
     }
-    
+
+    public String getSchemaRegistryStorageClassName() {
+       return schemaRegistryStorageClassName;
+    }
+
+    public void setSchemaRegistryStorageClassName(String className) {
+        schemaRegistryStorageClassName = className;
+    }
+
     public boolean authenticateOriginalAuthData() {
         return authenticateOriginalAuthData;
     }
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index abead555d..8681cad71 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -54,6 +54,12 @@
       <artifactId>netty-all</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf2.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-common</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6bbc1e62c..51014a7e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -20,6 +20,9 @@
 
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.net.URL;
 import java.util.List;
@@ -34,7 +37,6 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
-
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -52,6 +54,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
@@ -80,11 +83,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import io.netty.util.concurrent.DefaultThreadFactory;
-
 /**
  * Main class for Pulsar broker service
  */
@@ -123,6 +121,7 @@
     private final String brokerServiceUrl;
     private final String brokerServiceUrlTls;
     private final String brokerVersion;
+    private SchemaRegistryService schemaRegistryService = null;
     private final Optional<WorkerService> functionWorkerService;
 
     private final MessagingServiceShutdownHook shutdownService;
@@ -233,6 +232,10 @@ public void close() throws PulsarServerException {
                 loadManager.stop();
             }
 
+            if (schemaRegistryService != null) {
+                schemaRegistryService.close();
+            }
+
             state = State.Closed;
 
         } catch (Exception e) {
@@ -359,6 +362,8 @@ public synchronized void brokerIsAFollowerNow() {
 
             this.metricsGenerator = new MetricsGenerator(this);
 
+            schemaRegistryService = SchemaRegistryService.create(this);
+
             state = State.Started;
 
             acquireSLANamespace();
@@ -701,4 +706,8 @@ public String getBrokerServiceUrlTls() {
     public String getBrokerVersion() {
         return brokerVersion;
     }
+
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 9268ac7af..35bc3151b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -23,12 +23,16 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
 import org.apache.bookkeeper.mledger.util.Rate;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
@@ -42,17 +46,11 @@
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
 /**
  * Represents a currently connected producer
  */
@@ -82,8 +80,10 @@
 
     private final Map<String, String> metadata;
 
+    private final SchemaVersion schemaVersion;
+
     public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
-        boolean isEncrypted, Map<String, String> metadata) {
+        boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
         this.topic = topic;
         this.cnx = cnx;
         this.producerId = producerId;
@@ -110,6 +110,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
         this.remoteCluster = isRemote ? producerName.split("\\.")[2] : null;
 
         this.isEncrypted = isEncrypted;
+        this.schemaVersion = schemaVersion;
     }
 
     @Override
@@ -492,6 +493,10 @@ public void checkEncryption() {
         }
     }
 
+    public SchemaVersion getSchemaVersion() {
+        return schemaVersion;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Producer.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index a386ece9a..e0ff275f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -25,6 +25,12 @@
 import static org.apache.pulsar.common.api.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
+import com.google.protobuf.GeneratedMessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslHandler;
 import java.net.SocketAddress;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +38,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import org.apache.bookkeeper.mledger.Position;
@@ -52,6 +59,7 @@
 import org.apache.pulsar.common.api.CommandUtils;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarHandler;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
@@ -74,24 +82,18 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Sets;
-import com.google.protobuf.GeneratedMessageLite;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOption;
-import io.netty.handler.ssl.SslHandler;
-
 public class ServerCnx extends PulsarHandler {
     private final BrokerService service;
     private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
@@ -697,6 +699,36 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         });
     }
 
+    private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
+        switch (protocolType) {
+            case Json:
+                return SchemaType.JSON;
+            case Avro:
+                return SchemaType.AVRO;
+            case Thrift:
+                return SchemaType.THRIFT;
+            case Protobuf:
+                return SchemaType.PROTOBUF;
+            default:
+                return SchemaType.NONE;
+        }
+    }
+
+    private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
+        return SchemaData.builder()
+            .data(protocolSchema.getSchemaData().toByteArray())
+            .isDeleted(false)
+            .timestamp(System.currentTimeMillis())
+            .user(originalPrincipal)
+            .type(getType(protocolSchema.getType()))
+            .props(protocolSchema.getPropertiesList().stream().collect(
+                Collectors.toMap(
+                    PulsarApi.KeyValue::getKey,
+                    PulsarApi.KeyValue::getValue
+                )
+            )).build();
+    }
+
     @Override
     protected void handleProducer(final CommandProducer cmdProducer) {
         checkArgument(state == State.Connected);
@@ -752,7 +784,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
                                 Producer producer = existingProducerFuture.getNow(null);
                                 log.info("[{}] Producer with the same id is already created: {}", remoteAddress,
                                         producer);
-                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName()));
+                                ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producer.getProducerName(),
+                                    producer.getSchemaVersion()));
                                 return null;
                             } else {
                                 // There was an early request to create a producer with
@@ -805,41 +838,56 @@ protected void handleProducer(final CommandProducer cmdProducer) {
 
                             disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
 
-                            Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
-                                    isEncrypted, metadata);
+                            CompletableFuture<SchemaVersion> schemaVersionFuture;
+                            if (cmdProducer.hasSchema()) {
+                                schemaVersionFuture = topic.addSchema(getSchema(cmdProducer.getSchema()));
+                            } else {
+                                schemaVersionFuture = CompletableFuture.completedFuture(SchemaVersion.Empty);
+                            }
+
+                            schemaVersionFuture.exceptionally(exception -> {
+                                ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError, exception.getMessage()));
+                                producers.remove(producerId, producerFuture);
+                                return null;
+                            });
+
+                            schemaVersionFuture.thenAccept(schemaVersion -> {
+                                Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
+                                    isEncrypted, metadata, schemaVersion);
 
-                            try {
-                                topic.addProducer(producer);
+                                try {
+                                    topic.addProducer(producer);
 
-                                if (isActive()) {
-                                    if (producerFuture.complete(producer)) {
-                                        log.info("[{}] Created new producer: {}", remoteAddress, producer);
-                                        ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
-                                                producer.getLastSequenceId()));
-                                        return;
+                                    if (isActive()) {
+                                        if (producerFuture.complete(producer)) {
+                                            log.info("[{}] Created new producer: {}", remoteAddress, producer);
+                                            ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
+                                                producer.getLastSequenceId(), producer.getSchemaVersion()));
+                                            return;
+                                        } else {
+                                            // The producer's future was completed before by
+                                            // a close command
+                                            producer.closeNow();
+                                            log.info("[{}] Cleared producer created after timeout on client side {}",
+                                                remoteAddress, producer);
+                                        }
                                     } else {
-                                        // The producer's future was completed before by
-                                        // a close command
                                         producer.closeNow();
-                                        log.info("[{}] Cleared producer created after timeout on client side {}",
-                                                remoteAddress, producer);
-                                    }
-                                } else {
-                                    producer.closeNow();
-                                    log.info("[{}] Cleared producer created after connection was closed: {}",
+                                        log.info("[{}] Cleared producer created after connection was closed: {}",
                                             remoteAddress, producer);
-                                    producerFuture.completeExceptionally(
+                                        producerFuture.completeExceptionally(
                                             new IllegalStateException("Producer created after connection was closed"));
-                                }
-                            } catch (BrokerServiceException ise) {
-                                log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
+                                    }
+                                } catch (BrokerServiceException ise) {
+                                    log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
                                         ise.getMessage());
-                                ctx.writeAndFlush(Commands.newError(requestId,
+                                    ctx.writeAndFlush(Commands.newError(requestId,
                                         BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));
-                                producerFuture.completeExceptionally(ise);
-                            }
+                                    producerFuture.completeExceptionally(ise);
+                                }
 
-                            producers.remove(producerId, producerFuture);
+                                producers.remove(producerId, producerFuture);
+                            });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
                             if (!(cause instanceof ServiceUnitNotReadyException)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 80aed778b..be60116a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pulsar.broker.service;
 
+import io.netty.buffer.ByteBuf;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
@@ -30,13 +30,13 @@
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.utils.StatsOutputStream;
 
-import io.netty.buffer.ByteBuf;
-
 public interface Topic {
 
     public interface PublishContext {
@@ -125,4 +125,6 @@ void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats
     PersistentTopicInternalStats getInternalStats();
 
     Position getLastMessageId();
+
+    CompletableFuture<SchemaVersion> addSchema(SchemaData schema);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index ef355f0ea..4a6de05e0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -22,6 +22,13 @@
 import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
+import com.carrotsearch.hppc.ObjectObjectHashMap;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.concurrent.FastThreadLocal;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +39,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
@@ -70,6 +76,8 @@
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
@@ -79,15 +87,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.carrotsearch.hppc.ObjectObjectHashMap;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.concurrent.FastThreadLocal;
-
 public class NonPersistentTopic implements Topic {
     private final String topic;
 
@@ -973,7 +972,14 @@ public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
 
-
-
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .putSchemaIfAbsent(id, schema);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 62017f0f6..cdca1cd8f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -92,6 +92,8 @@
 import org.apache.pulsar.common.policies.data.PublisherStats;
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -1601,4 +1603,13 @@ public Position getLastMessageId() {
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
+
+    @Override
+    public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
+        String base = TopicName.get(getName()).getPartitionedTopicName();
+        String id = TopicName.get(base).getSchemaName();
+        return brokerService.pulsar()
+            .getSchemaRegistryService()
+            .putSchemaIfAbsent(id, schema);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
new file mode 100644
index 000000000..db3b9f7a9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class DefaultSchemaRegistryService implements SchemaRegistryService {
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
+        return completedFuture(null);
+    }
+
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
new file mode 100644
index 000000000..4dfbd6d99
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistry.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaRegistry extends AutoCloseable {
+
+    CompletableFuture<SchemaAndMetadata> getSchema(String schemaId);
+
+    CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version);
+
+    CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema);
+
+    CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user);
+
+    SchemaVersion versionFromBytes(byte[] version);
+
+    class SchemaAndMetadata {
+        public final String id;
+        public final SchemaData schema;
+        public final SchemaVersion version;
+
+        SchemaAndMetadata(String id, SchemaData schema, SchemaVersion version) {
+            this.id = id;
+            this.schema = schema;
+            this.version = version;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SchemaAndMetadata that = (SchemaAndMetadata) o;
+            return version == that.version &&
+                Objects.equals(id, that.id) &&
+                Objects.equals(schema, that.schema);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, schema, version);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("id", id)
+                .add("schema", schema)
+                .add("version", version)
+                .toString();
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
new file mode 100644
index 000000000..69e736481
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.lang.reflect.Method;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface SchemaRegistryService extends SchemaRegistry {
+    String CreateMethodName = "create";
+    Logger log = LoggerFactory.getLogger(SchemaRegistryService.class);
+
+    static SchemaRegistryService create(PulsarService pulsar) {
+        try {
+            ServiceConfiguration config = pulsar.getConfiguration();
+            final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
+            Object factoryInstance = storageClass.newInstance();
+            Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class);
+            SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar);
+            return new SchemaRegistryServiceImpl(schemaStorage);
+        } catch (Exception e) {
+            log.warn("Error when trying to create scehema registry storage: {}", e);
+        }
+        return new DefaultSchemaRegistryService();
+    }
+
+    void close() throws Exception;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
new file mode 100644
index 000000000..1aec03950
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static java.util.Objects.isNull;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.Functions.toPairs;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class SchemaRegistryServiceImpl implements SchemaRegistryService {
+    private final SchemaStorage schemaStorage;
+    private final Clock clock;
+
+    @VisibleForTesting
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage, Clock clock) {
+        this.schemaStorage = schemaStorage;
+        this.clock = clock;
+    }
+
+    @VisibleForTesting
+    SchemaRegistryServiceImpl(SchemaStorage schemaStorage) {
+        this(schemaStorage, Clock.systemUTC());
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId) {
+        return getSchema(schemaId, SchemaVersion.Latest);
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaAndMetadata> getSchema(String schemaId, SchemaVersion version) {
+        return schemaStorage.get(schemaId, version).thenCompose(stored -> {
+                if (isNull(stored)) {
+                    return completedFuture(null);
+                } else {
+                    return Functions.bytesToSchemaInfo(stored.data)
+                        .thenApply(Functions::schemaInfoToSchema)
+                        .thenApply(schema -> new SchemaAndMetadata(schemaId, schema, stored.version));
+                }
+            }
+        );
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData schema) {
+        SchemaRegistryFormat.SchemaInfo info = SchemaRegistryFormat.SchemaInfo.newBuilder()
+            .setType(Functions.convertFromDomainType(schema.getType()))
+            .setSchema(ByteString.copyFrom(schema.getData()))
+            .setSchemaId(schemaId)
+            .setUser(schema.getUser())
+            .setDeleted(false)
+            .setTimestamp(clock.millis())
+            .addAllProps(toPairs(schema.getProps()))
+            .build();
+        return schemaStorage.put(schemaId, info.toByteArray());
+    }
+
+    @Override
+    @NotNull
+    public CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user) {
+        byte[] deletedEntry = deleted(schemaId, user).toByteArray();
+        return schemaStorage.put(schemaId, deletedEntry);
+    }
+
+    @Override
+    public SchemaVersion versionFromBytes(byte[] version) {
+        return schemaStorage.versionFromBytes(version);
+    }
+
+    @Override
+    public void close() throws Exception {
+        schemaStorage.close();
+    }
+
+    private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) {
+        return SchemaRegistryFormat.SchemaInfo.newBuilder()
+            .setSchemaId(schemaId)
+            .setType(SchemaRegistryFormat.SchemaInfo.SchemaType.NONE)
+            .setSchema(ByteString.EMPTY)
+            .setUser(user)
+            .setDeleted(true)
+            .setTimestamp(clock.millis())
+            .build();
+    }
+
+    interface Functions {
+        static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
+            switch (type) {
+                case AVRO:
+                    return SchemaType.AVRO;
+                case JSON:
+                    return SchemaType.JSON;
+                case PROTO:
+                    return SchemaType.PROTOBUF;
+                case THRIFT:
+                    return SchemaType.THRIFT;
+                default:
+                    return SchemaType.NONE;
+            }
+        }
+
+        static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaType type) {
+            switch (type) {
+                case AVRO:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO;
+                case JSON:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON;
+                case THRIFT:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.THRIFT;
+                case PROTOBUF:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTO;
+                default:
+                    return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+            }
+        }
+
+        static Map<String, String> toMap(List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs) {
+            Map<String, String> map = new HashMap<>();
+            for (SchemaRegistryFormat.SchemaInfo.KeyValuePair pair : pairs) {
+                map.put(pair.getKey(), pair.getValue());
+            }
+            return map;
+        }
+
+        static List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> toPairs(Map<String, String> map) {
+            List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> pairs = new ArrayList<>(map.size());
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builder =
+                    SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+                pairs.add(builder.setKey(entry.getKey()).setValue(entry.getValue()).build());
+            }
+            return pairs;
+        }
+
+        static SchemaData schemaInfoToSchema(SchemaRegistryFormat.SchemaInfo info) {
+            return SchemaData.builder()
+                .user(info.getUser())
+                .type(convertToDomainType(info.getType()))
+                .data(info.getSchema().toByteArray())
+                .isDeleted(info.getDeleted())
+                .props(toMap(info.getPropsList()))
+                .build();
+        }
+
+        static CompletableFuture<SchemaRegistryFormat.SchemaInfo> bytesToSchemaInfo(byte[] bytes) {
+            CompletableFuture<SchemaRegistryFormat.SchemaInfo> future;
+            try {
+                future = completedFuture(SchemaRegistryFormat.SchemaInfo.parseFrom(bytes));
+            } catch (InvalidProtocolBufferException e) {
+                future = new CompletableFuture<>();
+                future.completeExceptionally(e);
+            }
+            return future;
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
new file mode 100644
index 000000000..4d0d8af18
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public interface SchemaStorage {
+
+    CompletableFuture<SchemaVersion> put(String key, byte[] value);
+
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    SchemaVersion versionFromBytes(byte[] version);
+
+    void close() throws Exception;
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
new file mode 100644
index 000000000..c4cff34b3
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorageFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import javax.validation.constraints.NotNull;
+import org.apache.pulsar.broker.PulsarService;
+
+public interface SchemaStorageFactory {
+    @NotNull
+    SchemaStorage create(PulsarService pulsar) throws Exception;
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
new file mode 100644
index 000000000..f28a70797
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+public class StoredSchema {
+    public final byte[] data;
+    public final SchemaVersion version;
+    public final Map<String, String> metadata;
+
+    public StoredSchema(byte[] data, SchemaVersion version, Map<String, String> metadata) {
+        this.data = data;
+        this.version = version;
+        this.metadata = metadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        StoredSchema that = (StoredSchema) o;
+        return Arrays.equals(data, that.data) &&
+            Objects.equals(version, that.version) &&
+            Objects.equals(metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+
+        int result = Objects.hash(version, metadata);
+        result = 31 * result + Arrays.hashCode(data);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("data", data)
+            .add("version", version)
+            .add("metadata", metadata)
+            .toString();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
new file mode 100644
index 000000000..39227313c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java
@@ -0,0 +1,1373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Generated by the protocol buffer compiler.  DO NOT EDIT!
+// source: src/main/proto/SchemaRegistryFormat.proto
+
+package org.apache.pulsar.broker.service.schema.proto;
+
+public final class SchemaRegistryFormat {
+  private SchemaRegistryFormat() {}
+  public static void registerAllExtensions(
+      com.google.protobuf.ExtensionRegistryLite registry) {
+  }
+  public interface SchemaInfoOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required string schema_id = 1;
+    boolean hasSchemaId();
+    String getSchemaId();
+    
+    // required string user = 2;
+    boolean hasUser();
+    String getUser();
+    
+    // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+    boolean hasType();
+    SchemaRegistryFormat.SchemaInfo.SchemaType getType();
+    
+    // required bytes schema = 4;
+    boolean hasSchema();
+    com.google.protobuf.ByteString getSchema();
+    
+    // required int64 timestamp = 5;
+    boolean hasTimestamp();
+    long getTimestamp();
+    
+    // required bool deleted = 6;
+    boolean hasDeleted();
+    boolean getDeleted();
+    
+    // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+    java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair>
+        getPropsList();
+    SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index);
+    int getPropsCount();
+  }
+  public static final class SchemaInfo extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaInfoOrBuilder {
+    // Use SchemaInfo.newBuilder() to construct.
+    private SchemaInfo(Builder builder) {
+      super(builder);
+    }
+    private SchemaInfo(boolean noInit) {}
+    
+    private static final SchemaInfo defaultInstance;
+    public static SchemaInfo getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public SchemaInfo getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public enum SchemaType
+        implements com.google.protobuf.Internal.EnumLite {
+      NONE(0, 1),
+      THRIFT(1, 2),
+      AVRO(2, 3),
+      JSON(3, 4),
+      PROTO(4, 5),
+      ;
+      
+      public static final int NONE_VALUE = 1;
+      public static final int THRIFT_VALUE = 2;
+      public static final int AVRO_VALUE = 3;
+      public static final int JSON_VALUE = 4;
+      public static final int PROTO_VALUE = 5;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static SchemaType valueOf(int value) {
+        switch (value) {
+          case 1: return NONE;
+          case 2: return THRIFT;
+          case 3: return AVRO;
+          case 4: return JSON;
+          case 5: return PROTO;
+          default: return null;
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<SchemaType>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<SchemaType>() {
+              public SchemaType findValueByNumber(int number) {
+                return SchemaType.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private SchemaType(int index, int value) {
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.schema.SchemaInfo.SchemaType)
+    }
+    
+    public interface KeyValuePairOrBuilder
+        extends com.google.protobuf.MessageLiteOrBuilder {
+      
+      // required string key = 1;
+      boolean hasKey();
+      String getKey();
+      
+      // required string value = 2;
+      boolean hasValue();
+      String getValue();
+    }
+    public static final class KeyValuePair extends
+        com.google.protobuf.GeneratedMessageLite
+        implements KeyValuePairOrBuilder {
+      // Use KeyValuePair.newBuilder() to construct.
+      private KeyValuePair(Builder builder) {
+        super(builder);
+      }
+      private KeyValuePair(boolean noInit) {}
+      
+      private static final KeyValuePair defaultInstance;
+      public static KeyValuePair getDefaultInstance() {
+        return defaultInstance;
+      }
+      
+      public KeyValuePair getDefaultInstanceForType() {
+        return defaultInstance;
+      }
+      
+      private int bitField0_;
+      // required string key = 1;
+      public static final int KEY_FIELD_NUMBER = 1;
+      private java.lang.Object key_;
+      public boolean hasKey() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getKey() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          return (String) ref;
+        } else {
+          com.google.protobuf.ByteString bs = 
+              (com.google.protobuf.ByteString) ref;
+          String s = bs.toStringUtf8();
+          if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+            key_ = s;
+          }
+          return s;
+        }
+      }
+      private com.google.protobuf.ByteString getKeyBytes() {
+        java.lang.Object ref = key_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+          key_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      
+      // required string value = 2;
+      public static final int VALUE_FIELD_NUMBER = 2;
+      private java.lang.Object value_;
+      public boolean hasValue() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getValue() {
+        java.lang.Object ref = value_;
+        if (ref instanceof String) {
+          return (String) ref;
+        } else {
+          com.google.protobuf.ByteString bs = 
+              (com.google.protobuf.ByteString) ref;
+          String s = bs.toStringUtf8();
+          if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+            value_ = s;
+          }
+          return s;
+        }
+      }
+      private com.google.protobuf.ByteString getValueBytes() {
+        java.lang.Object ref = value_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+          value_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      
+      private void initFields() {
+        key_ = "";
+        value_ = "";
+      }
+      private byte memoizedIsInitialized = -1;
+      public final boolean isInitialized() {
+        byte isInitialized = memoizedIsInitialized;
+        if (isInitialized != -1) return isInitialized == 1;
+        
+        if (!hasKey()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+        if (!hasValue()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+        memoizedIsInitialized = 1;
+        return true;
+      }
+      
+      public void writeTo(com.google.protobuf.CodedOutputStream output)
+                          throws java.io.IOException {
+        getSerializedSize();
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          output.writeBytes(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          output.writeBytes(2, getValueBytes());
+        }
+      }
+      
+      private int memoizedSerializedSize = -1;
+      public int getSerializedSize() {
+        int size = memoizedSerializedSize;
+        if (size != -1) return size;
+      
+        size = 0;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(1, getKeyBytes());
+        }
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          size += com.google.protobuf.CodedOutputStream
+            .computeBytesSize(2, getValueBytes());
+        }
+        memoizedSerializedSize = size;
+        return size;
+      }
+      
+      private static final long serialVersionUID = 0L;
+      @java.lang.Override
+      protected java.lang.Object writeReplace()
+          throws java.io.ObjectStreamException {
+        return super.writeReplace();
+      }
+      
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.ByteString data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.ByteString data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(byte[] data)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          byte[] data,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return newBuilder().mergeFrom(data, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseDelimitedFrom(java.io.InputStream input)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseDelimitedFrom(
+          java.io.InputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        Builder builder = newBuilder();
+        if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+          return builder.buildParsed();
+        } else {
+          return null;
+        }
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.CodedInputStream input)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input).buildParsed();
+      }
+      public static SchemaRegistryFormat.SchemaInfo.KeyValuePair parseFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        return newBuilder().mergeFrom(input, extensionRegistry)
+                 .buildParsed();
+      }
+      
+      public static Builder newBuilder() { return Builder.create(); }
+      public Builder newBuilderForType() { return newBuilder(); }
+      public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo.KeyValuePair prototype) {
+        return newBuilder().mergeFrom(prototype);
+      }
+      public Builder toBuilder() { return newBuilder(this); }
+      
+      public static final class Builder extends
+          com.google.protobuf.GeneratedMessageLite.Builder<
+            SchemaRegistryFormat.SchemaInfo.KeyValuePair, Builder>
+          implements SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder {
+        // Construct using org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder()
+        private Builder() {
+          maybeForceBuilderInitialization();
+        }
+        
+        private void maybeForceBuilderInitialization() {
+        }
+        private static Builder create() {
+          return new Builder();
+        }
+        
+        public Builder clear() {
+          super.clear();
+          key_ = "";
+          bitField0_ = (bitField0_ & ~0x00000001);
+          value_ = "";
+          bitField0_ = (bitField0_ & ~0x00000002);
+          return this;
+        }
+        
+        public Builder clone() {
+          return create().mergeFrom(buildPartial());
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair getDefaultInstanceForType() {
+          return SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance();
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair build() {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(result);
+          }
+          return result;
+        }
+        
+        private SchemaRegistryFormat.SchemaInfo.KeyValuePair buildParsed()
+            throws com.google.protobuf.InvalidProtocolBufferException {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = buildPartial();
+          if (!result.isInitialized()) {
+            throw newUninitializedMessageException(
+              result).asInvalidProtocolBufferException();
+          }
+          return result;
+        }
+        
+        public SchemaRegistryFormat.SchemaInfo.KeyValuePair buildPartial() {
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair result = new SchemaRegistryFormat.SchemaInfo.KeyValuePair(this);
+          int from_bitField0_ = bitField0_;
+          int to_bitField0_ = 0;
+          if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+            to_bitField0_ |= 0x00000001;
+          }
+          result.key_ = key_;
+          if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+            to_bitField0_ |= 0x00000002;
+          }
+          result.value_ = value_;
+          result.bitField0_ = to_bitField0_;
+          return result;
+        }
+        
+        public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo.KeyValuePair other) {
+          if (other == SchemaRegistryFormat.SchemaInfo.KeyValuePair.getDefaultInstance()) return this;
+          if (other.hasKey()) {
+            setKey(other.getKey());
+          }
+          if (other.hasValue()) {
+            setValue(other.getValue());
+          }
+          return this;
+        }
+        
+        public final boolean isInitialized() {
+          if (!hasKey()) {
+            
+            return false;
+          }
+          if (!hasValue()) {
+            
+            return false;
+          }
+          return true;
+        }
+        
+        public Builder mergeFrom(
+            com.google.protobuf.CodedInputStream input,
+            com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+            throws java.io.IOException {
+          while (true) {
+            int tag = input.readTag();
+            switch (tag) {
+              case 0:
+                
+                return this;
+              default: {
+                if (!parseUnknownField(input, extensionRegistry, tag)) {
+                  
+                  return this;
+                }
+                break;
+              }
+              case 10: {
+                bitField0_ |= 0x00000001;
+                key_ = input.readBytes();
+                break;
+              }
+              case 18: {
+                bitField0_ |= 0x00000002;
+                value_ = input.readBytes();
+                break;
+              }
+            }
+          }
+        }
+        
+        private int bitField0_;
+        
+        // required string key = 1;
+        private java.lang.Object key_ = "";
+        public boolean hasKey() {
+          return ((bitField0_ & 0x00000001) == 0x00000001);
+        }
+        public String getKey() {
+          java.lang.Object ref = key_;
+          if (!(ref instanceof String)) {
+            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+            key_ = s;
+            return s;
+          } else {
+            return (String) ref;
+          }
+        }
+        public Builder setKey(String value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+          key_ = value;
+          
+          return this;
+        }
+        public Builder clearKey() {
+          bitField0_ = (bitField0_ & ~0x00000001);
+          key_ = getDefaultInstance().getKey();
+          
+          return this;
+        }
+        void setKey(com.google.protobuf.ByteString value) {
+          bitField0_ |= 0x00000001;
+          key_ = value;
+          
+        }
+        
+        // required string value = 2;
+        private java.lang.Object value_ = "";
+        public boolean hasValue() {
+          return ((bitField0_ & 0x00000002) == 0x00000002);
+        }
+        public String getValue() {
+          java.lang.Object ref = value_;
+          if (!(ref instanceof String)) {
+            String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+            value_ = s;
+            return s;
+          } else {
+            return (String) ref;
+          }
+        }
+        public Builder setValue(String value) {
+          if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+          value_ = value;
+          
+          return this;
+        }
+        public Builder clearValue() {
+          bitField0_ = (bitField0_ & ~0x00000002);
+          value_ = getDefaultInstance().getValue();
+          
+          return this;
+        }
+        void setValue(com.google.protobuf.ByteString value) {
+          bitField0_ |= 0x00000002;
+          value_ = value;
+          
+        }
+        
+        // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+      }
+      
+      static {
+        defaultInstance = new KeyValuePair(true);
+        defaultInstance.initFields();
+      }
+      
+      // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo.KeyValuePair)
+    }
+    
+    private int bitField0_;
+    // required string schema_id = 1;
+    public static final int SCHEMA_ID_FIELD_NUMBER = 1;
+    private java.lang.Object schemaId_;
+    public boolean hasSchemaId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getSchemaId() {
+      java.lang.Object ref = schemaId_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          schemaId_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getSchemaIdBytes() {
+      java.lang.Object ref = schemaId_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        schemaId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required string user = 2;
+    public static final int USER_FIELD_NUMBER = 2;
+    private java.lang.Object user_;
+    public boolean hasUser() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getUser() {
+      java.lang.Object ref = user_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          user_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getUserBytes() {
+      java.lang.Object ref = user_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        user_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+    public static final int TYPE_FIELD_NUMBER = 3;
+    private SchemaRegistryFormat.SchemaInfo.SchemaType type_;
+    public boolean hasType() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+      return type_;
+    }
+    
+    // required bytes schema = 4;
+    public static final int SCHEMA_FIELD_NUMBER = 4;
+    private com.google.protobuf.ByteString schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public com.google.protobuf.ByteString getSchema() {
+      return schema_;
+    }
+    
+    // required int64 timestamp = 5;
+    public static final int TIMESTAMP_FIELD_NUMBER = 5;
+    private long timestamp_;
+    public boolean hasTimestamp() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public long getTimestamp() {
+      return timestamp_;
+    }
+    
+    // required bool deleted = 6;
+    public static final int DELETED_FIELD_NUMBER = 6;
+    private boolean deleted_;
+    public boolean hasDeleted() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public boolean getDeleted() {
+      return deleted_;
+    }
+    
+    // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+    public static final int PROPS_FIELD_NUMBER = 7;
+    private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> props_;
+    public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> getPropsList() {
+      return props_;
+    }
+    public java.util.List<? extends SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder>
+        getPropsOrBuilderList() {
+      return props_;
+    }
+    public int getPropsCount() {
+      return props_.size();
+    }
+    public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+      return props_.get(index);
+    }
+    public SchemaRegistryFormat.SchemaInfo.KeyValuePairOrBuilder getPropsOrBuilder(
+        int index) {
+      return props_.get(index);
+    }
+    
+    private void initFields() {
+      schemaId_ = "";
+      user_ = "";
+      type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+      schema_ = com.google.protobuf.ByteString.EMPTY;
+      timestamp_ = 0L;
+      deleted_ = false;
+      props_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasSchemaId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasUser()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasType()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasSchema()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasTimestamp()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasDeleted()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      for (int i = 0; i < getPropsCount(); i++) {
+        if (!getProps(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getSchemaIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getUserBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeEnum(3, type_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt64(5, timestamp_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(6, deleted_);
+      }
+      for (int i = 0; i < props_.size(); i++) {
+        output.writeMessage(7, props_.get(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getSchemaIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getUserBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(3, type_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, schema_);
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(5, timestamp_);
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(6, deleted_);
+      }
+      for (int i = 0; i < props_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(7, props_.get(i));
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static SchemaRegistryFormat.SchemaInfo parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(SchemaRegistryFormat.SchemaInfo prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          SchemaRegistryFormat.SchemaInfo, Builder>
+        implements SchemaRegistryFormat.SchemaInfoOrBuilder {
+      // Construct using org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat.SchemaInfo.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        schemaId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        user_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        schema_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        timestamp_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        deleted_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        props_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo getDefaultInstanceForType() {
+        return SchemaRegistryFormat.SchemaInfo.getDefaultInstance();
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo build() {
+        SchemaRegistryFormat.SchemaInfo result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private SchemaRegistryFormat.SchemaInfo buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        SchemaRegistryFormat.SchemaInfo result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public SchemaRegistryFormat.SchemaInfo buildPartial() {
+        SchemaRegistryFormat.SchemaInfo result = new SchemaRegistryFormat.SchemaInfo(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.schemaId_ = schemaId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.user_ = user_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.type_ = type_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.schema_ = schema_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.timestamp_ = timestamp_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.deleted_ = deleted_;
+        if (((bitField0_ & 0x00000040) == 0x00000040)) {
+          props_ = java.util.Collections.unmodifiableList(props_);
+          bitField0_ = (bitField0_ & ~0x00000040);
+        }
+        result.props_ = props_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(SchemaRegistryFormat.SchemaInfo other) {
+        if (other == SchemaRegistryFormat.SchemaInfo.getDefaultInstance()) return this;
+        if (other.hasSchemaId()) {
+          setSchemaId(other.getSchemaId());
+        }
+        if (other.hasUser()) {
+          setUser(other.getUser());
+        }
+        if (other.hasType()) {
+          setType(other.getType());
+        }
+        if (other.hasSchema()) {
+          setSchema(other.getSchema());
+        }
+        if (other.hasTimestamp()) {
+          setTimestamp(other.getTimestamp());
+        }
+        if (other.hasDeleted()) {
+          setDeleted(other.getDeleted());
+        }
+        if (!other.props_.isEmpty()) {
+          if (props_.isEmpty()) {
+            props_ = other.props_;
+            bitField0_ = (bitField0_ & ~0x00000040);
+          } else {
+            ensurePropsIsMutable();
+            props_.addAll(other.props_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasSchemaId()) {
+          
+          return false;
+        }
+        if (!hasUser()) {
+          
+          return false;
+        }
+        if (!hasType()) {
+          
+          return false;
+        }
+        if (!hasSchema()) {
+          
+          return false;
+        }
+        if (!hasTimestamp()) {
+          
+          return false;
+        }
+        if (!hasDeleted()) {
+          
+          return false;
+        }
+        for (int i = 0; i < getPropsCount(); i++) {
+          if (!getProps(i).isInitialized()) {
+            
+            return false;
+          }
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!parseUnknownField(input, extensionRegistry, tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              schemaId_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              user_ = input.readBytes();
+              break;
+            }
+            case 24: {
+              int rawValue = input.readEnum();
+              SchemaRegistryFormat.SchemaInfo.SchemaType value = SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(rawValue);
+              if (value != null) {
+                bitField0_ |= 0x00000004;
+                type_ = value;
+              }
+              break;
+            }
+            case 34: {
+              bitField0_ |= 0x00000008;
+              schema_ = input.readBytes();
+              break;
+            }
+            case 40: {
+              bitField0_ |= 0x00000010;
+              timestamp_ = input.readInt64();
+              break;
+            }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              deleted_ = input.readBool();
+              break;
+            }
+            case 58: {
+              SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder subBuilder = SchemaRegistryFormat.SchemaInfo.KeyValuePair.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addProps(subBuilder.buildPartial());
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required string schema_id = 1;
+      private java.lang.Object schemaId_ = "";
+      public boolean hasSchemaId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getSchemaId() {
+        java.lang.Object ref = schemaId_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          schemaId_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setSchemaId(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        schemaId_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        schemaId_ = getDefaultInstance().getSchemaId();
+        
+        return this;
+      }
+      void setSchemaId(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        schemaId_ = value;
+        
+      }
+      
+      // required string user = 2;
+      private java.lang.Object user_ = "";
+      public boolean hasUser() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getUser() {
+        java.lang.Object ref = user_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          user_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setUser(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        user_ = value;
+        
+        return this;
+      }
+      public Builder clearUser() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        user_ = getDefaultInstance().getUser();
+        
+        return this;
+      }
+      void setUser(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000002;
+        user_ = value;
+        
+      }
+      
+      // required .pulsar.schema.SchemaInfo.SchemaType type = 3;
+      private SchemaRegistryFormat.SchemaInfo.SchemaType type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+      public boolean hasType() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public SchemaRegistryFormat.SchemaInfo.SchemaType getType() {
+        return type_;
+      }
+      public Builder setType(SchemaRegistryFormat.SchemaInfo.SchemaType value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000004;
+        type_ = value;
+        
+        return this;
+      }
+      public Builder clearType() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        type_ = SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
+        
+        return this;
+      }
+      
+      // required bytes schema = 4;
+      private com.google.protobuf.ByteString schema_ = com.google.protobuf.ByteString.EMPTY;
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public com.google.protobuf.ByteString getSchema() {
+        return schema_;
+      }
+      public Builder setSchema(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        schema_ = value;
+        
+        return this;
+      }
+      public Builder clearSchema() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        schema_ = getDefaultInstance().getSchema();
+        
+        return this;
+      }
+      
+      // required int64 timestamp = 5;
+      private long timestamp_ ;
+      public boolean hasTimestamp() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public long getTimestamp() {
+        return timestamp_;
+      }
+      public Builder setTimestamp(long value) {
+        bitField0_ |= 0x00000010;
+        timestamp_ = value;
+        
+        return this;
+      }
+      public Builder clearTimestamp() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        timestamp_ = 0L;
+        
+        return this;
+      }
+      
+      // required bool deleted = 6;
+      private boolean deleted_ ;
+      public boolean hasDeleted() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public boolean getDeleted() {
+        return deleted_;
+      }
+      public Builder setDeleted(boolean value) {
+        bitField0_ |= 0x00000020;
+        deleted_ = value;
+        
+        return this;
+      }
+      public Builder clearDeleted() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        deleted_ = false;
+        
+        return this;
+      }
+      
+      // repeated .pulsar.schema.SchemaInfo.KeyValuePair props = 7;
+      private java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> props_ =
+        java.util.Collections.emptyList();
+      private void ensurePropsIsMutable() {
+        if (!((bitField0_ & 0x00000040) == 0x00000040)) {
+          props_ = new java.util.ArrayList<SchemaRegistryFormat.SchemaInfo.KeyValuePair>(props_);
+          bitField0_ |= 0x00000040;
+         }
+      }
+      
+      public java.util.List<SchemaRegistryFormat.SchemaInfo.KeyValuePair> getPropsList() {
+        return java.util.Collections.unmodifiableList(props_);
+      }
+      public int getPropsCount() {
+        return props_.size();
+      }
+      public SchemaRegistryFormat.SchemaInfo.KeyValuePair getProps(int index) {
+        return props_.get(index);
+      }
+      public Builder setProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.set(index, value);
+        
+        return this;
+      }
+      public Builder setProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) {
+        ensurePropsIsMutable();
+        props_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProps(SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.add(value);
+        
+        return this;
+      }
+      public Builder addProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropsIsMutable();
+        props_.add(index, value);
+        
+        return this;
+      }
+      public Builder addProps(
+          SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) {
+        ensurePropsIsMutable();
+        props_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProps(
+          int index, SchemaRegistryFormat.SchemaInfo.KeyValuePair.Builder builderForValue) {
+        ensurePropsIsMutable();
+        props_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllProps(
+          java.lang.Iterable<? extends SchemaRegistryFormat.SchemaInfo.KeyValuePair> values) {
+        ensurePropsIsMutable();
+        super.addAll(values, props_);
+        
+        return this;
+      }
+      public Builder clearProps() {
+        props_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000040);
+        
+        return this;
+      }
+      public Builder removeProps(int index) {
+        ensurePropsIsMutable();
+        props_.remove(index);
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.schema.SchemaInfo)
+    }
+    
+    static {
+      defaultInstance = new SchemaInfo(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.schema.SchemaInfo)
+  }
+  
+  
+  static {
+  }
+  
+  // @@protoc_insertion_point(outer_class_scope)
+}
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
new file mode 100644
index 000000000..e497eaffc
--- /dev/null
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+syntax = "proto2";
+
+package pulsar.schema;
+option java_package = "org.apache.pulsar.broker.service.schema.proto";
+option optimize_for = LITE_RUNTIME;
+
+message SchemaInfo {
+    enum SchemaType {
+        NONE = 1;
+        THRIFT = 2;
+        AVRO = 3;
+        JSON = 4;
+        PROTO = 5;
+    }
+    message KeyValuePair {
+        required string key = 1;
+        required string value = 2;
+    }
+    required string schema_id = 1;
+    required string user = 2;
+    required SchemaType type = 3;
+    required bytes schema = 4;
+    required int64 timestamp = 5;
+    required bool deleted = 6;
+
+    repeated KeyValuePair props = 7;
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 2f0f3506e..0e9fa6099 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -96,6 +96,7 @@
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.Compactor;
@@ -335,7 +336,7 @@ public void testAddRemoveProducer() throws Exception {
         String role = "appid1";
         // 1. simple add producer
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
@@ -351,7 +352,7 @@ public void testAddRemoveProducer() throws Exception {
         // 3. add producer for a different topic
         PersistentTopic failTopic = new PersistentTopic(failTopicName, ledgerMock, brokerService);
         Producer failProducer = new Producer(failTopic, serverCnx, 2 /* producer id */, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         try {
             topic.addProducer(failProducer);
             fail("should have failed");
@@ -371,18 +372,18 @@ public void testMaxProducers() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
         String role = "appid1";
         // 1. add producer1
-        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null);
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
         assertEquals(topic.getProducers().size(), 1);
 
         // 2. add producer2
-        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null);
+        Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer2);
         assertEquals(topic.getProducers().size(), 2);
 
         // 3. add producer3 but reached maxProducersPerTopic
         try {
-            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null);
+            Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, false, null, SchemaVersion.Latest);
             topic.addProducer(producer3);
             fail("should have failed");
         } catch (BrokerServiceException e) {
@@ -721,7 +722,7 @@ public void testDeleteTopic() throws Exception {
         // 2. delete topic with producer
         topic = (PersistentTopic) brokerService.getTopic(successTopicName).get();
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                role, false, null);
+                role, false, null, SchemaVersion.Latest);
         topic.addProducer(producer);
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -877,7 +878,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
             String role = "appid1";
             Thread.sleep(10); /* delay to ensure that the delete gets executed first */
             Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
-                    role, false, null);
+                    role, false, null, SchemaVersion.Latest);
             topic.addProducer(producer);
             fail("Should have failed");
         } catch (BrokerServiceException e) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index ded2f008c..9fb02ea43 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -35,6 +35,12 @@
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
+import com.google.common.collect.Maps;
+import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -43,9 +49,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-
 import javax.naming.AuthenticationException;
-
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -70,6 +74,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
@@ -103,14 +108,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.embedded.EmbeddedChannel;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
 /**
  */
 @Test
@@ -143,6 +140,8 @@
     public void setup() throws Exception {
         svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
+        doReturn(new DefaultSchemaRegistryService()).when(pulsar).getSchemaRegistryService();
+
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
         doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 5d2b922c1..a0f007a83 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -23,12 +23,12 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.client.api.PulsarClientException.LookupException;
 import org.apache.pulsar.client.impl.ConsumerBase;
@@ -36,12 +36,11 @@
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import io.netty.channel.ChannelHandlerContext;
-
 /**
  */
 public class ClientErrorsTest {
@@ -143,7 +142,7 @@ private void producerCreateSuccessAfterRetry(String topic) throws Exception {
 
         mockBrokerService.setHandleProducer((ctx, producer) -> {
             if (counter.incrementAndGet() == 2) {
-                ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer"));
+                ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
                 return;
             }
             ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.ServiceNotReady, "msg"));
@@ -217,7 +216,8 @@ private void producerFailDoesNotFailOtherProducer(String topic1, String topic2)
                 ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthenticationError, "msg"));
                 return;
             }
-            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer"));
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
+
         });
 
         ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) client.newProducer().topic(topic1).create();
@@ -255,7 +255,7 @@ private void producerContinuousRetryAfterSendFail(String topic) throws Exception
             int i = counter.incrementAndGet();
             if (i == 1 || i == 5) {
                 // succeed on 1st and 5th attempts
-                ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer"));
+                ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
                 return;
             }
             ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.PersistenceError, "msg"));
@@ -479,7 +479,7 @@ public void testOneProducerFailShouldCloseAllProducersInPartitionedProducer() th
                 ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg"));
                 return;
             }
-            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer"));
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
         });
 
         mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
@@ -583,7 +583,7 @@ public void testProducerReconnect() throws Exception {
         });
 
         mockBrokerService.setHandleProducer((ctx, produce) -> {
-            ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), "default-producer"));
+            ctx.writeAndFlush(Commands.newProducerSuccess(produce.getRequestId(), "default-producer", SchemaVersion.Empty));
         });
 
         mockBrokerService.setHandleSend((ctx, sendCmd, headersAndPayload) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index bdc8e322e..91e5bbe6d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -47,6 +47,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSend;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.Server;
@@ -174,7 +175,7 @@ protected void handleProducer(PulsarApi.CommandProducer producer) {
                 return;
             }
             // default
-            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer"));
+            ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty));
         }
 
         @Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b51d29ea0..093f94441 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.api;
 
+import static com.google.protobuf.ByteString.copyFromUtf8;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
 
@@ -37,11 +38,11 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
-import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
@@ -72,6 +73,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 
@@ -115,7 +117,7 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro
         }
 
         if (authData != null) {
-            connectBuilder.setAuthData(ByteString.copyFromUtf8(authData));
+            connectBuilder.setAuthData(copyFromUtf8(authData));
         }
 
         if (originalPrincipal != null) {
@@ -165,15 +167,16 @@ public static ByteBuf newSuccess(long requestId) {
         return res;
     }
 
-    public static ByteBuf newProducerSuccess(long requestId, String producerName) {
-        return newProducerSuccess(requestId, producerName, -1);
+    public static ByteBuf newProducerSuccess(long requestId, String producerName, SchemaVersion schemaVersion) {
+        return newProducerSuccess(requestId, producerName, -1, schemaVersion);
     }
 
-    public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId) {
+    public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion) {
         CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
         producerSuccessBuilder.setRequestId(requestId);
         producerSuccessBuilder.setProducerName(producerName);
         producerSuccessBuilder.setLastSequenceId(lastSequenceId);
+        producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
         CommandProducerSuccess producerSuccess = producerSuccessBuilder.build();
         ByteBuf res = serializeWithSize(
                 BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess));
@@ -980,4 +983,5 @@ public static boolean peerSupportsGetLastMessageId(int peerVersion) {
     public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
         return peerVersion >= ProtocolVersion.v12.getNumber();
     }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index 61339e139..c34f46cd9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -288,6 +288,12 @@ public boolean isGlobal() {
         return cluster == null || Constants.GLOBAL_CLUSTER.equalsIgnoreCase(cluster);
     }
 
+    public String getSchemaName() {
+        return getProperty()
+            + "/" + getNamespacePortion()
+            + "/" + getLocalName();
+    }
+
     @Override
     public String toString() {
         return completeTopicName;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
new file mode 100644
index 000000000..0aaefb3ef
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/EmptyVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final public class EmptyVersion implements SchemaVersion {
+    private static final byte[] EMPTY = new byte[]{};
+
+    @Override
+    public byte[] bytes() {
+        return EMPTY;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
new file mode 100644
index 000000000..b26231c1d
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/LatestVersion.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+final class LatestVersion implements SchemaVersion {
+    private static final byte[] EMPTY = new byte[]{};
+
+    @Override
+    public byte[] bytes() {
+        return EMPTY;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
new file mode 100644
index 000000000..5a5012c9b
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaData.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Builder
+@Data
+public class SchemaData {
+    private final SchemaType type;
+    private final boolean isDeleted;
+    private final long timestamp;
+    private final String user;
+    private final byte[] data;
+    private final Map<String, String> props;
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
new file mode 100644
index 000000000..e9a01f0a1
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public enum SchemaType {
+    AVRO, PROTOBUF, THRIFT, JSON, NONE
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
new file mode 100644
index 000000000..e31e45d05
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaVersion.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public interface SchemaVersion {
+    SchemaVersion Latest = new LatestVersion();
+    SchemaVersion Empty = new EmptyVersion();
+
+    byte[] bytes();
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services