You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/04 18:08:07 UTC

[pulsar] branch master updated: Improve broker unit test CI (#7173)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new df325c2  Improve broker unit test CI (#7173)
df325c2 is described below

commit df325c291e1051aaa43c1f0be5bee8920a6bda2b
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 5 02:07:51 2020 +0800

    Improve broker unit test CI (#7173)
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * CI Tests
    
    * Remove from the broker and client jobs the tests that are run in the long-time job.
    
    * Split the transaction tests in the broker module out into their own job.
---
 ...i-unit-broker.yml => ci-unit-broker-broker.yml} | 13 ++--
 ...i-unit-proxy.yaml => ci-unit-broker-client.yml} | 22 ++++---
 ...ci-unit-proxy.yaml => ci-unit-broker-flaky.yml} | 22 ++++---
 ...nit-proxy.yaml => ci-unit-broker-long-time.yml} | 22 ++++---
 .github/workflows/ci-unit-broker-others.yml        | 54 +---------------
 ...xy.yaml => ci-unit-broker-publish-throttle.yml} | 22 ++++---
 .github/workflows/ci-unit-broker-sasl.yml          |  2 +-
 ...t-proxy.yaml => ci-unit-broker-transaction.yml} | 22 ++++---
 .github/workflows/ci-unit-broker.yml               |  8 +--
 .github/workflows/ci-unit-proxy.yaml               |  2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      |  4 +-
 .../pulsar/broker/service/BrokerService.java       |  4 +-
 .../broker/service/persistent/PersistentTopic.java |  6 --
 .../service/MessagePublishBufferThrottleTest.java  | 72 +++++++++-------------
 .../pulsar/broker/service/ServerCnxTest.java       |  8 ++-
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  6 ++
 16 files changed, 133 insertions(+), 156 deletions(-)

diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker-broker.yml
similarity index 74%
copy from .github/workflows/ci-unit-broker.yml
copy to .github/workflows/ci-unit-broker-broker.yml
index 45eebac..5b66995 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker-broker.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Brokers
+name: CI - Unit - Brokers - Broker
 on:
   pull_request:
     branches:
@@ -66,14 +66,11 @@ jobs:
 
       - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
-        
-      - name: run unit test pulsar-broker
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
+
+      - name: run unit tests pulsar broker tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: |
-          df -h
-          free -h
-          mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
+        run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/broker/**/*.java,!NamespaceOwnershipListenerTests,!AdminApiTest2,!MessagePublishBufferThrottleTest,!PrecisTopicPublishRateThrottleTest,!PublishRateLimiterTest,!MessagePublishThrottlingTest,!LoadBalancerTest,!org/apache/pulsar/broker/transaction/**/*.java,!AdminApiTest,!V1_AdminApiTest' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-client.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-client.yml
index e7f26fe..74b6c65 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-client.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Client
 on:
   pull_request:
     branches:
@@ -55,26 +55,32 @@ jobs:
         with:
           maven-version: 3.6.1
 
-      - name: run unit tests install by skip tests
+      - name: clean disk
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: |
+          sudo swapoff /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
 
-      - name: run unit tests pulsar proxy
+      - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run proxy lookup throttling test
+      - name: run unit tests pulsar broker client tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/client/**/*.java,!UnAcknowledgedMessagesTimeoutTest,!SimpleSchemaTest,!ConsumerDedupPermitsUpdate,!ConsumerBatchReceiveTest,!KeySharedSubscriptionTest' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
           zip -r artifacts.zip artifacts
-
       - uses: actions/upload-artifact@master
         name: upload surefire-artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-flaky.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-flaky.yml
index e7f26fe..4eaf301 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-flaky.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Flaky
 on:
   pull_request:
     branches:
@@ -55,26 +55,32 @@ jobs:
         with:
           maven-version: 3.6.1
 
-      - name: run unit tests install by skip tests
+      - name: clean disk
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: |
+          sudo swapoff /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
 
-      - name: run unit tests pulsar proxy
+      - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run proxy lookup throttling test
+      - name: run unit tests pulsar broker flaky tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn test -DfailIfNoTests=false '-Dtest=MessageIdTest,UnAcknowledgedMessagesTimeoutTest,ConsumerPreciseDispatcherFlowControl,SimpleSchemaTest,ConsumerDedupPermitsUpdate,NamespaceOwnershipListenerTests' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
           zip -r artifacts.zip artifacts
