You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/04 18:08:07 UTC
[pulsar] branch master updated: Improve broker unit test CI (#7173)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new df325c2 Improve broker unit test CI (#7173)
df325c2 is described below
commit df325c291e1051aaa43c1f0be5bee8920a6bda2b
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 5 02:07:51 2020 +0800
Improve broker unit test CI (#7173)
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* CI Tests
* Remove tests from the broker and client jobs that are already run in the long-time job.
* Split transaction tests in broker module.
---
...i-unit-broker.yml => ci-unit-broker-broker.yml} | 13 ++--
...i-unit-proxy.yaml => ci-unit-broker-client.yml} | 22 ++++---
...ci-unit-proxy.yaml => ci-unit-broker-flaky.yml} | 22 ++++---
...nit-proxy.yaml => ci-unit-broker-long-time.yml} | 22 ++++---
.github/workflows/ci-unit-broker-others.yml | 54 +---------------
...xy.yaml => ci-unit-broker-publish-throttle.yml} | 22 ++++---
.github/workflows/ci-unit-broker-sasl.yml | 2 +-
...t-proxy.yaml => ci-unit-broker-transaction.yml} | 22 ++++---
.github/workflows/ci-unit-broker.yml | 8 +--
.github/workflows/ci-unit-proxy.yaml | 2 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 4 +-
.../pulsar/broker/service/BrokerService.java | 4 +-
.../broker/service/persistent/PersistentTopic.java | 6 --
.../service/MessagePublishBufferThrottleTest.java | 72 +++++++++-------------
.../pulsar/broker/service/ServerCnxTest.java | 8 ++-
.../SystemTopicBasedTopicPoliciesServiceTest.java | 6 ++
16 files changed, 133 insertions(+), 156 deletions(-)
diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker-broker.yml
similarity index 74%
copy from .github/workflows/ci-unit-broker.yml
copy to .github/workflows/ci-unit-broker-broker.yml
index 45eebac..5b66995 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker-broker.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Brokers
+name: CI - Unit - Brokers - Broker
on:
pull_request:
branches:
@@ -66,14 +66,11 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
-
- - name: run unit test pulsar-broker
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
+
+ - name: run unit tests pulsar broker tests
if: steps.docs.outputs.changed_only == 'no'
- run: |
- df -h
- free -h
- mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
+ run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/broker/**/*.java,!NamespaceOwnershipListenerTests,!AdminApiTest2,!MessagePublishBufferThrottleTest,!PrecisTopicPublishRateThrottleTest,!PublishRateLimiterTest,!MessagePublishThrottlingTest,!LoadBalancerTest,!org/apache/pulsar/broker/transaction/**/*.java,!AdminApiTest,!V1_AdminApiTest' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-client.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-client.yml
index e7f26fe..74b6c65 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-client.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Client
on:
pull_request:
branches:
@@ -55,26 +55,32 @@ jobs:
with:
maven-version: 3.6.1
- - name: run unit tests install by skip tests
+ - name: clean disk
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: |
+ sudo swapoff /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
- - name: run unit tests pulsar proxy
+ - name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run proxy lookup throttling test
+ - name: run unit tests pulsar broker client tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/client/**/*.java,!UnAcknowledgedMessagesTimeoutTest,!SimpleSchemaTest,!ConsumerDedupPermitsUpdate,!ConsumerBatchReceiveTest,!KeySharedSubscriptionTest' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
-
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-flaky.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-flaky.yml
index e7f26fe..4eaf301 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-flaky.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Flaky
on:
pull_request:
branches:
@@ -55,26 +55,32 @@ jobs:
with:
maven-version: 3.6.1
- - name: run unit tests install by skip tests
+ - name: clean disk
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: |
+ sudo swapoff /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
- - name: run unit tests pulsar proxy
+ - name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run proxy lookup throttling test
+ - name: run unit tests pulsar broker flaky tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn test -DfailIfNoTests=false '-Dtest=MessageIdTest,UnAcknowledgedMessagesTimeoutTest,ConsumerPreciseDispatcherFlowControl,SimpleSchemaTest,ConsumerDedupPermitsUpdate,NamespaceOwnershipListenerTests' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
-
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-long-time.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-long-time.yml
index e7f26fe..647ee76 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-long-time.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Long - Time
on:
pull_request:
branches:
@@ -55,26 +55,32 @@ jobs:
with:
maven-version: 3.6.1
- - name: run unit tests install by skip tests
+ - name: clean disk
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: |
+ sudo swapoff /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
- - name: run unit tests pulsar proxy
+ - name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run proxy lookup throttling test
+ - name: run unit tests pulsar broker long time tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn test -DfailIfNoTests=false '-Dtest=PulsarFunctionE2ESecurityTest,SchemaCompatibilityCheckTest,CompactedTopicTest,LoadBalancerTest,ConsumerBatchReceiveTest,KeySharedSubscriptionTest' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
-
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-broker-others.yml b/.github/workflows/ci-unit-broker-others.yml
index e1d3495..7f4d638 100644
--- a/.github/workflows/ci-unit-broker-others.yml
+++ b/.github/workflows/ci-unit-broker-others.yml
@@ -66,59 +66,11 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run unit tests pulsar broker reader test
+ - name: run unit tests pulsar broker other tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker rack aware test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=RackAwareTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker simple producer consumer test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=SimpleProducerConsumerTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker V1 producer consumer test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=V1_ProducerConsumerTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker persistent failover end to end test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=PersistentFailoverE2ETest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker client integration test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=BrokerClientIntegrationTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker replicatior rate limiter test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ReplicatorRateLimiterTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker persistent dispatcher failover consumer test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=PersistentDispatcherFailoverConsumerTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker admin api test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=AdminApiTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker v1 admin api test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=V1_AdminApiTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker compaction test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=CompactionTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker batch message test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=BatchMessageTest' -pl pulsar-broker
-
- - name: run unit tests pulsar broker partitioned topics schema test
- if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=PartitionedTopicsSchemaTest' -pl pulsar-broker
+ run: mvn test -DfailIfNoTests=false '-Dtest=ReaderTest,RackAwareTest,SimpleProducerConsumerTest,V1_ProducerConsumerTest,PersistentFailoverE2ETest,BrokerClientIntegrationTest,ReplicatorRateLimiterTest,PersistentDispatcherFailoverConsumerTest,AdminApiTest,V1_AdminApiTest,CompactionTest,BatchMessageTest,PartitionedTopicsSchemaTest' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-publish-throttle.yml
similarity index 77%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-publish-throttle.yml
index e7f26fe..3dc0d51 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-publish-throttle.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Publish - Throttle
on:
pull_request:
branches:
@@ -55,26 +55,32 @@ jobs:
with:
maven-version: 3.6.1
- - name: run unit tests install by skip tests
+ - name: clean disk
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: |
+ sudo swapoff /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
- - name: run unit tests pulsar proxy
+ - name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run proxy lookup throttling test
+ - name: run unit tests pulsar broker publish throttle tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn test -DfailIfNoTests=false '-Dtest=MessagePublishBufferThrottleTest,PrecisTopicPublishRateThrottleTest,PublishRateLimiterTest,MessagePublishThrottlingTest' -pl pulsar-broker --threads 1
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
-
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-broker-sasl.yml b/.github/workflows/ci-unit-broker-sasl.yml
index b01e44b..aa975ab 100644
--- a/.github/workflows/ci-unit-broker-sasl.yml
+++ b/.github/workflows/ci-unit-broker-sasl.yml
@@ -57,7 +57,7 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: mvn clean install -DskipTests -pl pulsar-broker-auth-sasl -am
- name: run unit tests pulsar auth sasl
if: steps.docs.outputs.changed_only == 'no'
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-broker-transaction.yml
similarity index 80%
copy from .github/workflows/ci-unit-proxy.yaml
copy to .github/workflows/ci-unit-broker-transaction.yml
index e7f26fe..4f63ecf 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-broker-transaction.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Proxy
+name: CI - Unit - Brokers - Transaction
on:
pull_request:
branches:
@@ -55,26 +55,32 @@ jobs:
with:
maven-version: 3.6.1
- - name: run unit tests install by skip tests
+ - name: clean disk
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: |
+ sudo swapoff /swapfile
+ sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
+ sudo apt clean
+ docker rmi $(docker images -q) -f
+ df -h
- - name: run unit tests pulsar proxy
+ - name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run proxy lookup throttling test
+ - name: run unit tests pulsar broker transaction tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy
+ run: mvn test -DfailIfNoTests=false '-Dtest=org/apache/pulsar/broker/transaction/**/*.java' -pl pulsar-broker
- name: package surefire artifacts
if: failure()
run: |
+ df -h
+ free -h
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
-
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml
index 45eebac..f61ec12 100644
--- a/.github/workflows/ci-unit-broker.yml
+++ b/.github/workflows/ci-unit-broker.yml
@@ -17,7 +17,7 @@
# under the License.
#
-name: CI - Unit - Brokers
+name: CI - Unit - Brokers - Default
on:
pull_request:
branches:
@@ -66,14 +66,14 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: mvn clean install -DskipTests -pl pulsar-broker -am
- - name: run unit test pulsar-broker
+ - name: run unit test pulsar-broker default tests
if: steps.docs.outputs.changed_only == 'no'
run: |
df -h
free -h
- mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
+ mvn test -e '-Dtest=!PersistentTransactionBufferTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegra [...]
- name: package surefire artifacts
if: failure()
diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-proxy.yaml
index e7f26fe..7c623e8 100644
--- a/.github/workflows/ci-unit-proxy.yaml
+++ b/.github/workflows/ci-unit-proxy.yaml
@@ -57,7 +57,7 @@ jobs:
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
- run: mvn clean install -DskipTests
+ run: mvn clean install -DskipTests -pl pulsar-proxy -am
- name: run unit tests pulsar proxy
if: steps.docs.outputs.changed_only == 'no'
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 3488fd8..072e91c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -67,7 +67,9 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
-
+/**
+ * Broker admin base.
+ */
public class BrokersBase extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
private int serviceConfigZkVersion = -1;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0790e73..efde4b8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2140,7 +2140,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return Optional.empty();
}
}
- private void checkMessagePublishBuffer() {
+
+ @VisibleForTesting
+ void checkMessagePublishBuffer() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f5cef0e..a7fa72d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -147,16 +147,10 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
-
- // Timestamp of when this topic was last seen active
- private volatile long lastActive;
// topic has every published chunked message since topic is loaded
public boolean msgChunkPublished;
- // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
- // doesn't support batch-message
- private volatile boolean hasBatchMessagePublished = false;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
index 5397725..8aa9c9a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java
@@ -51,9 +51,9 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
super.baseSetup();
final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleDisabled";
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .producerName("producer-name")
- .create();
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
@@ -76,34 +76,26 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
@Test
public void testMessagePublishBufferThrottleEnable() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(1);
- conf.setMessagePublishBufferCheckIntervalInMillis(2);
+ conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
super.baseSetup();
- Thread.sleep(4);
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .producerName("producer-name")
- .create();
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
- Thread.sleep(4);
- Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+ Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
// The first message can publish success, but the second message should be blocked
producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
- MessageId messageId = null;
- try {
- messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
- Assert.fail("should failed, because producer blocked by publish buffer limiting");
- } catch (TimeoutException e) {
- // No-op
- }
- Assert.assertNull(messageId);
+ getPulsar().getBrokerService().checkMessagePublishBuffer();
+ Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
- Thread.sleep(4);
-
+ getPulsar().getBrokerService().checkMessagePublishBuffer();
+ Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// Make sure the producer can publish succeed.
for (int i = 0; i < 10; i++) {
@@ -113,7 +105,6 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
for (CompletableFuture<MessageId> future : futures) {
Assert.assertNotNull(future.get());
}
- Thread.sleep(4);
Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
super.internalCleanup();
}
@@ -121,37 +112,35 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
@Test
public void testBlockByPublishRateLimiting() throws Exception {
conf.setMaxMessagePublishBufferSizeInMB(1);
- conf.setMessagePublishBufferCheckIntervalInMillis(2);
+ conf.setMessagePublishBufferCheckIntervalInMillis(Integer.MAX_VALUE);
super.baseSetup();
- Thread.sleep(4);
Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- final String topic = "persistent://prop/ns-abc/testMessagePublishBufferThrottleEnable";
+ final String topic = "persistent://prop/ns-abc/testBlockByPublishRateLimiting";
Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .producerName("producer-name")
- .create();
+ .topic(topic)
+ .producerName("producer-name")
+ .create();
Topic topicRef = pulsar.getBrokerService().getTopicReference(topic).get();
Assert.assertNotNull(topicRef);
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(Long.MAX_VALUE / 2);
+ Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
- Thread.sleep(4);
+ // Block by publish buffer.
+ getPulsar().getBrokerService().checkMessagePublishBuffer();
+ Assert.assertTrue(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
+
+ // Block by publish rate.
+ ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0L);
+ getPulsar().getBrokerService().checkMessagePublishBuffer();
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(true);
- ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setMessagePublishBufferSize(0);
- Thread.sleep(4);
- Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
- MessageId messageId = null;
- try {
- messageId = producer.sendAsync(new byte[1024]).get(1, TimeUnit.SECONDS);
- Assert.fail("should failed, because producer blocked by publish buffer limiting");
- } catch (TimeoutException e) {
- // No-op
- }
- Assert.assertNull(messageId);
+ ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().disableCnxAutoRead();
+ ((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
+ // Resume message publish.
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().setAutoReadDisabledRateLimiting(false);
((AbstractTopic)topicRef).producers.get("producer-name").getCnx().enableCnxAutoRead();
-
+ Assert.assertFalse(pulsar.getBrokerService().isReachMessagePublishBufferThreshold());
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
// Make sure the producer can publish succeed.
for (int i = 0; i < 10; i++) {
@@ -161,8 +150,7 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase {
for (CompletableFuture<MessageId> future : futures) {
Assert.assertNotNull(future.get());
}
- Thread.sleep(4);
Assert.assertEquals(pulsar.getBrokerService().getCurrentMessagePublishBufferSize(), 0L);
super.internalCleanup();
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 44a2966..dd1176a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -446,6 +446,7 @@ public class ServerCnxTest {
assertTrue(response instanceof CommandError);
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ServiceNotReady);
+ channel.finish();
}
@Test(timeOut = 30000)
@@ -531,6 +532,7 @@ public class ServerCnxTest {
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0);
channel.writeInbound(newSubscribeCmd);
assertTrue(getResponse() instanceof CommandError);
+ channel.finish();
}
@Test(timeOut = 30000)
@@ -560,6 +562,7 @@ public class ServerCnxTest {
"prod-name", Collections.emptyMap());
channel.writeInbound(clientCommand);
assertTrue(getResponse() instanceof CommandError);
+ channel.finish();
}
@Test(timeOut = 30000)
@@ -598,6 +601,7 @@ public class ServerCnxTest {
assertTrue(topicRef.getSubscriptions().containsKey(successSubName));
assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected());
assertTrue(getResponse() instanceof CommandSuccess);
+ channel.finish();
}
@Test(timeOut = 30000)
@@ -752,6 +756,7 @@ public class ServerCnxTest {
assertTrue(response instanceof CommandError, "Response is not CommandError but " + response);
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ServiceNotReady);
+ channel.finish();
}
@Test(timeOut = 30000)
@@ -1175,7 +1180,6 @@ public class ServerCnxTest {
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get();
topicRef.markBatchMessagePublished();
-
// test SUBSCRIBE on topic and cursor creation success
clientCommand = Commands.newSubscribe(successTopicName, failSubName, 2, 2, SubType.Exclusive, 0 /* priority */,
"test" /* consumer name */, 0 /*avoid reseting cursor*/);
@@ -1649,5 +1653,7 @@ public class ServerCnxTest {
Object response = getResponse();
assertTrue(response instanceof CommandSuccess);
+
+ channel.finish();
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 3e51e35..a83301c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -180,6 +180,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic
admin.namespaces().createNamespace(NAMESPACE1);
admin.namespaces().createNamespace(NAMESPACE2);
admin.namespaces().createNamespace(NAMESPACE3);
+ admin.lookups().lookupTopic(TOPIC1.toString());
+ admin.lookups().lookupTopic(TOPIC2.toString());
+ admin.lookups().lookupTopic(TOPIC3.toString());
+ admin.lookups().lookupTopic(TOPIC4.toString());
+ admin.lookups().lookupTopic(TOPIC5.toString());
+ admin.lookups().lookupTopic(TOPIC6.toString());
systemTopicFactory = new NamespaceEventsSystemTopicFactory(pulsarClient);
systemTopicBasedTopicPoliciesService = (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
}