You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/05 12:18:23 UTC

[pulsar] branch branch-2.7 updated (13c4f9b -> 4fceca7)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 13c4f9b  Compression must be applied during deferred schema preparation and enableBatching is enabled (#9396)
     new 13fef42  Fix BookkeeperSchemaStorage NPE (#9264)
     new 1f17b1a  Fix the partition number not equals expected error (#9446)
     new 4fceca7  Do not use a static map of listeners in TopicPoliciesService (#9486)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java    | 17 +++-----
 .../SystemTopicBasedTopicPoliciesService.java      | 15 +++++---
 .../broker/service/TopicPoliciesService.java       |  7 +---
 .../broker/service/schema/SchemaServiceTest.java   | 10 +++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-
 6 files changed, 78 insertions(+), 24 deletions(-)


[pulsar] 03/03: Do not use a static map of listeners in TopicPoliciesService (#9486)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4fceca75e7f133342ddd342e890492687ed12f3c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Feb 5 02:55:39 2021 -0800

    Do not use a static map of listeners in TopicPoliciesService (#9486)
    
    Many CI jobs are failing with OOM in the brokers unit tests. The Surefire worker is configured with 4 processes, each with an Xmx of 1G.
    
    The problem was introduced in #7863, where a static map of listeners was added to an interface. As a result, that map retains all the `PulsarService` instances created during test execution and keeps references to everything else.
    
    The map should instead be scoped to the specific instance.
    
    (cherry picked from commit 31ee4541a16bd7194d50b576434eb7742ef87e99)
---
 .../service/SystemTopicBasedTopicPoliciesService.java     | 15 +++++++++------
 .../pulsar/broker/service/TopicPoliciesService.java       |  7 +------
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 2627f4d..8b1ec1e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -21,6 +21,11 @@ package org.apache.pulsar.broker.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
@@ -40,11 +45,6 @@ import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Cached topic policies service will cache the system topic reader and the topic policies
  *
@@ -63,6 +63,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
 
     private final Map<NamespaceName, Boolean> policyCacheInitMap = new ConcurrentHashMap<>();
 
+    private final Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
+
     public SystemTopicBasedTopicPoliciesService(PulsarService pulsarService) {
         this.pulsarService = pulsarService;
     }
@@ -123,7 +125,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             return;
         }
         TopicPoliciesEvent event = msg.getValue().getTopicPoliciesEvent();
-        TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(), event.getNamespace(), event.getTopic());
+        TopicName topicName = TopicName.get(event.getDomain(), event.getTenant(),
+                event.getNamespace(), event.getTopic());
         if (listeners.get(topicName) != null) {
             TopicPolicies policies = event.getPolicies();
             for (TopicPolicyListener<TopicPolicies> listener : listeners.get(topicName)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index a4da39f..82b0abf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -18,24 +18,19 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.util.FutureUtil;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Topic policies service
  */
 public interface TopicPoliciesService {
 
     TopicPoliciesService DISABLED = new TopicPoliciesServiceDisabled();
-    Map<TopicName, List<TopicPolicyListener<TopicPolicies>>> listeners = new ConcurrentHashMap<>();
 
     /**
      * Update policies for a topic async


[pulsar] 02/03: Fix the partition number not equals expected error (#9446)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1f17b1a08482cda96e944a0331277c7429a276bd
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 5 14:46:47 2021 +0800

    Fix the partition number not equals expected error (#9446)
    
    Fixes #8000
    
    ### Motivation
    
    Fix the partition number not equals expected error
    
    ### Verifying this change
    
    New tests were added; without this fix, you can see errors like
    `topics consumer java.lang.IllegalStateException: allTopicPartitionsNumber 2 not equals expected: 5`
    
    (cherry picked from commit bbce00a2245cf05b829182a9a75a86d4e1139492)
---
 .../pulsar/client/impl/TopicsConsumerImplTest.java | 45 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  8 +++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 227e74d..17da94a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1170,4 +1171,48 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testPartitionsUpdatesForMultipleTopics() throws Exception {
+        final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
+        final String subName = "my-sub";
+        admin.topics().createPartitionedTopic(topicName0, 2);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName0).partitions, 2);
+
+        PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern("persistent://public/default/test.*")
+                .subscriptionType(SubscriptionType.Failover)
+                .subscriptionName(subName)
+                .subscribe();
+
+        Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 2);
+        Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 2);
+
+        admin.topics().updatePartitionedTopic(topicName0, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 5);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 5);
+        });
+
+        final String topicName1 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-1";
+        admin.topics().createPartitionedTopic(topicName1, 3);
+        assertEquals(admin.topics().getPartitionedTopicMetadata(topicName1).partitions, 3);
+
+        consumer.getRecheckPatternTimeout().task().run(consumer.getRecheckPatternTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 8);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 8);
+        });
+
+        admin.topics().updatePartitionedTopic(topicName1, 5);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 10);
+            Assert.assertEquals(consumer.allTopicPartitionsNumber.intValue(), 10);
+        });
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 4149b39..55ee806 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1154,6 +1154,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return consumers.values().stream().collect(Collectors.toList());
     }
 
+    // get all partitions that in the topics map
+    int getPartitionsOfTheTopicMap() {
+        return topics.values().stream().mapToInt(Integer::intValue).sum();
+    }
+
     @Override
     public void pause() {
         synchronized (pauseMutex) {
@@ -1228,7 +1233,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 future.complete(null);
                 return future;
             } else if (oldPartitionNumber < currentPartitionNumber) {
-                allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
+                allTopicPartitionsNumber.addAndGet(currentPartitionNumber - oldPartitionNumber);
+                topics.put(topicName, currentPartitionNumber);
                 List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
                 // subscribe new added partitions
                 List<CompletableFuture<Consumer<T>>> futureList = newPartitions


[pulsar] 01/03: Fix BookkeeperSchemaStorage NPE (#9264)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 13fef424b8d3a4f53b908bc286880158cb2a158d
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Feb 5 14:47:26 2021 +0800

    Fix BookkeeperSchemaStorage NPE (#9264)
    
    ### Motivation
    
    The NullPointerException was thrown when ZooKeeper had an OOM issue. After we increased the ZooKeeper memory and restarted the ZooKeeper cluster, the broker still kept throwing NullPointerException. The exception only went away after a rolling restart of all brokers.
    
    ```
    07:54:13.142 [pulsar-io-25-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978] Subscribing on topic [topic] / [subscription]
    07:54:13.143 [Thread-241] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.0.168.5:42978][topic][subscription] Failed to create consumer: null
    java.util.concurrent.CompletionException: java.lang.NullPointerException
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_252]
    	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1005) ~[?:1.8.0_252]
    	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$getSchema$6(BookkeeperSchemaStorage.java:175) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[?:1.8.0_252]
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.getSchema(BookkeeperSchemaStorage.java:169) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.get(BookkeeperSchemaStorage.java:126) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:95) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl.getSchema(SchemaRegistryServiceImpl.java:81) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.validator.SchemaRegistryServiceWithSchemaDataValidator.getSchema(SchemaRegistryServiceWithSchemaDataValidator.java:52) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.AbstractTopic.hasSchema(AbstractTopic.java:244) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.persistent.PersistentTopic.addSchemaIfIdleOrCheckCompatible(PersistentTopic.java:2144) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:920) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
    	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2137) ~[?:1.8.0_252]
    	at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:902) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) ~[?:1.8.0_252]
    	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) ~[?:1.8.0_252]
    	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) ~[?:1.8.0_252]
    	at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:852) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:239) ~[org.apache.pulsar-pulsar-common-2.6.2.jar:2.6.2]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:191) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:153) ~[io.netty-netty-handler-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
    	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
    	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[io.netty-netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
    Caused by: java.lang.NullPointerException
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.openLedger(BookkeeperSchemaStorage.java:565) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.readSchemaEntry(BookkeeperSchemaStorage.java:470) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$null$4(BookkeeperSchemaStorage.java:185) ~[org.apache.pulsar-pulsar-broker-2.6.2.jar:2.6.2]
    	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:995) ~[?:1.8.0_252]
    	... 43 more
    ```
    
    The problem is that the BookKeeper client used by the `BookkeeperSchemaStorage` was not created successfully due to the ZooKeeper issue. Currently, starting the broker creates and starts the SchemaStorage, but if the SchemaStorage fails to start, the broker only prints the log `Unable to create schema registry storage`. After that, the broker continues the start process; if the subsequent steps do not throw any exceptions, the broker starts successfully, however, the bookkeeper clien [...]
    
    ### Modifications
    
    Make sure the SchemaStorage starts successfully when starting the broker; if the SchemaStorage fails to start, the broker should also fail to start.
    
    (cherry picked from commit 3923643d36618c1c7f6b4d13219fbb8ddea31a73)
---
 .../java/org/apache/pulsar/broker/PulsarService.java    | 17 ++++++-----------
 .../pulsar/broker/service/schema/SchemaServiceTest.java | 10 ++++++++++
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 785ff43..eb7f849 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -950,17 +950,12 @@ public class PulsarService implements AutoCloseable {
         }
     }
 
-    private SchemaStorage createAndStartSchemaStorage() {
-        SchemaStorage schemaStorage = null;
-        try {
-            final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
-            Object factoryInstance = storageClass.newInstance();
-            Method createMethod = storageClass.getMethod("create", PulsarService.class);
-            schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this);
-            schemaStorage.start();
-        } catch (Exception e) {
-            LOG.warn("Unable to create schema registry storage");
-        }
+    private SchemaStorage createAndStartSchemaStorage() throws Exception {
+        final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
+        Object factoryInstance = storageClass.newInstance();
+        Method createMethod = storageClass.getMethod("create", PulsarService.class);
+        SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this);
+        schemaStorage.start();
         return schemaStorage;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
index 2507f7d..6b2f192 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/SchemaServiceTest.java
@@ -37,10 +37,13 @@ import java.util.concurrent.ExecutionException;
 
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaStorage;
+import org.apache.pulsar.common.protocol.schema.StoredSchema;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
@@ -87,6 +90,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
+        conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
         super.internalSetup();
         BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
         storage.init();
@@ -319,6 +323,12 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
         putSchema(schemaId1, schemaData3, version(2), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
     }
 
+    @Test(expectedExceptions = PulsarServerException.class)
+    public void testSchemaStorageFailed() throws Exception {
+        conf.setSchemaRegistryStorageClassName("Unknown class name");
+        restartBroker();
+    }
+
     private void putSchema(String schemaId, SchemaData schema, SchemaVersion expectedVersion) throws Exception {
         putSchema(schemaId, schema, expectedVersion, SchemaCompatibilityStrategy.FULL);
     }