-
       - uses: actions/upload-artifact@master
         name: upload surefire-artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-long-time.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-long-time.yml
index e7f26fe..647ee76 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-long-time.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Long - Time
 on:
   pull_request:
     branches:
@@ -55,26 +55,32 @@ jobs:
         with:
           maven-version: 3.6.1
 
-      - name: run unit tests install by skip tests
+      - name: clean disk
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: |
+          sudo swapoff /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
 
-      - name: run unit tests pulsar proxy
+      - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run proxy lookup throttling test
+      - name: run unit tests pulsar broker long time tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn test -DfailIfNoTests=false '-Dtest=PulsarFunctionE2ESecurityTest,SchemaCompatibilityCheckTest,CompactedTopicTest,LoadBalancerTest,ConsumerBatchReceiveTest,KeySharedSubscriptionTest' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
           zip -r artifacts.zip artifacts
-
       - uses: actions/upload-artifact@master
         name: upload surefire-artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-broker-others.yml b/.github/workflows/ci-unit-broker-others.yml
index e1d3495..7f4d638 100644
--- a/.github/workflows/ci-unit-broker-others.yml
+++ b/.github/workflows/ci-unit-broker-others.yml
@@ -66,59 +66,11 @@ jobs:
 
       - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run unit tests pulsar broker reader test
+      - name: run unit tests pulsar broker other tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker rack aware test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=RackAwareTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker simple producer consumer test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=SimpleProducerConsumerTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker V1 producer consumer test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=V1_ProducerConsumerTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker persistent failover end to end test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=PersistentFailoverE2ETest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker client integration test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=BrokerClientIntegrationTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker replicatior rate limiter test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker persistent dispatcher failover consumer test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker admin api test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker v1 admin api test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker compaction test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker batch message test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker
-
-      - name: run unit tests pulsar broker partitioned topics schema test
-        if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker
+        run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest,RackAwareTest,SimpleProducerConsumerTest,V1_ProducerConsumerTest,PersistentFailoverE2ETest,BrokerClientIntegrationTest,ReplicatorRateLimiterTest,PersistentDispatcherFailoverConsumerTest,AdminApiTest,V1_AdminApiTest,CompactionTest,BatchMessageTest,PartitionedTopicsSchemaTest' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-publish-throttle.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-publish-throttle.yml
index e7f26fe..3dc0d51 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-publish-throttle.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Publish - Throttle
 on:
   pull_request:
     branches:
@@ -55,26 +55,32 @@ jobs:
         with:
           maven-version: 3.6.1
 
-      - name: run unit tests install by skip tests
+      - name: clean disk
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: |
+          sudo swapoff /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
 
-      - name: run unit tests pulsar proxy
+      - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run proxy lookup throttling test
+      - name: run unit tests pulsar broker publish throttle tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn test -DfailIfNoTests=false '-Dtest=MessagePublishBufferThrottleTest,PrecisTopicPublishRateThrottleTest,PublishRateLimiterTest,MessagePublishThrottlingTest' -pl pulsar-broker --threads 1
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
           zip -r artifacts.zip artifacts
-
       - uses: actions/upload-artifact@master
         name: upload surefire-artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-broker-sasl.yml b/.github/workflows/ci-unit-broker-sasl.yml
index b01e44b..aa975ab 100644
--- a/.github/workflows/ci-unit-broker-sasl.yml
+++ b/.github/workflows/ci-unit-broker-sasl.yml
@@ -57,7 +57,7 @@ jobs:
 
       - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: mvn clean install -DskipTests -pl pulsar-broker-auth-sasl -am
 
       - name: run unit tests pulsar auth sasl
         if: steps.docs.outputs.changed_only == 'no'
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-transaction.yml
similarity index 80%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-transaction.yml
index e7f26fe..4f63ecf 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-transaction.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Transaction
 on:
   pull_request:
     branches:
@@ -55,26 +55,32 @@ jobs:
         with:
           maven-version: 3.6.1
 
-      - name: run unit tests install by skip tests
+      - name: clean disk
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: |
+          sudo swapoff /swapfile
+          sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+          sudo apt clean
+          docker rmi $(docker images -q) -f
+          df -h
 
-      - name: run unit tests pulsar proxy
+      - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
 
-      - name: run proxy lookup throttling test
+      - name: run unit tests pulsar broker transaction tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+        run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/broker/transaction/**/*.java' -pl pulsar-broker
 
       - name: package surefire artifacts
         if: failure()
         run: |
+          df -h
+          free -h
           rm -rf artifacts
           mkdir artifacts
           find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
           zip -r artifacts.zip artifacts
-
       - uses: actions/upload-artifact@master
         name: upload surefire-artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml
index 45eebac..f61ec12 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: CI - Unit - Brokers
+name: CI - Unit - Brokers - Default
 on:
   pull_request:
     branches:
@@ -66,14 +66,14 @@ jobs:
 
       - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: mvn clean install -DskipTests -pl pulsar-broker -am
         
-      - name: run unit test pulsar-broker
+      - name: run unit test pulsar-broker default tests
         if: steps.docs.outputs.changed_only == 'no'
         run: |
           df -h
           free -h
-          mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
+          mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
 
       - name: package surefire artifacts
         if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-proxy.yaml
index e7f26fe..7c623e8 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-proxy.yaml
@@ -57,7 +57,7 @@ jobs:
 
       - name: run unit tests install by skip tests
         if: steps.docs.outputs.changed_only == 'no'
-        run: mvn clean install -DskipTests
+        run: mvn clean install -DskipTests -pl pulsar-proxy -am
 
       - name: run unit tests pulsar proxy
         if: steps.docs.outputs.changed_only == 'no'
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3488fd8..072e91c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -67,7 +67,9 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-
+/**
+ * Broker admin base.
+ */
 public class BrokersBase extends AdminResource {
     private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
     private int serviceConfigZkVersion = -1;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0790e73..efde4b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2140,7 +2140,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             return Optional.empty();
         }
     }
-    private void checkMessagePublishBuffer() {
+
+    @VisibleForTesting
+    void checkMessagePublishBuffer() {
         AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
         foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
         if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f5cef0e..a7fa72d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,16 +147,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
 
     private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-
-    // Timestamp of when this topic was last seen active
-    private volatile long lastActive;
     
     // topic has every published chunked message since topic is loaded
     public boolean msgChunkPublished;
 
-    // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
-    // doesn't support batch-message
-    private volatile boolean hasBatchMessagePublished = false;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
     public volatile long delayedDeliveryTickTimeMillis = 1000;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 5397725..8aa9c9a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -51,9 +51,9 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         super.baseSetup();
         final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled";
         Producer<byte[]> producer = pulsarClient.newProducer()
-            .topic(topic)
-            .producerName("producer-name")
-            .create();
+                .topic(topic)
+                .producerName("producer-name")
+                .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
@@ -76,34 +76,26 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
     @Test
     public void testMessagePublishBufferThrottleEnable() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(2);
+        conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
         super.baseSetup();
-        Thread.sleep(4);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
         Producer<byte[]> producer = pulsarClient.newProducer()
-            .topic(topic)
-            .producerName("producer-name")
-            .create();
+                .topic(topic)
+                .producerName("producer-name")
+                .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
-        Thread.sleep(4);
-        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         // The first message can publish success, but the second message should be blocked
         producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
-        MessageId messageId = null;
-        try {
-            messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
-            Assert.fail("should failed, because producer blocked by publish buffer limiting");
-        } catch (TimeoutException e) {
-            // No-op
-        }
-        Assert.assertNull(messageId);
+        getPulsar().getBrokerService().checkMessagePublishBuffer();
+        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
 
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
-        Thread.sleep(4);
-
+        getPulsar().getBrokerService().checkMessagePublishBuffer();
+        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
         // Make sure the producer can publish succeed.
         for (int i = 0; i < 10; i++) {
@@ -113,7 +105,6 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         for (CompletableFuture<MessageId> future : futures) {
             Assert.assertNotNull(future.get());
         }
-        Thread.sleep(4);
         Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
         super.internalCleanup();
     }
@@ -121,37 +112,35 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
     @Test
     public void testBlockByPublishRateLimiting() throws Exception {
         conf.setMaxMessagePublishBufferSizeInMB(1);
-        conf.setMessagePublishBufferCheckIntervalInMillis(2);
+        conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
         super.baseSetup();
-        Thread.sleep(4);
         Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
+        final String topic = "persistent://prop/ns-abc/testBlockByPublishRateLimiting";
         Producer<byte[]> producer = pulsarClient.newProducer()
-            .topic(topic)
-            .producerName("producer-name")
-            .create();
+                .topic(topic)
+                .producerName("producer-name")
+                .create();
         Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
         Assert.assertNotNull(topicRef);
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
 
-        Thread.sleep(4);
+        // Block by publish buffer.
+        getPulsar().getBrokerService().checkMessagePublishBuffer();
+        Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+
+        // Block by publish rate.
+        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+        getPulsar().getBrokerService().checkMessagePublishBuffer();
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true);
-        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0);
-        Thread.sleep(4);
-        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
-        MessageId messageId = null;
-        try {
-            messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
-            Assert.fail("should failed, because producer blocked by publish buffer limiting");
-        } catch (TimeoutException e) {
-            // No-op
-        }
-        Assert.assertNull(messageId);
+        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().disableCnxAutoRead();
+        ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
 
+        // Resume message publish.
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false);
         ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
-
+        Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
         List<CompletableFuture<MessageId>> futures = new ArrayList<>();
         // Make sure the producer can publish succeed.
         for (int i = 0; i < 10; i++) {
@@ -161,8 +150,7 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
         for (CompletableFuture<MessageId> future : futures) {
             Assert.assertNotNull(future.get());
         }
-        Thread.sleep(4);
         Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
         super.internalCleanup();
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 44a2966..dd1176a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -446,6 +446,7 @@ public class ServerCnxTest {
         assertTrue(response instanceof CommandError);
         CommandError error = (CommandError) response;
         assertEquals(error.getError(), ServerError.ServiceNotReady);
+        channel.finish();
     }
 
     @Test(timeOut = 30000)
@@ -531,6 +532,7 @@ public class ServerCnxTest {
                 successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0);
         channel.writeInbound(newSubscribeCmd);
         assertTrue(getResponse() instanceof CommandError);
+        channel.finish();
     }
 
     @Test(timeOut = 30000)
@@ -560,6 +562,7 @@ public class ServerCnxTest {
                 "prod-name", Collections.emptyMap());
         channel.writeInbound(clientCommand);
         assertTrue(getResponse() instanceof CommandError);
+        channel.finish();
     }
 
     @Test(timeOut = 30000)
@@ -598,6 +601,7 @@ public class ServerCnxTest {
         assertTrue(topicRef.getSubscriptions().containsKey(successSubName));
         assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected());
         assertTrue(getResponse() instanceof CommandSuccess);
+        channel.finish();
     }
 
     @Test(timeOut = 30000)
@@ -752,6 +756,7 @@ public class ServerCnxTest {
         assertTrue(response instanceof CommandError, "Response is not CommandError but " + response);
         CommandError error = (CommandError) response;
         assertEquals(error.getError(), ServerError.ServiceNotReady);
+        channel.finish();
     }
 
     @Test(timeOut = 30000)
@@ -1175,7 +1180,6 @@ public class ServerCnxTest {
 
         PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get();
         topicRef.markBatchMessagePublished();
-
         // test SUBSCRIBE on topic and cursor creation success
         clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, 0 /* priority */,
                 "test" /* consumer name */, 0 /*avoid reseting cursor*/);
@@ -1649,5 +1653,7 @@ public class ServerCnxTest {
 
         Object response = getResponse();
         assertTrue(response instanceof CommandSuccess);
+
+        channel.finish();
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 3e51e35..a83301c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -180,6 +180,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
         admin.namespaces().createNamespace(NAMESPACE1);
         admin.namespaces().createNamespace(NAMESPACE2);
         admin.namespaces().createNamespace(NAMESPACE3);
+        admin.lookups().lookupTopic(TOPIC1.toString());
+        admin.lookups().lookupTopic(TOPIC2.toString());
+        admin.lookups().lookupTopic(TOPIC3.toString());
+        admin.lookups().lookupTopic(TOPIC4.toString());
+        admin.lookups().lookupTopic(TOPIC5.toString());
+        admin.lookups().lookupTopic(TOPIC6.toString());
         systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
         systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
     }